From aff09d6d8df14cc94fd15ebd592354d588fefcca Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 12 Jul 2024 14:22:45 +1200 Subject: [PATCH] More documentation + documentation coverage. --- .github/workflows/documentation-coverage.yaml | 25 +++++++++++ .../{coverage.yaml => test-coverage.yaml} | 2 +- gems.rb | 2 + lib/async.rb | 1 + lib/async/condition.rb | 1 + lib/async/idler.rb | 18 ++++++++ lib/async/list.rb | 9 ++-- lib/async/node.rb | 16 +++++++ lib/async/notification.rb | 2 + lib/async/queue.rb | 42 +++++++++++++++++- lib/async/reactor.rb | 2 + lib/async/scheduler.rb | 28 +++++++++++- lib/async/task.rb | 44 +++++++++++++++---- lib/async/variable.rb | 16 +++++++ lib/async/waiter.rb | 4 ++ lib/async/wrapper.rb | 4 +- 16 files changed, 199 insertions(+), 17 deletions(-) create mode 100644 .github/workflows/documentation-coverage.yaml rename .github/workflows/{coverage.yaml => test-coverage.yaml} (98%) diff --git a/.github/workflows/documentation-coverage.yaml b/.github/workflows/documentation-coverage.yaml new file mode 100644 index 00000000..b3bac9a7 --- /dev/null +++ b/.github/workflows/documentation-coverage.yaml @@ -0,0 +1,25 @@ +name: Documentation Coverage + +on: [push, pull_request] + +permissions: + contents: read + +env: + CONSOLE_OUTPUT: XTerm + COVERAGE: PartialSummary + +jobs: + validate: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: "3.3" + bundler-cache: true + + - name: Validate coverage + timeout-minutes: 5 + run: bundle exec bake decode:index:coverage lib diff --git a/.github/workflows/coverage.yaml b/.github/workflows/test-coverage.yaml similarity index 98% rename from .github/workflows/coverage.yaml rename to .github/workflows/test-coverage.yaml index 83a98ff1..982806e5 100644 --- a/.github/workflows/coverage.yaml +++ b/.github/workflows/test-coverage.yaml @@ -1,4 +1,4 @@ -name: Coverage +name: Test Coverage on: [push, pull_request] diff --git a/gems.rb b/gems.rb index a66fbac8..924874f3 100644 --- a/gems.rb +++ b/gems.rb @@ -25,4 +25,6 @@ gem "covered" gem "sus" gem "sus-fixtures-async" + + gem "decode", path: "../../ioquatix/decode" end diff --git a/lib/async.rb b/lib/async.rb index 83591b0a..d1b0b774 100644 --- a/lib/async.rb +++ b/lib/async.rb @@ -10,5 +10,6 @@ require_relative "kernel/async" require_relative "kernel/sync" +# Asynchronous programming framework. module Async end diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 98c63b31..703e8be4 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -11,6 +11,7 @@ module Async # A synchronization primitive, which allows fibers to wait until a particular condition is (edge) triggered. # @public Since `stable-v1`. class Condition + # Create a new condition. def initialize @waiting = List.new end diff --git a/lib/async/idler.rb b/lib/async/idler.rb index 9226d62a..5cc9fa7e 100644 --- a/lib/async/idler.rb +++ b/lib/async/idler.rb @@ -4,13 +4,28 @@ # Copyright, 2024, by Samuel Williams. module Async + # A load balancing mechanism that can be used process work when the system is idle. class Idler + # Create a new idler. + # @public Since `stable-v2`. + # + # @parameter maximum_load [Numeric] The maximum load before we start shedding work. + # @parameter backoff [Numeric] The initial backoff time, used for delaying work. + # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil) @maximum_load = maximum_load @backoff = backoff @parent = parent end + # Wait until the system is idle, then execute the given block in a new task. + # + # @asynchronous Executes the given block concurrently. + # + # @parameter arguments [Array] The arguments to pass to the block. + # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter options [Hash] The options to pass to the task. + # @yields {|task| ...} When the system is idle, the block will be executed in a new task. def async(*arguments, parent: (@parent or Task.current), **options, &block) wait @@ -18,6 +33,9 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block) parent.async(*arguments, **options, &block) end + # Wait until the system is idle, according to the maximum load specified. + # + # If the scheduler is overloaded, this method will sleep for an exponentially increasing amount of time. def wait scheduler = Fiber.scheduler backoff = nil diff --git a/lib/async/list.rb b/lib/async/list.rb index bb78579d..9a1610bb 100644 --- a/lib/async/list.rb +++ b/lib/async/list.rb @@ -13,7 +13,7 @@ def initialize @size = 0 end - # Print a short summary of the list. + # @returns [String] A short summary of the list. def to_s sprintf("#<%s:0x%x size=%d>", self.class.name, object_id, @size) end @@ -36,12 +36,13 @@ def to_a return items end - # Points at the end of the list. + # @attribute [Node | Nil] Points at the end of the list. attr_accessor :head - # Points at the start of the list. + # @attribute [Node | Nil] Points at the start of the list. attr_accessor :tail + # @attribute [Integer] The number of nodes in the list. attr :size # A callback that is invoked when an item is added to the list. @@ -64,6 +65,7 @@ def append(node) return added(node) end + # Prepend a node to the start of the list. def prepend(node) if node.head raise ArgumentError, "Node is already in a list!" @@ -224,6 +226,7 @@ def last return nil end + # Shift the first node off the list, if it is not empty. def shift if node = first remove!(node) diff --git a/lib/async/node.rb b/lib/async/node.rb index b8bd2b9e..6038ad6a 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -12,6 +12,7 @@ module Async # A list of children tasks. class Children < List + # Create an empty list of children tasks. def initialize super @transient_count = 0 @@ -109,6 +110,9 @@ def transient? @transient end + # Annotate the node with a description. + # + # @parameter annotation [String] The description to annotate the node with. def annotate(annotation) if block_given? begin @@ -123,6 +127,9 @@ def annotate(annotation) end end + # A description of the node, including the annotation and object name. + # + # @returns [String] The description of the node. def description @object_name ||= "#{self.class}:#{format '%#018x', object_id}#{@transient ? ' transient' : nil}" @@ -135,10 +142,14 @@ def description end end + # Provides a backtrace for nodes that have an active execution context. + # + # @returns [Array(Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available. def backtrace(*arguments) nil end + # @returns [String] A description of the node. def to_s "\#<#{self.description}>" end @@ -255,10 +266,15 @@ def stop(later = false) end end + # Whether the node has been stopped. def stopped? @children.nil? end + # Print the hierarchy of the task tree from the given node. + # + # @parameter out [IO] The output stream to write to. + # @parameter backtrace [Boolean] Whether to print the backtrace of each node. def print_hierarchy(out = $stdout, backtrace: true) self.traverse do |node, level| indent = "\t" * level diff --git a/lib/async/notification.rb b/lib/async/notification.rb index 99044b28..2d52d86a 100644 --- a/lib/async/notification.rb +++ b/lib/async/notification.rb @@ -29,5 +29,7 @@ def transfer end end end + + private_constant Signal end end diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 47015a4a..77a8f9ad 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -11,6 +11,9 @@ module Async # A queue which allows items to be processed in order. # @public Since `stable-v1`. class Queue < Notification + # Create a new queue. + # + # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. def initialize(parent: nil) super() @@ -18,28 +21,34 @@ def initialize(parent: nil) @parent = parent end + # @attribute [Array] The items in the queue. attr :items + # @returns [Integer] The number of items in the queue. def size @items.size end - + + # @returns [Boolean] Whether the queue is empty. def empty? @items.empty? end + # Add an item to the queue. def <<(item) @items << item self.signal unless self.empty? end + # Add multiple items to the queue. def enqueue(*items) @items.concat(items) self.signal unless self.empty? end + # Remove and return the next item from the queue. def dequeue while @items.empty? self.wait @@ -48,12 +57,21 @@ def dequeue @items.shift end + # Process each item in the queue. + # + # @asynchronous Executes the given block concurrently for each item. + # + # @parameter arguments [Array] The arguments to pass to the block. + # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations. + # @parameter options [Hash] The options to pass to the task. + # @yields {|task| ...} When the system is idle, the block will be executed in a new task. def async(parent: (@parent or Task.current), **options, &block) while item = self.dequeue parent.async(item, **options, &block) end end + # Enumerate each item in the queue. def each while item = self.dequeue yield item @@ -61,8 +79,12 @@ def each end end + # A queue which limits the number of items that can be enqueued. # @public Since `stable-v1`. class LimitedQueue < Queue + # Create a new limited queue. + # + # @parameter limit [Integer] The maximum number of items that can be enqueued. def initialize(limit = 1, **options) super(**options) @@ -71,6 +93,7 @@ def initialize(limit = 1, **options) @full = Notification.new end + # @attribute [Integer] The maximum number of items that can be enqueued. attr :limit # @returns [Boolean] Whether trying to enqueue an item would block. @@ -78,6 +101,11 @@ def limited? @items.size >= @limit end + # Add an item to the queue. + # + # If the queue is full, this method will block until there is space available. + # + # @parameter item [Object] The item to add to the queue. def <<(item) while limited? @full.wait @@ -86,7 +114,12 @@ def <<(item) super end - def enqueue *items + # Add multiple items to the queue. + # + # If the queue is full, this method will block until there is space available. + # + # @parameter items [Array] The items to add to the queue. + def enqueue(*items) while !items.empty? while limited? @full.wait @@ -99,6 +132,11 @@ def enqueue *items end end + # Remove and return the next item from the queue. + # + # If the queue is empty, this method will block until an item is available. + # + # @returns [Object] The next item in the queue. def dequeue item = super diff --git a/lib/async/reactor.rb b/lib/async/reactor.rb index 232f8835..30ce895d 100644 --- a/lib/async/reactor.rb +++ b/lib/async/reactor.rb @@ -15,12 +15,14 @@ def self.run(...) Async(...) end + # Initialize the reactor and assign it to the current Fiber scheduler. def initialize(...) super Fiber.set_scheduler(self) end + # Close the reactor and remove it from the current Fiber scheduler. def scheduler_close self.close end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 548e10cc..b665d372 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -16,7 +16,11 @@ module Async # Handles scheduling of fibers. Implements the fiber scheduler interface. class Scheduler < Node + # Raised when an operation is attempted on a closed scheduler. class ClosedError < RuntimeError + # Create a new error. + # + # @parameter message [String] The error message. def initialize(message = "Scheduler is closed!") super end @@ -28,6 +32,11 @@ def self.supported? true end + # Create a new scheduler. + # + # @public Since `stable-v1`. + # @parameter parent [Node | Nil] The parent node to use for task hierarchy. + # @parameter selector [IO::Event::Selector] The selector to use for event handling. def initialize(parent = nil, selector: nil) super(parent) @@ -63,6 +72,9 @@ def load end end + # Invoked when the fiber scheduler is being closed. + # + # Executes the run loop until all tasks are finished, then closes the scheduler. def scheduler_close # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: unless $! @@ -79,6 +91,7 @@ def terminate end end + # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. @@ -108,6 +121,7 @@ def closed? @selector.nil? end + # @returns [String] A description of the scheduler. def to_s "\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>" end @@ -135,10 +149,20 @@ def push(fiber) @selector.push(fiber) end - def raise(*arguments) - @selector.raise(*arguments) + # Raise an exception on a specified fiber with the given arguments. + # + # This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution. + # + # @parameter fiber [Fiber] The fiber to raise the exception on. + # @parameter *arguments [Array] The arguments to pass to the fiber. + def raise(...) + @selector.raise(...) end + # Resume execution of the specified fiber. + # + # @parameter fiber [Fiber] The fiber to resume. + # @parameter arguments [Array] The arguments to pass to the fiber. def resume(fiber, *arguments) @selector.resume(fiber, *arguments) end diff --git a/lib/async/task.rb b/lib/async/task.rb index c38d6470..ceedb357 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -16,15 +16,21 @@ module Async # Raised when a task is explicitly stopped. class Stop < Exception + # Used to defer stopping the current task until later. class Later + # Create a new stop later operation. + # + # @parameter task [Task] The task to stop later. def initialize(task) @task = task end + # @returns [Boolean] Whether the task is alive. def alive? true end + # Transfer control to the operation - this will stop the task. def transfer @task.stop end @@ -34,6 +40,9 @@ def transfer # Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`. # @public Since `stable-v1`. class TimeoutError < StandardError + # Create a new timeout error. + # + # @parameter message [String] The error message. def initialize(message = "execution expired") super end @@ -41,7 +50,11 @@ def initialize(message = "execution expired") # @public Since `stable-v1`. class Task < Node + # Raised when a child task is created within a task that has finished execution. class FinishedError < RuntimeError + # Create a new finished error. + # + # @parameter message [String] The error message. def initialize(message = "Cannot create child task within a task that has finished execution!") super end @@ -72,14 +85,21 @@ def initialize(parent = Task.current?, finished: nil, **options, &block) @defer_stop = nil end + # @returns [Scheduler] The scheduler for this task. def reactor self.root end + # @returns [Array(Thread::Backtrace::Location) | Nil] The backtrace of the task, if available. def backtrace(*arguments) @fiber&.backtrace(*arguments) end + # Annotate the task with a description. + # + # This will internally try to annotate the fiber if it is running, otherwise it will annotate the task itself. + # + # @parameter annotation [String] The description to annotate the task with. def annotate(annotation, &block) if @fiber @fiber.annotate(annotation, &block) @@ -88,6 +108,7 @@ def annotate(annotation, &block) end end + # @returns [Object] The annotation of the task. def annotation if @fiber @fiber.annotation @@ -96,6 +117,7 @@ def annotation end end + # @returns [String] A description of the task and it's current status. def to_s "\#<#{self.description} (#{@status})>" end @@ -115,10 +137,10 @@ def yield Fiber.scheduler.yield end - # @attr fiber [Fiber] The fiber which is being used for the execution of this task. + # @attribute [Fiber] The fiber which is being used for the execution of this task. attr :fiber - # Whether the internal fiber is alive, i.e. it + # @returns [Boolean] Whether the internal fiber is alive, i.e. it is actively executing. def alive? @fiber&.alive? end @@ -130,32 +152,34 @@ def finished? super && @block.nil? && @fiber.nil? end - # Whether the task is running. - # @returns [Boolean] + # @returns [Boolean] Whether the task is running. def running? @status == :running end + # @returns [Boolean] Whether the task failed with an exception. def failed? @status == :failed end - # The task has been stopped + # @returns [Boolean] Whether the task has been stopped. def stopped? @status == :stopped end - # The task has completed execution and generated a result. + # @returns [Boolean] Whether the task has completed execution and generated a result. def completed? @status == :completed end alias complete? completed? - # @attr status [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. + # @attribute [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. attr :status # Begin the execution of the task. + # + # @raises [RuntimeError] If the task is already running. def run(*arguments) if @status == :initialized @status = :running @@ -169,6 +193,9 @@ def run(*arguments) end # Run an asynchronous task as a child of the current task. + # + # @raises [FinishedError] If the task has already finished. + # @returns [Task] The child task. def async(*arguments, **options, &block) raise FinishedError if self.finished? @@ -293,11 +320,12 @@ def self.current end # Check if there is a task defined for the current fiber. - # @returns [Task | Nil] + # @returns [Interface(:async) | Nil] def self.current? Thread.current[:async_task] end + # @returns [Boolean] Whether this task is the currently executing task. def current? Fiber.current.equal?(@fiber) end diff --git a/lib/async/variable.rb b/lib/async/variable.rb index bc53cc7b..d32ad681 100644 --- a/lib/async/variable.rb +++ b/lib/async/variable.rb @@ -6,12 +6,21 @@ require_relative 'condition' module Async + # A synchronization primitive that allows one task to wait for another task to resolve a value. class Variable + # Create a new variable. + # + # @parameter condition [Condition] The condition to use for synchronization. def initialize(condition = Condition.new) @condition = condition @value = nil end + # Resolve the value. + # + # Signals all waiting tasks. + # + # @parameter value [Object] The value to resolve. def resolve(value = true) @value = value condition = @condition @@ -22,15 +31,22 @@ def resolve(value = true) condition.signal(value) end + # Whether the value has been resolved. + # + # @returns [Boolean] Whether the value has been resolved. def resolved? @condition.nil? end + # Wait for the value to be resolved. + # + # @returns [Object] The resolved value. def value @condition&.wait return @value end + # Alias for {#value}. def wait self.value end diff --git a/lib/async/waiter.rb b/lib/async/waiter.rb index bd911c99..b930a0a1 100644 --- a/lib/async/waiter.rb +++ b/lib/async/waiter.rb @@ -6,6 +6,10 @@ module Async # A composable synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore} and/or {Barrier}. class Waiter + # Create a waiter instance. + # + # @parameter parent [Interface(:async) | Nil] The parent task to use for asynchronous operations. + # @parameter finished [Async::Condition] The condition to signal when a task completes. def initialize(parent: nil, finished: Async::Condition.new) @finished = finished @done = [] diff --git a/lib/async/wrapper.rb b/lib/async/wrapper.rb index 54ae779b..824f414f 100644 --- a/lib/async/wrapper.rb +++ b/lib/async/wrapper.rb @@ -23,6 +23,7 @@ def initialize(io, reactor = nil) attr_accessor :reactor + # Dup the underlying IO. def dup self.class.new(@io.dup) end @@ -51,11 +52,12 @@ def wait_any(timeout = @timeout) @io.to_io.wait(::IO::READABLE|::IO::WRITABLE|::IO::PRIORITY, timeout) or raise TimeoutError end - # Close the io and monitor. + # Close the underlying IO. def close @io.close end + # Whether the underlying IO is closed. def closed? @io.closed? end