diff --git a/lib/async/idler.rb b/lib/async/idler.rb index 5cc9fa7e..a090f2ed 100644 --- a/lib/async/idler.rb +++ b/lib/async/idler.rb @@ -41,7 +41,8 @@ def wait backoff = nil while true - load = scheduler.load + load = scheduler.load + break if load < @maximum_load if backoff diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 31c45b14..543e7e26 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -71,7 +71,7 @@ def load return @busy_time / total_time end end - + # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. @@ -84,34 +84,30 @@ def scheduler_close self.close end - # Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don't want to be interrupted while cleaning up. - def terminate - Thread.handle_interrupt(::Interrupt => :never) do - super + private def shutdown! + # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. + self.stop + + self.run_loop do + unless @children.nil? + run_once! + end end end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - until self.terminate - self.run_once! - end + self.shutdown! Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 - - # We depend on GVL for consistency: - # @guard.synchronize do - + ensure # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it. selector = @selector @selector = nil selector&.close - # end - consume end @@ -313,7 +309,7 @@ def run_once(timeout = nil) # # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. - private def run_once!(timeout = 0) + private def run_once!(timeout = nil) start_time = Async::Clock.now interval = @timers.wait_interval @@ -365,38 +361,49 @@ def run_once(timeout = nil) return false end - # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. - def run(...) - Kernel.raise ClosedError if @selector.nil? - - initial_task = self.async(...) if block_given? + # Stop all children, including transient children, ignoring any signals. + def stop + Thread.handle_interrupt(::SignalException => :never) do + @children&.each do |child| + child.stop + end + end + end + + private def run_loop(&block) interrupt = nil begin # In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this. Thread.handle_interrupt(::SignalException => :never) do - while true - # If we are interrupted, we need to exit: - break if self.interrupted? - + until self.interrupted? # If we are finished, we need to exit: - break unless self.run_once + break unless yield end end rescue Interrupt => interrupt - Thread.handle_interrupt(::SignalException => :never) do - self.stop - end + self.stop retry end # If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too. - Kernel.raise interrupt if interrupt + Kernel.raise(interrupt) if interrupt + end + + # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. + def run(...) + Kernel.raise ClosedError if @selector.nil? + + initial_task = self.async(...) if block_given? + + self.run_loop do + unless self.finished? + run_once! + end + end return initial_task - ensure - Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end # Start an asynchronous task within the specified reactor. The task will be diff --git a/test/async/idler.rb b/test/async/idler.rb index 615aabfa..5252983b 100644 --- a/test/async/idler.rb +++ b/test/async/idler.rb @@ -13,6 +13,8 @@ let(:idler) {subject.new(0.5)} it 'can schedule tasks up to the desired load' do + expect(Fiber.scheduler.load).to be < 0.1 + # Generate the load: Async do while true @@ -24,7 +26,7 @@ end end - # This test must be longer than the test window... + # This test must be longer than the idle calculation window (1s)... sleep 1.1 # Verify that the load is within the desired range: diff --git a/test/async/reactor.rb b/test/async/reactor.rb index f6710d8d..4e913b46 100644 --- a/test/async/reactor.rb +++ b/test/async/reactor.rb @@ -42,6 +42,10 @@ expect(reactor.run_once).to be == false expect(reactor).to be(:finished?) + + # Kick the task into the ensure block: + reactor.stop + reactor.close end diff --git a/test/async/scheduler.rb b/test/async/scheduler.rb index 9250119d..61ad7907 100644 --- a/test/async/scheduler.rb +++ b/test/async/scheduler.rb @@ -183,4 +183,28 @@ end.to raise_exception(RuntimeError, message: be =~ /Closing scheduler with blocked operations/) end end + + with "transient tasks" do + it "exits gracefully" do + state = nil + + Sync do |task| + task.async(transient: true) do + state = :sleeping + # Never come back: + Fiber.scheduler.transfer + ensure + state = :ensure + # Yoyo but eventually exit: + 5.times do + Fiber.scheduler.yield + end + + state = :finished + end + end + + expect(state).to be == :finished + end + end end