Skip to content

Commit

Permalink
Allow transient tasks to exit completely. (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Aug 4, 2024
1 parent 91812e1 commit b7ce9de
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 34 deletions.
3 changes: 2 additions & 1 deletion lib/async/idler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def wait
backoff = nil

while true
load = scheduler.load
load = scheduler.load

break if load < @maximum_load

if backoff
Expand Down
71 changes: 39 additions & 32 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def load
return @busy_time / total_time
end
end

# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
Expand All @@ -84,34 +84,30 @@ def scheduler_close
self.close
end

# Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don't want to be interrupted while cleaning up.
def terminate
Thread.handle_interrupt(::Interrupt => :never) do
super
private def shutdown!
# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
self.stop

self.run_loop do
unless @children.nil?
run_once!
end
end
end

# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
until self.terminate
self.run_once!
end
self.shutdown!

Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0

# We depend on GVL for consistency:
# @guard.synchronize do

ensure
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@selector = nil

selector&.close

# end

consume
end

Expand Down Expand Up @@ -313,7 +309,7 @@ def run_once(timeout = nil)
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
private def run_once!(timeout = 0)
private def run_once!(timeout = nil)
start_time = Async::Clock.now

interval = @timers.wait_interval
Expand Down Expand Up @@ -365,38 +361,49 @@ def run_once(timeout = nil)
return false
end

# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
def run(...)
Kernel.raise ClosedError if @selector.nil?

initial_task = self.async(...) if block_given?
# Stop all children, including transient children, ignoring any signals.
def stop
Thread.handle_interrupt(::SignalException => :never) do
@children&.each do |child|
child.stop
end
end
end

private def run_loop(&block)
interrupt = nil

begin
# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
Thread.handle_interrupt(::SignalException => :never) do
while true
# If we are interrupted, we need to exit:
break if self.interrupted?

until self.interrupted?
# If we are finished, we need to exit:
break unless self.run_once
break unless yield
end
end
rescue Interrupt => interrupt
Thread.handle_interrupt(::SignalException => :never) do
self.stop
end
self.stop

retry
end

# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
Kernel.raise interrupt if interrupt
Kernel.raise(interrupt) if interrupt
end

# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
def run(...)
Kernel.raise ClosedError if @selector.nil?

initial_task = self.async(...) if block_given?

self.run_loop do
unless self.finished?
run_once!
end
end

return initial_task
ensure
Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end

# Start an asynchronous task within the specified reactor. The task will be
Expand Down
4 changes: 3 additions & 1 deletion test/async/idler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
let(:idler) {subject.new(0.5)}

it 'can schedule tasks up to the desired load' do
expect(Fiber.scheduler.load).to be < 0.1

# Generate the load:
Async do
while true
Expand All @@ -24,7 +26,7 @@
end
end

# This test must be longer than the test window...
# This test must be longer than the idle calculation window (1s)...
sleep 1.1

# Verify that the load is within the desired range:
Expand Down
4 changes: 4 additions & 0 deletions test/async/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@

expect(reactor.run_once).to be == false
expect(reactor).to be(:finished?)

# Kick the task into the ensure block:
reactor.stop

reactor.close
end

Expand Down
24 changes: 24 additions & 0 deletions test/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,28 @@
end.to raise_exception(RuntimeError, message: be =~ /Closing scheduler with blocked operations/)
end
end

with "transient tasks" do
it "exits gracefully" do
state = nil

Sync do |task|
task.async(transient: true) do
state = :sleeping
# Never come back:
Fiber.scheduler.transfer
ensure
state = :ensure
# Yoyo but eventually exit:
5.times do
Fiber.scheduler.yield
end

state = :finished
end
end

expect(state).to be == :finished
end
end
end

0 comments on commit b7ce9de

Please sign in to comment.