From 47375730c7b5b4860e8824a013049d31edbd8e47 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 6 Aug 2024 17:58:37 +1200 Subject: [PATCH 1/6] WIP --- lib/async/scheduler.rb | 8 +++++--- lib/async/task.rb | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 543e7e26..0f883c51 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -296,7 +296,7 @@ def run_once(timeout = nil) Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? # If we are finished, we stop the task tree and exit: - if self.finished? + if @children.nil? return false end @@ -398,9 +398,11 @@ def run(...) initial_task = self.async(...) if block_given? self.run_loop do - unless self.finished? - run_once! + if self.finished? + self.stop end + + run_once! end return initial_task diff --git a/lib/async/task.rb b/lib/async/task.rb index 7411bd8f..9b658804 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -176,7 +176,7 @@ def completed? alias complete? completed? - # @attribute [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. + # @attribute [Symbol] The status of the execution of the task, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. attr :status # Begin the execution of the task. From e820b2c7dbf71133bf73a83f42f165ca02597f74 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 6 Aug 2024 23:56:11 +1200 Subject: [PATCH 2/6] Re-introduce terminate. --- lib/async/scheduler.rb | 75 ++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 0f883c51..d7ec45d5 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -75,30 +75,33 @@ def load # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. - def scheduler_close + def scheduler_close(error = $!) # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: - unless $! + unless error self.run end ensure self.close end - private def shutdown! - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - self.stop - - self.run_loop do - unless @children.nil? - run_once! - end + # Terminate all child tasks. + def terminate + # If that doesn't work, take more serious action: + @children&.each do |child| + child.terminate end + + return @children.nil? end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - self.shutdown! + self.run_loop do + until self.terminate + self.run_once! + end + end Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 ensure @@ -288,21 +291,6 @@ def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end - # Run one iteration of the event loop. - # Does not handle interrupts. - # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. - # @returns [Boolean] Whether there is more work to do. - def run_once(timeout = nil) - Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? - - # If we are finished, we stop the task tree and exit: - if @children.nil? - return false - end - - return run_once!(timeout) - end - # Run one iteration of the event loop. # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. @@ -346,6 +334,25 @@ def run_once(timeout = nil) return true end + # Run one iteration of the event loop. + # Does not handle interrupts. + # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. + # @returns [Boolean] Whether there is more work to do. + def run_once(timeout = nil) + Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + + if self.finished? + self.stop + end + + # If we are finished, we stop the task tree and exit: + if @children.nil? + return false + end + + return run_once!(timeout) + end + # Checks and clears the interrupted state of the scheduler. # @returns [Boolean] Whether the reactor has been interrupted. private def interrupted? @@ -363,10 +370,8 @@ def run_once(timeout = nil) # Stop all children, including transient children, ignoring any signals. def stop - Thread.handle_interrupt(::SignalException => :never) do - @children&.each do |child| - child.stop - end + @children&.each do |child| + child.stop end end @@ -382,7 +387,9 @@ def stop end end rescue Interrupt => interrupt - self.stop + Thread.handle_interrupt(::SignalException => :never) do + self.stop + end retry end @@ -398,11 +405,7 @@ def run(...) initial_task = self.async(...) if block_given? self.run_loop do - if self.finished? - self.stop - end - - run_once! + run_once end return initial_task From 3bdf0c23e0391a254916cc987d8371ce510fc82c Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Aug 2024 00:56:12 +1200 Subject: [PATCH 3/6] wip. --- lib/async/node.rb | 16 ++++++++++++++++ lib/async/task.rb | 5 ++++- test/async/reactor.rb | 2 +- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/async/node.rb b/lib/async/node.rb index 3c26e98a..fec5586d 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -34,6 +34,14 @@ def nil? empty? end + def adjust_transient_count(transient) + if transient + @transient_count += 1 + else + @transient_count -= 1 + end + end + private def added(node) @@ -110,6 +118,14 @@ def transient? @transient end + protected def transient=(value) + if @transient != value + @transient = value + + @parent&.children&.adjust_transient_count(value) + end + end + # Annotate the node with a description. # # @parameter annotation [String] The description to annotate the node with. diff --git a/lib/async/task.rb b/lib/async/task.rb index 9b658804..ba8e80d9 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -262,6 +262,9 @@ def stop(later = false) # If the fiber is alive, we need to stop it: if @fiber&.alive? + # As the task is now exiting, we want to ensure the event loop continues to execute until the task finishes. + self.transient = false + if self.current? # If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`: if later @@ -276,7 +279,7 @@ def stop(later = false) begin # There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later. Fiber.scheduler.raise(@fiber, Stop) - rescue FiberError + rescue FiberError => error # In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later: Fiber.scheduler.push(Stop::Later.new(self)) end diff --git a/test/async/reactor.rb b/test/async/reactor.rb index 4e913b46..7fa5bc64 100644 --- a/test/async/reactor.rb +++ b/test/async/reactor.rb @@ -40,7 +40,7 @@ sleep end - expect(reactor.run_once).to be == false + expect(reactor.run_once(0)).to be == false expect(reactor).to be(:finished?) # Kick the task into the ensure block: From 34b8fb6463b446dba42287185194fbd7d980d4bd Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Aug 2024 12:10:51 +1200 Subject: [PATCH 4/6] Update test to expect graceful shutdown. --- test/async/reactor.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/async/reactor.rb b/test/async/reactor.rb index 7fa5bc64..cd0e4c8e 100644 --- a/test/async/reactor.rb +++ b/test/async/reactor.rb @@ -40,11 +40,8 @@ sleep end - expect(reactor.run_once(0)).to be == false expect(reactor).to be(:finished?) - - # Kick the task into the ensure block: - reactor.stop + expect(reactor.run_once(0)).to be == true reactor.close end From e3a38f89ecc87d111d23d27c9a6f92637da2e462 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Aug 2024 12:55:49 +1200 Subject: [PATCH 5/6] Documentation. --- lib/async/node.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/async/node.rb b/lib/async/node.rb index fec5586d..f8238515 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -34,6 +34,9 @@ def nil? empty? end + # Adjust the number of transient children, assuming it has changed. + # + # @parameter transient [Boolean] Whether to increment or decrement the transient count. def adjust_transient_count(transient) if transient @transient_count += 1 From 672787934b43d2f29ceb79bb5986bff40215217a Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Aug 2024 21:48:07 +1200 Subject: [PATCH 6/6] Tests. --- lib/async/node.rb | 9 ++++++++- test/async/children.rb | 18 ++++++++++++++++++ test/async/scheduler.rb | 4 +++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/lib/async/node.rb b/lib/async/node.rb index f8238515..20b2bd59 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -36,6 +36,8 @@ def nil? # Adjust the number of transient children, assuming it has changed. # + # Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}. + # # @parameter transient [Boolean] Whether to increment or decrement the transient count. def adjust_transient_count(transient) if transient @@ -121,7 +123,12 @@ def transient? @transient end - protected def transient=(value) + # Change the transient state of the node. + # + # A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed. + # + # @parameter value [Boolean] Whether the node is transient. + def transient=(value) if @transient != value @transient = value diff --git a/test/async/children.rb b/test/async/children.rb index 6abdcb77..f57a1734 100644 --- a/test/async/children.rb +++ b/test/async/children.rb @@ -40,4 +40,22 @@ expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) end end + + with "transient children" do + let(:parent) {Async::Node.new} + let(:children) {parent.children} + + it "can add a transient child" do + child = Async::Node.new(parent, transient: true) + expect(children).to be(:transients?) + + child.transient = false + expect(children).not.to be(:transients?) + expect(parent).not.to be(:finished?) + + child.transient = true + expect(children).to be(:transients?) + expect(parent).to be(:finished?) + end + end end diff --git a/test/async/scheduler.rb b/test/async/scheduler.rb index 61ad7907..7e5bf2a4 100644 --- a/test/async/scheduler.rb +++ b/test/async/scheduler.rb @@ -187,9 +187,10 @@ with "transient tasks" do it "exits gracefully" do state = nil + child_task = nil Sync do |task| - task.async(transient: true) do + child_task = task.async(transient: true) do state = :sleeping # Never come back: Fiber.scheduler.transfer @@ -205,6 +206,7 @@ end expect(state).to be == :finished + expect(child_task).not.to be(:transient?) end end end