Skip to content

Commit

Permalink
Add Scheduler#load and Async::Idler for scheduling tasks when idle.
Browse files Browse the repository at this point in the history
`Async::Idler` introduces a `maximum_load` and functions like a
semaphore, in that it will schedule tasks until the maximum load is
reached.
  • Loading branch information
ioquatix committed Mar 4, 2024
1 parent 13d3f47 commit 6be01bc
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 1 deletion.
2 changes: 1 addition & 1 deletion async.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ Gem::Specification.new do |spec|

spec.add_dependency "console", "~> 1.10"
spec.add_dependency "fiber-annotation"
spec.add_dependency "io-event", "~> 1.1"
spec.add_dependency "io-event", "~> 1.5"
spec.add_dependency "timers", "~> 4.1"
end
36 changes: 36 additions & 0 deletions lib/async/idler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

module Async
class Idler
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
@maximum_load = maximum_load
@backoff = backoff
@parent = parent
end

def async(*arguments, parent: (@parent or Task.current), **options, &block)
wait

# It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources.
parent.async(*arguments, **options, &block)
end

def wait
scheduler = Fiber.scheduler
backoff = nil

while scheduler.load > @maximum_load
if backoff
sleep(backoff)
backoff *= 2.0
else
scheduler.yield
backoff = @backoff
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil)

@blocked = 0

@busy_time = 0.0
@idle_time = 1.0

@timers = ::Timers::Group.new
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
def load
total_time = @busy_time + @idle_time

# If the total time is zero, then the load is zero:
return 0.0 if total_time.zero?

# We normalize to a 1 second window:
if total_time > 1.0
ratio = 1.0 / total_time
@busy_time *= ratio
@idle_time *= ratio

# We don't need to divide here as we've already normalised it to a 1s window:
return @busy_time
else
return @busy_time / total_time
end
end

def scheduler_close
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
Expand Down Expand Up @@ -267,6 +291,8 @@ def run_once(timeout = nil)
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
private def run_once!(timeout = 0)
start_time = Async::Clock.now

interval = @timers.wait_interval

# If there is no interval to wait (thus no timers), and no tasks, we could be done:
Expand All @@ -288,6 +314,15 @@ def run_once(timeout = nil)

@timers.fire

# Compute load:
end_time = Async::Clock.now
total_duration = end_time - start_time
idle_duration = @selector.idle_duration
busy_duration = total_duration - idle_duration

@busy_time += busy_duration
@idle_time += idle_duration

# The reactor still has work to do:
return true
end
Expand Down
Empty file added test.rb
Empty file.

0 comments on commit 6be01bc

Please sign in to comment.