diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 404e28d8..29242af5 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -7,6 +7,7 @@ require_relative "clock" require_relative "task" +require_relative "worker_pool" require "io/event" @@ -49,6 +50,7 @@ def initialize(parent = nil, selector: nil) @idle_time = 0.0 @timers = ::IO::Event::Timers.new + @worker_pool = WorkerPool.new end # Compute the scheduler load according to the busy and idle times that are updated by the run loop. @@ -112,6 +114,11 @@ def close selector&.close + worker_pool = @worker_pool + @worker_pool = nil + + worker_pool&.close + consume end @@ -169,8 +176,11 @@ def resume(fiber, *arguments) # Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue. # - + # @public Since *Async v2*. # @asynchronous May only be called on same thread as fiber scheduler. + # + # @parameter blocker [Object] The object that is blocking the fiber. + # @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely. def block(blocker, timeout) # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})" fiber = Fiber.current @@ -346,15 +356,7 @@ def process_wait(pid, flags) # @parameter work [Proc] The work to execute on a background thread. # @returns [Object] The result of the work. def blocking_operation_wait(work) - thread = Thread.new(&work) - - result = thread.join - - thread = nil - - return result - ensure - thread&.kill + @worker_pool.call(work) end # Run one iteration of the event loop. diff --git a/lib/async/worker_pool.rb b/lib/async/worker_pool.rb new file mode 100644 index 00000000..1007e661 --- /dev/null +++ b/lib/async/worker_pool.rb @@ -0,0 +1,150 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "etc" + +module Async + # A simple work pool that offloads work to a background thread. + # + # @private + class WorkerPool + class Promise + def initialize(work) + @work = work + @state = :pending + @value = nil + @guard = ::Mutex.new + @condition = ::ConditionVariable.new + @thread = nil + end + + def call + work = nil + + @guard.synchronize do + @thread = ::Thread.current + + return unless work = @work + end + + resolve(work.call) + rescue Exception => error + reject(error) + end + + private def resolve(value) + @guard.synchronize do + @work = nil + @thread = nil + @value = value + @state = :resolved + @condition.broadcast + end + end + + private def reject(error) + @guard.synchronize do + @work = nil + @thread = nil + @value = error + @state = :failed + @condition.broadcast + end + end + + def cancel + @guard.synchronize do + @work = nil + @state = :cancelled + @thread&.raise(Interrupt) + end + end + + def wait + @guard.synchronize do + while @state == :pending + @condition.wait(@guard) + end + + if @state == :failed + raise @value + else + return @value + end + end + end + end + + # A handle to the work being done. + class Worker + def initialize + @work = ::Thread::Queue.new + @thread = ::Thread.new(&method(:run)) + end + + def run + while work = @work.pop + work.call + end + end + + def close + if thread = @thread + @thread = nil + thread.kill + end + end + + # Call the work and notify the scheduler when it is done. + def call(work) + promise = Promise.new(work) + + @work.push(promise) + + promise.wait + end + end + + # Create a new work pool. + # + # @parameter size [Integer] The number of threads to use. + def initialize(size: Etc.nprocessors) + @ready = ::Thread::Queue.new + + size.times do + @ready.push(Worker.new) + end + end + + # Close the work pool. Kills all outstanding work. + def close + if ready = @ready + @ready = nil + ready.close + + while worker = ready.pop + worker.close + end + end + end + + # Offload work to a thread. + # + # @parameter work [Proc] The work to be done. + def call(work) + if ready = @ready + worker = ready.pop + + begin + worker.call(work) + ensure + ready.push(worker) + end + else + raise RuntimeError, "No worker available!" + end + end + end +end diff --git a/test/async/worker_pool.rb b/test/async/worker_pool.rb new file mode 100644 index 00000000..863af21e --- /dev/null +++ b/test/async/worker_pool.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2022-2024, by Samuel Williams. +# Copyright, 2024, by Patrik Wenger. + +require "async/worker_pool" +require "sus/fixtures/async" + +describe Async::WorkerPool do + include Sus::Fixtures::Async::ReactorContext + + let(:worker_pool) {subject.new(size: 1)} + + it "offloads work to a thread" do + result = worker_pool.call(proc do + Thread.current + end) + + expect(result).not.to be == Thread.current + end + + it "gracefully handles errors" do + expect do + worker_pool.call(proc do + raise ArgumentError, "Oops!" + end) + end.to raise_exception(ArgumentError, message: be == "Oops!") + end + + with "#close" do + it "can be closed" do + worker_pool.close + + expect do + worker_pool.call(proc{}) + end.to raise_exception(RuntimeError) + end + end +end