diff --git a/lib/async/node.rb b/lib/async/node.rb index 3c26e98a..20b2bd59 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -34,6 +34,19 @@ def nil? empty? end + # Adjust the number of transient children, assuming it has changed. + # + # Despite being public, this is not intended to be called directly. It is used internally by {Node#transient=}. + # + # @parameter transient [Boolean] Whether to increment or decrement the transient count. + def adjust_transient_count(transient) + if transient + @transient_count += 1 + else + @transient_count -= 1 + end + end + private def added(node) @@ -110,6 +123,19 @@ def transient? @transient end + # Change the transient state of the node. + # + # A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed. + # + # @parameter value [Boolean] Whether the node is transient. + def transient=(value) + if @transient != value + @transient = value + + @parent&.children&.adjust_transient_count(value) + end + end + # Annotate the node with a description. # # @parameter annotation [String] The description to annotate the node with. diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 543e7e26..d7ec45d5 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -75,30 +75,33 @@ def load # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. - def scheduler_close + def scheduler_close(error = $!) # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: - unless $! + unless error self.run end ensure self.close end - private def shutdown! - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - self.stop - - self.run_loop do - unless @children.nil? - run_once! - end + # Terminate all child tasks. + def terminate + # If that doesn't work, take more serious action: + @children&.each do |child| + child.terminate end + + return @children.nil? end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - self.shutdown! + self.run_loop do + until self.terminate + self.run_once! + end + end Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 ensure @@ -288,21 +291,6 @@ def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end - # Run one iteration of the event loop. - # Does not handle interrupts. - # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. - # @returns [Boolean] Whether there is more work to do. - def run_once(timeout = nil) - Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? - - # If we are finished, we stop the task tree and exit: - if self.finished? - return false - end - - return run_once!(timeout) - end - # Run one iteration of the event loop. # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. @@ -346,6 +334,25 @@ def run_once(timeout = nil) return true end + # Run one iteration of the event loop. + # Does not handle interrupts. + # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. + # @returns [Boolean] Whether there is more work to do. + def run_once(timeout = nil) + Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + + if self.finished? + self.stop + end + + # If we are finished, we stop the task tree and exit: + if @children.nil? + return false + end + + return run_once!(timeout) + end + # Checks and clears the interrupted state of the scheduler. # @returns [Boolean] Whether the reactor has been interrupted. private def interrupted? @@ -363,10 +370,8 @@ def run_once(timeout = nil) # Stop all children, including transient children, ignoring any signals. def stop - Thread.handle_interrupt(::SignalException => :never) do - @children&.each do |child| - child.stop - end + @children&.each do |child| + child.stop end end @@ -382,7 +387,9 @@ def stop end end rescue Interrupt => interrupt - self.stop + Thread.handle_interrupt(::SignalException => :never) do + self.stop + end retry end @@ -398,9 +405,7 @@ def run(...) initial_task = self.async(...) if block_given? self.run_loop do - unless self.finished? - run_once! - end + run_once end return initial_task diff --git a/lib/async/task.rb b/lib/async/task.rb index 7411bd8f..ba8e80d9 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -176,7 +176,7 @@ def completed? alias complete? completed? - # @attribute [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. + # @attribute [Symbol] The status of the execution of the task, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`. attr :status # Begin the execution of the task. @@ -262,6 +262,9 @@ def stop(later = false) # If the fiber is alive, we need to stop it: if @fiber&.alive? + # As the task is now exiting, we want to ensure the event loop continues to execute until the task finishes. + self.transient = false + if self.current? # If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`: if later @@ -276,7 +279,7 @@ def stop(later = false) begin # There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later. Fiber.scheduler.raise(@fiber, Stop) - rescue FiberError + rescue FiberError => error # In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later: Fiber.scheduler.push(Stop::Later.new(self)) end diff --git a/test/async/children.rb b/test/async/children.rb index 6abdcb77..f57a1734 100644 --- a/test/async/children.rb +++ b/test/async/children.rb @@ -40,4 +40,22 @@ expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) end end + + with "transient children" do + let(:parent) {Async::Node.new} + let(:children) {parent.children} + + it "can add a transient child" do + child = Async::Node.new(parent, transient: true) + expect(children).to be(:transients?) + + child.transient = false + expect(children).not.to be(:transients?) + expect(parent).not.to be(:finished?) + + child.transient = true + expect(children).to be(:transients?) + expect(parent).to be(:finished?) + end + end end diff --git a/test/async/reactor.rb b/test/async/reactor.rb index 4e913b46..cd0e4c8e 100644 --- a/test/async/reactor.rb +++ b/test/async/reactor.rb @@ -40,11 +40,8 @@ sleep end - expect(reactor.run_once).to be == false expect(reactor).to be(:finished?) - - # Kick the task into the ensure block: - reactor.stop + expect(reactor.run_once(0)).to be == true reactor.close end diff --git a/test/async/scheduler.rb b/test/async/scheduler.rb index 61ad7907..7e5bf2a4 100644 --- a/test/async/scheduler.rb +++ b/test/async/scheduler.rb @@ -187,9 +187,10 @@ with "transient tasks" do it "exits gracefully" do state = nil + child_task = nil Sync do |task| - task.async(transient: true) do + child_task = task.async(transient: true) do state = :sleeping # Never come back: Fiber.scheduler.transfer @@ -205,6 +206,7 @@ end expect(state).to be == :finished + expect(child_task).not.to be(:transient?) end end end