diff --git a/.contributors.yaml b/.contributors.yaml new file mode 100644 index 00000000..64a3937a --- /dev/null +++ b/.contributors.yaml @@ -0,0 +1,59 @@ +- path: lib/io/event/priority_heap.rb + time: 2021-02-12T12:19:44+01:00 + author: + name: Wander Hillen + email: wjw.hillen@gmail.com + +- path: lib/io/event/priority_heap.rb + time: 2021-02-12T13:19:56+01:00 + author: + name: Wander Hillen + email: wjw.hillen@gmail.com + +- path: lib/io/event/priority_heap.rb + time: 2021-02-12T13:28:58+01:00 + author: + name: Wander Hillen + email: wjw.hillen@gmail.com + +- path: lib/io/event/priority_heap.rb + time: 2021-02-13T18:44:46+13:00 + author: + name: Samuel Williams + email: samuel.williams@oriontransfer.co.nz + +- path: lib/io/event/priority_heap.rb + time: 2021-02-13T12:40:15+01:00 + author: + name: Wander Hillen + email: wjw.hillen@gmail.com + +- path: lib/io/event/priority_heap.rb + time: 2021-02-13T12:47:51+01:00 + author: + name: Wander Hillen + email: wjw.hillen@gmail.com + +- path: lib/io/event/priority_heap.rb + time: 2022-09-02T13:45:20+12:00 + author: + name: Samuel Williams + email: samuel.williams@oriontransfer.co.nz + +- path: lib/io/event/priority_heap.rb + time: 2022-09-02T13:45:20+12:00 + author: + name: Samuel Williams + email: samuel.williams@oriontransfer.co.nz + +- path: lib/io/event/priority_heap.rb + time: 2022-10-13T11:06:34+13:00 + author: + name: Samuel Williams + email: samuel.williams@oriontransfer.co.nz + +- path: lib/io/event/priority_heap.rb + time: 2023-04-12T17:25:59+12:00 + author: + name: Samuel Williams + email: samuel.williams@oriontransfer.co.nz diff --git a/io-event.gemspec b/io-event.gemspec index 42ee472d..ff9b397e 100644 --- a/io-event.gemspec +++ b/io-event.gemspec @@ -7,7 +7,7 @@ Gem::Specification.new do |spec| spec.version = IO::Event::VERSION spec.summary = "An event loop." - spec.authors = ["Samuel Williams", "Math Ieu", "Benoit Daloze", "Bruno Sutic", "Alex Matchneer", "Delton Ding", "Pavel Rosický"] + spec.authors = ["Samuel Williams", "Math Ieu", "Wander Hillen", "Benoit Daloze", "Bruno Sutic", "Alex Matchneer", "Delton Ding", "Pavel Rosický"] spec.license = "MIT" spec.cert_chain = ['release.cert'] diff --git a/lib/io/event.rb b/lib/io/event.rb index 4f6efcb2..f5cb70c0 100644 --- a/lib/io/event.rb +++ b/lib/io/event.rb @@ -1,10 +1,11 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2021-2023, by Samuel Williams. +# Copyright, 2021-2024, by Samuel Williams. require_relative 'event/version' require_relative 'event/selector' +require_relative 'event/timers' begin require 'IO_Event' diff --git a/lib/io/event/priority_heap.rb b/lib/io/event/priority_heap.rb new file mode 100644 index 00000000..3f564e16 --- /dev/null +++ b/lib/io/event/priority_heap.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2021, by Wander Hillen. +# Copyright, 2021-2024, by Samuel Williams. + +class IO + module Event + # A priority queue implementation using a standard binary minheap. It uses straight comparison + # of its contents to determine priority. + # See for explanations of the main methods. + class PriorityHeap + def initialize + # The heap is represented with an array containing a binary tree. See + # https://en.wikipedia.org/wiki/Binary_heap#Heap_implementation for how this array + # is built up. + @contents = [] + end + + # Returns the earliest timer or nil if the heap is empty. + def peek + @contents[0] + end + + # Returns the number of elements in the heap + def size + @contents.size + end + + # Returns the earliest timer if the heap is non-empty and removes it from the heap. + # Returns nil if the heap is empty. (and doesn't change the heap in that case) + def pop + # If the heap is empty: + if @contents.empty? + return nil + end + + # If we have only one item, no swapping is required: + if @contents.size == 1 + return @contents.pop + end + + # Take the root of the tree: + value = @contents[0] + + # Remove the last item in the tree: + last = @contents.pop + + # Overwrite the root of the tree with the item: + @contents[0] = last + + # Bubble it down into place: + bubble_down(0) + + # validate! + + return value + end + + # Inserts a new timer into the heap, then rearranges elements until the heap invariant is true again. + def push(element) + # Insert the item at the end of the heap: + @contents.push(element) + + # Bubble it up into position: + bubble_up(@contents.size - 1) + + # validate! + + return self + end + + # Empties out the heap, discarding all elements + def clear! + @contents = [] + end + + # Validate the heap invariant. Every element except the root must not be smaller than + # its parent element. Note that it MAY be equal. + def valid? + # notice we skip index 0 on purpose, because it has no parent + (1..(@contents.size - 1)).all? { |e| @contents[e] >= @contents[(e - 1) / 2] } + end + + private + + # Left here for reference, but unused. + # def swap(i, j) + # @contents[i], @contents[j] = @contents[j], @contents[i] + # end + + def bubble_up(index) + parent_index = (index - 1) / 2 # watch out, integer division! + + while index > 0 && @contents[index] < @contents[parent_index] + # if the node has a smaller value than its parent, swap these nodes + # to uphold the minheap invariant and update the index of the 'current' + # node. If the node is already at index 0, we can also stop because that + # is the root of the heap. + # swap(index, parent_index) + @contents[index], @contents[parent_index] = @contents[parent_index], @contents[index] + + index = parent_index + parent_index = (index - 1) / 2 # watch out, integer division! + end + end + + def bubble_down(index) + swap_value = 0 + swap_index = nil + + while true + left_index = (2 * index) + 1 + left_value = @contents[left_index] + + if left_value.nil? + # This node has no children so it can't bubble down any further. + # We're done here! + return + end + + # Determine which of the child nodes has the smallest value: + right_index = left_index + 1 + right_value = @contents[right_index] + + if right_value.nil? or right_value > left_value + swap_value = left_value + swap_index = left_index + else + swap_value = right_value + swap_index = right_index + end + + if @contents[index] < swap_value + # No need to swap, the minheap invariant is already satisfied: + return + else + # At least one of the child node has a smaller value than the current node, swap current node with that child and update current node for if it might need to bubble down even further: + # swap(index, swap_index) + @contents[index], @contents[swap_index] = @contents[swap_index], @contents[index] + + index = swap_index + end + end + end + end + end +end diff --git a/lib/io/event/timers.rb b/lib/io/event/timers.rb new file mode 100644 index 00000000..ef2183da --- /dev/null +++ b/lib/io/event/timers.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require_relative 'priority_heap' + +class IO + module Event + class Timers + class Handle + def initialize(offset, block) + @offset = offset + @block = block + end + + def < other + @offset < other.offset + end + + def > other + @offset > other.offset + end + + attr :offset + attr :block + + def call(...) + @block.call(...) + end + + def cancel! + @block = nil + end + + def cancelled? + @block.nil? + end + end + + def initialize + @heap = PriorityHeap.new + @scheduled = [] + end + + def size + flush! + + return @heap.size + end + + def schedule(offset, block) + handle = Handle.new(offset, block) + @scheduled << handle + + return handle + end + + def after(timeout, &block) + schedule(now + timeout, block) + end + + def wait_interval(now = self.now) + flush! + + while handle = @heap.peek + if handle.cancelled? + @heap.pop + else + return handle.offset - now + end + end + end + + def now + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end + + def fire(now = self.now) + # Flush scheduled timers into the heap: + flush! + + # Get the earliest timer: + while handle = @heap.peek + if handle.cancelled? + @heap.pop + elsif handle.offset <= now + # Remove the earliest timer from the heap: + @heap.pop + + # Call the block: + handle.call(now) + else + break + end + end + end + + protected def flush! + while handle = @scheduled.pop + @heap.push(handle) unless handle.cancelled? + end + end + end + end +end diff --git a/license.md b/license.md index 65084862..61b2a5da 100644 --- a/license.md +++ b/license.md @@ -1,5 +1,6 @@ # MIT License +Copyright, 2021, by Wander Hillen. Copyright, 2021-2024, by Samuel Williams. Copyright, 2021, by Delton Ding. Copyright, 2021-2024, by Benoit Daloze. diff --git a/test/io/event/priority_heap.rb b/test/io/event/priority_heap.rb new file mode 100644 index 00000000..8f51e681 --- /dev/null +++ b/test/io/event/priority_heap.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'io/event/priority_heap' + +describe IO::Event::PriorityHeap do + let(:priority_heap) {subject.new} + + with "empty heap" do + it "should return nil when the first element is requested" do + expect(priority_heap.peek).to be_nil + end + + it "should return nil when the first element is extracted" do + expect(priority_heap.pop).to be_nil + end + + it "should report its size as zero" do + expect(priority_heap.size).to be(:zero?) + end + end + + it "returns the same element after inserting a single element" do + priority_heap.push(1) + expect(priority_heap.size).to be == 1 + expect(priority_heap.pop).to be == 1 + expect(priority_heap.size).to be(:zero?) + end + + it "should return inserted elements in ascending order no matter the insertion order" do + (1..10).to_a.shuffle.each do |e| + priority_heap.push(e) + end + + expect(priority_heap.size).to be == 10 + expect(priority_heap.peek).to be == 1 + + result = [] + 10.times do + result << priority_heap.pop + end + + expect(result.size).to be == 10 + expect(priority_heap.size).to be(:zero?) + expect(result.sort).to be == result + end + + with "maintaining the heap invariant" do + it "for empty heaps" do + expect(priority_heap).to be(:valid?) + end + + it "for heap of size 1" do + priority_heap.push(123) + expect(priority_heap).to be(:valid?) + end + # Exhaustive testing of all permutations of [1..6] + it "for all permutations of size 6" do + [1,2,3,4,5,6].permutation do |arr| + priority_heap.clear! + arr.each { |e| priority_heap.push(e) } + expect(priority_heap).to be(:valid?) + end + end + + # A few examples with more elements (but not ALL permutations) + it "for larger amounts of values" do + 5.times do + priority_heap.clear! + (1..1000).to_a.shuffle.each { |e| priority_heap.push(e) } + expect(priority_heap).to be(:valid?) + end + end + + # What if we insert several of the same item along with others? + it "with several elements of the same value" do + test_values = (1..10).to_a + [4] * 5 + test_values.each { |e| priority_heap.push(e) } + expect(priority_heap).to be(:valid?) + end + end +end diff --git a/test/io/event/timers.rb b/test/io/event/timers.rb new file mode 100644 index 00000000..44911d0d --- /dev/null +++ b/test/io/event/timers.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'io/event/timers' + +describe IO::Event::Timers do + let(:timers) {subject.new} + + it "should register an event" do + fired = false + + callback = proc do |_time| + fired = true + end + + timers.schedule(0.1, callback) + + expect(timers.size).to be == 1 + + timers.fire(0.15) + + expect(timers.size).to be == 0 + + expect(fired).to be == true + end + + it "should register timers in order" do + fired = [] + + times = [0.95, 0.1, 0.3, 0.5, 0.4, 0.2, 0.01, 0.9] + + times.each do |requested_time| + callback = proc do |_time| + fired << requested_time + end + + timers.schedule(requested_time, callback) + end + + timers.fire(0.5) + expect(fired).to be == times.sort.first(6) + + timers.fire(1.0) + expect(fired).to be == times.sort + end + + it "should fire timers with the time they were fired at" do + fired_at = :not_fired + + callback = proc do |time| + # The time we actually were fired at: + fired_at = time + end + + timers.schedule(0.5, callback) + + timers.fire(1.0) + + expect(fired_at).to be == 1.0 + end + + it "should flush cancelled timers" do + callback = proc{} + + 10.times do + handle = timers.schedule(0.1, callback) + handle.cancel! + end + + expect(timers.size).to be == 0 + end +end