Skip to content

Commit

Permalink
Work pool for (more) efficient handling of blocking_operation_wait.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Nov 22, 2024
1 parent 34464e5 commit 9731e3e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 10 deletions.
23 changes: 13 additions & 10 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

require_relative "clock"
require_relative "task"
require_relative "work_pool"

require "io/event"

Expand Down Expand Up @@ -49,6 +50,7 @@ def initialize(parent = nil, selector: nil)
@idle_time = 0.0

@timers = ::IO::Event::Timers.new
@work_pool = WorkPool.new
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
Expand Down Expand Up @@ -112,6 +114,11 @@ def close

selector&.close

work_pool = @work_pool
@work_pool = nil

work_pool&.close

consume
end

Expand Down Expand Up @@ -169,8 +176,11 @@ def resume(fiber, *arguments)

# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
#

# @public Since *Async v2*.
# @asynchronous May only be called on same thread as fiber scheduler.
#
# @parameter blocker [Object] The object that is blocking the fiber.
# @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely.
def block(blocker, timeout)
# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
fiber = Fiber.current
Expand Down Expand Up @@ -346,15 +356,8 @@ def process_wait(pid, flags)
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
thread = Thread.new(&work)

result = thread.join

thread = nil

return result
ensure
thread&.kill
# Fiber.blocking{$stderr.puts "Doing work: #{caller(3, 1)}"}
@work_pool.call(work)
end

# Run one iteration of the event loop.
Expand Down
91 changes: 91 additions & 0 deletions lib/async/work_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require 'etc'

module Async
class WorkPool
class Handle
def initialize(scheduler, fiber, work)
@scheduler = scheduler
@fiber = fiber
@work = work
@thread = nil

@finished = false

@result = nil
@error = nil
end

def call
@thread = ::Thread.current

if @work
@result = @work.call
end
rescue => @error
ensure
@thread = nil
@finished = true
@scheduler.unblock(self, @fiber)
end

def wait
@scheduler.block(self, nil)

if @error
raise @error
else
return @result
end
end

def cancel!
@work = nil
@thread&.raise(Interrupt)
end
end

def initialize(size: Etc.nprocessors)
@queue = ::Thread::Queue.new

@threads = size.times.map do
::Thread.new(&method(:run))
end
end

def close
@queue.close

while thread = @threads.pop
thread.kill
end
end

def call(work)
handle = Handle.new(::Fiber.scheduler, ::Fiber.current, work)

begin
@queue << handle

result = handle.wait
handle = nil

return result
ensure
handle&.cancel!
end
end

private def run
while job = @queue.pop
job.call
end
rescue Interrupt
# Exiting.
end
end
end

0 comments on commit 9731e3e

Please sign in to comment.