Skip to content

Commit

Permalink
Rework implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Nov 23, 2024
1 parent 0c337b8 commit 83f2160
Showing 1 changed file with 98 additions and 65 deletions.
163 changes: 98 additions & 65 deletions lib/async/worker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,108 +10,141 @@ module Async
#
# @private
class WorkerPool
# A handle to the work being done.
class Handle
# Create a new handle.
#
# @parameter scheduler [Async::Scheduler] The scheduler that is managing the fiber.
# @parameter fiber [Async::Fiber] The fiber that is waiting for the work to complete.
# @parameter work [Proc] The work to be done.
def initialize(scheduler, fiber, work)
@scheduler = scheduler
@fiber = fiber
class Promise
def initialize(work)
@work = work

# The thread, if any, that is doing the work:
@state = :pending
@value = nil
@guard = ::Mutex.new
@condition = ::ConditionVariable.new
@thread = nil

@result = nil
@error = nil
end

# Call the work and notify the scheduler when it is done.
def call
@thread = ::Thread.current
work = nil

if @work
@result = @work.call
@guard.synchronize do
@thread = ::Thread.current

return unless work = @work
end
rescue => @error
ensure
@thread = nil

if @fiber
@scheduler.unblock(self, @fiber)
resolve(work.call)
rescue Exception => error
reject(error)
end

private def resolve(value)
@guard.synchronize do
@work = nil
@thread = nil
@value = value
@state = :resolved
@condition.broadcast
end
end

private def reject(error)
@guard.synchronize do
@work = nil
@thread = nil
@value = error
@state = :failed
@condition.broadcast
end
end

def cancel
@guard.synchronize do
@work = nil
@state = :cancelled
@thread&.raise(Interrupt)
end
end

# Wait for the work to complete.
def wait
@scheduler.block(self, nil)

if @error
raise @error
else
return @result
@guard.synchronize do
while @state == :pending
@condition.wait(@guard)
end

if @state == :failed
raise @value
else
return @value
end
end
end
end

# A handle to the work being done.
class Worker
def initialize
@thread = ::Thread.new(&method(:run))
@work = ::Thread::Queue.new
end

def run
while work = @work.pop
work.call
end
end

def close
if thread = @thread
@thread = nil
thread.kill
end
end

# Cancel the work.
def cancel!
@work = nil
@fiber = nil # Don't call unblock.
@thread&.raise(Interrupt)
# Call the work and notify the scheduler when it is done.
def call(work)
promise = Promise.new(work)

@work.push(promise)

promise.wait
end
end

# Create a new work pool.
#
# @parameter size [Integer] The number of threads to use.
def initialize(size: Etc.nprocessors)
@queue = ::Thread::Queue.new
@ready = ::Thread::Queue.new

@threads = size.times.map do
::Thread.new(&method(:run))
size.times do
@ready.push(Worker.new)
end
end

# Close the work pool. Kills all outstanding work.
def close
@queue.close

while thread = @threads.pop
thread.kill
if ready = @ready
@ready = nil
ready.close

while worker = ready.pop
worker.close
end
end
end

# Offload work to a thread.
#
# @parameter work [Proc] The work to be done.
def call(work)
if @threads.empty?
raise RuntimeError, "No threads available!"
end

handle = Handle.new(::Fiber.scheduler, ::Fiber.current, work)

begin
@queue << handle

result = handle.wait
handle = nil
if ready = @ready
worker = ready.pop

return result
ensure
handle&.cancel!
end
end

private def run
while job = @queue.pop
job.call
begin
worker.call(work)
ensure
ready.push(worker)
end
else
raise RuntimeError, "No worker available!"
end
rescue Interrupt
# Exiting.
end
end
end

0 comments on commit 83f2160

Please sign in to comment.