From 41a0c92ca74760f8759c9da74264fd79c47c733d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 27 Nov 2024 20:26:41 +1300 Subject: [PATCH] Make the worker pool an optional feature that defaults to off. --- .github/workflows/test-worker-pool.yaml | 38 +++++++++++++++++++++++++ lib/async/scheduler.rb | 27 +++++++++--------- lib/async/worker_pool.rb | 13 +++++++++ 3 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 .github/workflows/test-worker-pool.yaml diff --git a/.github/workflows/test-worker-pool.yaml b/.github/workflows/test-worker-pool.yaml new file mode 100644 index 00000000..ae268ea1 --- /dev/null +++ b/.github/workflows/test-worker-pool.yaml @@ -0,0 +1,38 @@ +name: Test + +on: [push, pull_request] + +permissions: + contents: read + +env: + CONSOLE_OUTPUT: XTerm + ASYNC_SCHEDULER_DEFAULT_WORKER_POOL: true + +jobs: + test: + name: ${{matrix.ruby}} on ${{matrix.os}} / ASYNC_SCHEDULER_DEFAULT_WORKER_POOL=true + runs-on: ${{matrix.os}}-latest + + strategy: + matrix: + os: + - ubuntu + + ruby: + - head + + steps: + - uses: actions/checkout@v3 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: ${{matrix.ruby}} + bundler-cache: true + + - name: Run tests + timeout-minutes: 10 + run: bundle exec bake test + + - name: Run external tests + timeout-minutes: 10 + run: bundle exec bake test:external diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 29242af5..56c9620b 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -17,6 +17,10 @@ module Async # Handles scheduling of fibers. Implements the fiber scheduler interface. class Scheduler < Node + DEFAULT_WORKER_POOL = ENV.fetch("ASYNC_SCHEDULER_DEFAULT_WORKER_POOL", nil).then do |value| + value == "true" ? true : nil + end + # Raised when an operation is attempted on a closed scheduler. class ClosedError < RuntimeError # Create a new error. @@ -38,7 +42,7 @@ def self.supported? # @public Since *Async v1*. # @parameter parent [Node | Nil] The parent node to use for task hierarchy. # @parameter selector [IO::Event::Selector] The selector to use for event handling. - def initialize(parent = nil, selector: nil) + def initialize(parent = nil, selector: nil, worker_pool: DEFAULT_WORKER_POOL) super(parent) @selector = selector || ::IO::Event::Selector.new(Fiber.current) @@ -50,7 +54,15 @@ def initialize(parent = nil, selector: nil) @idle_time = 0.0 @timers = ::IO::Event::Timers.new - @worker_pool = WorkerPool.new + if worker_pool == true + @worker_pool = WorkerPool.new + else + @worker_pool = worker_pool + end + + if @worker_pool + self.singleton_class.prepend(WorkerPool::BlockingOperationWait) + end end # Compute the scheduler load according to the busy and idle times that are updated by the run loop. @@ -348,17 +360,6 @@ def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end - # Wait for the given work to be executed. - # - # @public Since *Async v2.19* and *Ruby v3.4*. - # @asynchronous May be non-blocking. - # - # @parameter work [Proc] The work to execute on a background thread. - # @returns [Object] The result of the work. - def blocking_operation_wait(work) - @worker_pool.call(work) - end - # Run one iteration of the event loop. # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. diff --git a/lib/async/worker_pool.rb b/lib/async/worker_pool.rb index 88420b76..62e9994b 100644 --- a/lib/async/worker_pool.rb +++ b/lib/async/worker_pool.rb @@ -10,6 +10,19 @@ module Async # # @private class WorkerPool + module BlockingOperationWait + # Wait for the given work to be executed. + # + # @public Since *Async v2.19* and *Ruby v3.4*. + # @asynchronous May be non-blocking. + # + # @parameter work [Proc] The work to execute on a background thread. + # @returns [Object] The result of the work. + def blocking_operation_wait(work) + @worker_pool.call(work) + end + end + class Promise def initialize(work) @work = work