diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr new file mode 100644 index 000000000000..17b746c7dc86 --- /dev/null +++ b/spec/std/fiber/execution_context/global_queue_spec.cr @@ -0,0 +1,225 @@ +require "./spec_helper" + +describe Fiber::ExecutionContext::GlobalQueue do + it "#initialize" do + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.empty?.should be_true + end + + it "#unsafe_push and #unsafe_pop" do + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f1) + q.size.should eq(1) + + q.unsafe_push(f2) + q.unsafe_push(f3) + q.size.should eq(3) + + q.unsafe_pop?.should be(f3) + q.size.should eq(2) + + q.unsafe_pop?.should be(f2) + q.unsafe_pop?.should be(f1) + q.unsafe_pop?.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + describe "#unsafe_grab?" do + it "can't grab from empty queue" do + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = Fiber::ExecutionContext::Runnables(6).new(q) + q.unsafe_grab?(runnables, 4).should be_nil + end + + it "grabs fibers" do + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 10.times.map { |i| new_fake_fiber("f#{i}") }.to_a + fibers.each { |f| q.unsafe_push(f) } + + runnables = Fiber::ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + + # returned the last enqueued fiber + fiber.should be(fibers[9]) + + # enqueued the next 2 fibers + runnables.size.should eq(2) + runnables.shift?.should be(fibers[8]) + runnables.shift?.should be(fibers[7]) + + # the remaining fibers are still there: + 6.downto(0).each do |i| + q.unsafe_pop?.should be(fibers[i]) + end + end + + it "can't grab more than available" do + f = new_fake_fiber + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = Fiber::ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + + it "clamps divisor to 1" do + f = new_fake_fiber + q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = Fiber::ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 0) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "one by one" do + fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i| + Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}")) + end + + n = 7 + increments = 15 + queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new("ONE-#{i}") do |thread| + slept = 0 + ready.done + + loop do + if fiber = queue.pop? + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! 
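            # the FiberCounter wrapper records how many times this fiber has been
            # dequeued; the fiber is re-enqueued until it has been seen exactly
            # `increments` times, which the assertions after shutdown verify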
+ queue.push(fiber) if fc.increment < increments + slept = 0 + elsif slept < 100 + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + else + break + end + end + rescue exception + Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + fibers.each_with_index do |fc, i| + queue.push(fc.@fiber) + Thread.sleep(10.nanoseconds) if i % 10 == 9 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times + fibers.each { |fc| fc.counter.should eq(increments) } + end + + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}")) + end + + queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new("BULK-#{i}") do |thread| + slept = 0 + + r = Fiber::ExecutionContext::Runnables(3).new(queue) + + batch = Fiber::List.new + size = 0 + + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } + + ready.done + + loop do + if fiber = r.shift? + execute.call(fiber) + slept = 0 + next + end + + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + list = Fiber::List.new + 5.times { |j| list.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(list)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr new file mode 100644 index 000000000000..4c4a227e374f --- /dev/null +++ b/spec/std/fiber/execution_context/runnables_spec.cr @@ -0,0 +1,264 @@ +require "./spec_helper" + +describe Fiber::ExecutionContext::Runnables do + it "#initialize" do + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(16).new(g) + r.capacity.should eq(16) + end + + describe "#push" do + it "enqueues the fiber in local queue" do + fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a + + # local enqueue + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # local dequeue + fibers.each { |f| r.shift?.should be(f) } + r.shift?.should be_nil + + # didn't push to global queue + g.pop?.should be_nil + end + + it "moves half the local queue to the global queue on overflow" do + fibers = 5.times.map { |i| new_fake_fiber("f#{i}") }.to_a + + # local enqueue + overflow + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) + fibers.each { 
|f| r.push(f) } + + # kept half of local queue + r.shift?.should be(fibers[2]) + r.shift?.should be(fibers[3]) + + # moved half of local queue + last push to global queue + g.pop?.should eq(fibers[0]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[4]) + end + + it "can always push up to capacity" do + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) + + 4.times do + # local + 4.times { r.push(new_fake_fiber) } + 2.times { r.shift? } + 2.times { r.push(new_fake_fiber) } + + # overflow (2+1 fibers are sent to global queue + 1 local) + 2.times { r.push(new_fake_fiber) } + + # clear + 3.times { r.shift? } + end + + # on each iteration we pushed 2+1 fibers to the global queue + g.size.should eq(12) + + # grab fibers back from the global queue + fiber = g.unsafe_grab?(r, divisor: 1) + fiber.should_not be_nil + r.shift?.should_not be_nil + r.shift?.should be_nil + end + end + + describe "#bulk_push" do + it "fills the local queue" do + l = Fiber::List.new + fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a + fibers.each { |f| l.push(f) } + + # local enqueue + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(l)) + + fibers.reverse_each { |f| r.shift?.should be(f) } + g.empty?.should be_true + end + + it "pushes the overflow to the global queue" do + l = Fiber::List.new + fibers = 7.times.map { |i| new_fake_fiber("f#{i}") }.to_a + fibers.each { |f| l.push(f) } + + # local enqueue + overflow + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = Fiber::ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(l)) + + # filled the local queue + r.shift?.should eq(fibers[6]) + r.shift?.should eq(fibers[5]) + r.shift?.should be(fibers[4]) + r.shift?.should be(fibers[3]) + + # moved the rest to the global queue + g.pop?.should eq(fibers[2]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[0]) + end + end + + describe "#shift?" do + # TODO: need specific tests (though we already use it in the above tests?) 
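    # One possible shape for the missing spec. This is a sketch only, not part
    # of the patch: it reuses the helpers from this file and pins down the two
    # behaviors the other specs rely on implicitly, FIFO order and `nil` on an
    # empty queue, without ever touching the global queue.
    it "dequeues in FIFO order and returns nil when empty" do
      g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
      r = Fiber::ExecutionContext::Runnables(4).new(g)

      f1 = new_fake_fiber("f1")
      f2 = new_fake_fiber("f2")
      r.push(f1)
      r.push(f2)

      # oldest fiber comes out first
      r.shift?.should be(f1)
      r.shift?.should be(f2)

      # empty local queue returns nil instead of raising
      r.shift?.should be_nil

      # shifting never falls back to the global queue
      g.empty?.should be_true
    end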
+ end + + describe "#steal_from" do + it "steals from another runnables" do + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 6.times.map { |i| new_fake_fiber("f#{i}") }.to_a + + # fill the source queue + r1 = Fiber::ExecutionContext::Runnables(16).new(g) + fibers.each { |f| r1.push(f) } + + # steal from source queue + r2 = Fiber::ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole half of the runnable fibers + fiber.should be(fibers[2]) + r2.shift?.should be(fibers[0]) + r2.shift?.should be(fibers[1]) + r2.shift?.should be_nil + + # left the other half + r1.shift?.should be(fibers[3]) + r1.shift?.should be(fibers[4]) + r1.shift?.should be(fibers[5]) + r1.shift?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals the last fiber" do + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + lone = new_fake_fiber("lone") + + # fill the source queue + r1 = Fiber::ExecutionContext::Runnables(16).new(g) + r1.push(lone) + + # steal from source queue + r2 = Fiber::ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole the fiber & local queue is still empty + fiber.should be(lone) + r2.shift?.should be_nil + + # left nothing in original queue + r1.shift?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals nothing" do + g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = Fiber::ExecutionContext::Runnables(16).new(g) + r2 = Fiber::ExecutionContext::Runnables(16).new(g) + + fiber = r2.steal_from(r1) + fiber.should be_nil + r2.shift?.should be_nil + r1.shift?.should be_nil + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "stress test" do + n = 7 + increments = 7919 + + # less fibers than space in runnables (so threads can starve) + # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) + fibers = Array(Fiber::ExecutionContext::FiberCounter).new(54) do |i| + Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}")) + end + + global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + all_runnables = Array(Fiber::ExecutionContext::Runnables(16)).new(n) do + Fiber::ExecutionContext::Runnables(16).new(global_queue) + end + + n.times do |i| + Thread.new("RUN-#{i}") do |thread| + runnables = all_runnables[i] + slept = 0 + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + runnables.push(fiber) if fc.increment < increments + } + + ready.done + + loop do + # dequeue from local queue + if fiber = runnables.shift? 
+ execute.call(fiber) + slept = 0 + next + end + + # steal from another queue + while (r = all_runnables.sample) == runnables + end + if fiber = runnables.steal_from(r) + execute.call(fiber) + slept = 0 + next + end + + # dequeue from global queue + if fiber = global_queue.grab?(runnables, n) + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches + 0.step(to: fibers.size - 1, by: 9) do |i| + list = Fiber::List.new + 9.times { |j| list.push(fibers[i + j].@fiber) } + global_queue.bulk_push(pointerof(list)) + Thread.sleep(10.nanoseconds) if i % 2 == 1 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/fiber/execution_context/spec_helper.cr b/spec/std/fiber/execution_context/spec_helper.cr new file mode 100644 index 000000000000..465005bbe5b9 --- /dev/null +++ b/spec/std/fiber/execution_context/spec_helper.cr @@ -0,0 +1,22 @@ +require "../../spec_helper" +require "../../../support/fibers" +require "crystal/system/thread_wait_group" +require "fiber/execution_context/runnables" +require "fiber/execution_context/global_queue" + +module Fiber::ExecutionContext + class FiberCounter + def initialize(@fiber : Fiber) + @counter = Atomic(Int32).new(0) + end + + # fetch and add + def increment + @counter.add(1, :relaxed) + 1 + end + + def counter + @counter.get(:relaxed) + end + end +end diff --git a/spec/std/fiber/list_spec.cr b/spec/std/fiber/list_spec.cr new file mode 100644 index 000000000000..e46a5117ae46 --- /dev/null +++ b/spec/std/fiber/list_spec.cr @@ -0,0 +1,184 @@ +require "../spec_helper" +require "../../support/fibers" +require "fiber/list" + +describe Fiber::List do + describe "#initialize" do + it "creates an empty queue" do + list = Fiber::List.new + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + list.empty?.should be_true + end + + it "creates a filled queue" do + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f1.list_next = f2 + f2.list_next = nil + + list = Fiber::List.new(f2, f1, size: 2) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + list.empty?.should be_false + end + end + + describe "#push" do + it "to head" do + list = Fiber::List.new + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + + # simulate fibers previously added to other queues + f1.list_next = f3 + f2.list_next = f1 + + # push first fiber + list.push(f1) + list.@head.should be(f1) + list.@tail.should be(f1) + f1.list_next.should be_nil + list.size.should eq(1) + + # push second fiber + list.push(f2) + list.@head.should be(f2) + list.@tail.should be(f1) + f2.list_next.should be(f1) + f1.list_next.should be_nil + list.size.should eq(2) + + # push third fiber + list.push(f3) + list.@head.should be(f3) + list.@tail.should be(f1) + f3.list_next.should be(f2) + f2.list_next.should be(f1) + f1.list_next.should be_nil + list.size.should eq(3) + end + end + + describe "#bulk_unshift" do + it "to empty queue" do + # manually create a queue + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + q1 = Fiber::List.new(f3, f1, size: 3) + + 
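      # at this point q1 is f3 (head) -> f2 -> f1 (tail) with size 3, exactly
      # the list that three consecutive push(f1), push(f2), push(f3) calls
      # would have built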
# push in bulk + q2 = Fiber::List.new(nil, nil, size: 0) + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f3) + q2.@tail.should be(f1) + q2.size.should eq(3) + end + + it "to filled queue" do + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + f4 = new_fake_fiber("f4") + f5 = new_fake_fiber("f5") + + # source queue + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + q1 = Fiber::List.new(f3, f1, size: 3) + + # destination queue + f5.list_next = f4 + f4.list_next = nil + q2 = Fiber::List.new(f5, f4, size: 2) + + # push in bulk + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f5) + q2.@tail.should be(f1) + q2.size.should eq(5) + + f5.list_next.should be(f4) + f4.list_next.should be(f3) + f3.list_next.should be(f2) + f2.list_next.should be(f1) + f1.list_next.should be(nil) + end + end + + describe "#pop" do + it "from head" do + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + list = Fiber::List.new(f3, f1, size: 3) + + # removes third element + list.pop.should be(f3) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + + # removes second element + list.pop.should be(f2) + list.@head.should be(f1) + list.@tail.should be(f1) + list.size.should eq(1) + + # removes first element + list.pop.should be(f1) + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + + # empty queue + expect_raises(IndexError) { list.pop } + list.size.should eq(0) + end + end + + describe "#pop?" do + it "from head" do + f1 = new_fake_fiber("f1") + f2 = new_fake_fiber("f2") + f3 = new_fake_fiber("f3") + f3.list_next = f2 + f2.list_next = f1 + f1.list_next = nil + list = Fiber::List.new(f3, f1, size: 3) + + # removes third element + list.pop?.should be(f3) + list.@head.should be(f2) + list.@tail.should be(f1) + list.size.should eq(2) + + # removes second element + list.pop?.should be(f2) + list.@head.should be(f1) + list.@tail.should be(f1) + list.size.should eq(1) + + # removes first element + list.pop?.should be(f1) + list.@head.should be_nil + list.@tail.should be_nil + list.size.should eq(0) + + # empty queue + list.pop?.should be_nil + list.size.should eq(0) + end + end +end diff --git a/spec/support/fibers.cr b/spec/support/fibers.cr index 1b095a4422d6..424987677803 100644 --- a/spec/support/fibers.cr +++ b/spec/support/fibers.cr @@ -14,3 +14,13 @@ def wait_until_finished(f : Fiber, timeout = 5.seconds) raise "Fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout end end + +# Fake stack for `makecontext` to have somewhere to write in #initialize. We +# don't actually run the fiber. The worst case is windows with ~300 bytes (with +# shadow space and alignment taken into account). We allocate more to be safe. 
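#
# For illustration, a fake fiber behaves like any other suspended fiber from
# the queues' point of view (sketch, using only helpers defined in this patch):
#
#   f = new_fake_fiber("example")
#   f.name      # => "example"
#   f.list_next # => nil, until the fiber is linked into a Fiber::List
#
# The fiber is never resumed, so the shared 512-byte stack is never executed.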
+FAKE_FIBER_STACK = GC.malloc(512) + +def new_fake_fiber(name = nil) + stack = Fiber::Stack.new(FAKE_FIBER_STACK, FAKE_FIBER_STACK + 512) + Fiber.new(name, stack) { } +end diff --git a/src/crystal/system/thread_wait_group.cr b/src/crystal/system/thread_wait_group.cr new file mode 100644 index 000000000000..3494e1e7f569 --- /dev/null +++ b/src/crystal/system/thread_wait_group.cr @@ -0,0 +1,20 @@ +# :nodoc: +class Thread::WaitGroup + def initialize(@count : Int32) + @mutex = Thread::Mutex.new + @condition = Thread::ConditionVariable.new + end + + def done : Nil + @mutex.synchronize do + @count -= 1 + @condition.broadcast if @count == 0 + end + end + + def wait : Nil + @mutex.synchronize do + @condition.wait(@mutex) unless @count == 0 + end + end +end diff --git a/src/fiber.cr b/src/fiber.cr index a7282047e165..d170f84a0bd5 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -75,6 +75,9 @@ class Fiber # :nodoc: property previous : Fiber? + # :nodoc: + property list_next : Fiber? + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) diff --git a/src/fiber/execution_context/global_queue.cr b/src/fiber/execution_context/global_queue.cr new file mode 100644 index 000000000000..9462e2a26dad --- /dev/null +++ b/src/fiber/execution_context/global_queue.cr @@ -0,0 +1,106 @@ +# The queue is a port of Go's `globrunq*` functions, distributed under a +# BSD-like license: +# + +require "../list" +require "./runnables" + +module Fiber::ExecutionContext + # :nodoc: + # + # Global queue of runnable fibers. + # Unbounded. + # Shared by all schedulers in an execution context. + # + # Basically a `Fiber::List` protected by a `Thread::Mutex`, at the exception of + # the `#grab?` method that tries to grab 1/Nth of the queue at once. + class GlobalQueue + def initialize(@mutex : Thread::Mutex) + @list = Fiber::List.new + end + + # Grabs the lock and enqueues a runnable fiber on the global runnable queue. + def push(fiber : Fiber) : Nil + @mutex.synchronize { unsafe_push(fiber) } + end + + # Enqueues a runnable fiber on the global runnable queue. Assumes the lock + # is currently held. + def unsafe_push(fiber : Fiber) : Nil + @list.push(fiber) + end + + # Grabs the lock and puts a runnable fiber on the global runnable queue. + def bulk_push(list : Fiber::List*) : Nil + @mutex.synchronize { unsafe_bulk_push(list) } + end + + # Puts a runnable fiber on the global runnable queue. Assumes the lock is + # currently held. + def unsafe_bulk_push(list : Fiber::List*) : Nil + @list.bulk_unshift(list) + end + + # Grabs the lock and dequeues one runnable fiber from the global runnable + # queue. + def pop? : Fiber? + @mutex.synchronize { unsafe_pop? } + end + + # Dequeues one runnable fiber from the global runnable queue. Assumes the + # lock is currently held. + def unsafe_pop? : Fiber? + @list.pop? + end + + # Grabs the lock then tries to grab a batch of fibers from the global + # runnable queue. Returns the next runnable fiber or `nil` if the queue was + # empty. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def grab?(runnables : Runnables, divisor : Int32) : Fiber? + @mutex.synchronize { unsafe_grab?(runnables, divisor) } + end + + # Try to grab a batch of fibers from the global runnable queue. Returns the + # next runnable fiber or `nil` if the queue was empty. Assumes the lock is + # currently held. 
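  # For illustration (numbers taken from the "grabs fibers" spec above): with
  # 10 fibers in the queue, `divisor = 4` and a destination `Runnables(6)`,
  # the batch size is `min(10, 10 // 4 + 1, 6 // 2) = 3`: one fiber is
  # returned directly and the remaining two are pushed to *runnables*.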
+ # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber? + # ported from Go: globrunqget + return if @list.empty? + + divisor = 1 if divisor < 1 + size = @list.size + + n = { + size, # can't grab more than available + size // divisor + 1, # divide + try to take at least 1 fiber + runnables.capacity // 2, # refill half the destination queue + }.min + + fiber = @list.pop? + + # OPTIMIZE: list = @list.split(n - 1) then `runnables.push(pointerof(list))` (?) + (n - 1).times do + break unless f = @list.pop? + runnables.push(f) + end + + fiber + end + + @[AlwaysInline] + def empty? : Bool + @list.empty? + end + + @[AlwaysInline] + def size : Int32 + @list.size + end + end +end diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr new file mode 100644 index 000000000000..64488966ec5f --- /dev/null +++ b/src/fiber/execution_context/runnables.cr @@ -0,0 +1,209 @@ +# The queue is a port of Go's `runq*` functions, distributed under a BSD-like +# license: +# +# The queue derives from the Chase-Lev lock-free queue with adaptations: +# +# - single ring buffer (per scheduler); +# - on overflow: bulk push half the ring to `GlobalQueue`; +# - on empty: bulk grab up to half the ring from `GlobalQueue`; +# - bulk push operation; + +require "../list" +require "./global_queue" + +module Fiber::ExecutionContext + # :nodoc: + # + # Local queue of runnable fibers for schedulers. + # Bounded. + # First-in, first-out semantics (FIFO). + # Single producer, multiple consumers thread safety. + # + # Private to an execution context scheduler, except for stealing methods that + # can be called from any thread in the execution context. + class Runnables(N) + def initialize(@global_queue : GlobalQueue) + # head is an index to the buffer where the next fiber to dequeue is. + # + # tail is an index to the buffer where the next fiber to enqueue will be + # (on the next push). + # + # head is always behind tail (not empty) or equal (empty) but never after + # tail (the queue would have a negative size => bug). + @head = Atomic(UInt32).new(0) + @tail = Atomic(UInt32).new(0) + @buffer = uninitialized Fiber[N] + end + + @[AlwaysInline] + def capacity : Int32 + N + end + + # Tries to push fiber on the local runnable queue. If the run queue is full, + # pushes fiber on the global queue, which will grab the global lock. + # + # Executed only by the owner.
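  # For example (mirroring the "#push" overflow spec above): with
  # `Runnables(4)` already holding f0..f3, pushing f4 takes the slow path,
  # which moves f0, f1 and the new f4 to the global queue and keeps f2 and f3
  # in the local buffer.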
+ def push(fiber : Fiber) : Nil + # ported from Go: runqput + loop do + head = @head.get(:acquire) # sync with consumers + tail = @tail.get(:relaxed) + + if (tail &- head) < N + # put fiber to local queue + @buffer.to_unsafe[tail % N] = fiber + + # make the fiber available for consumption + @tail.set(tail &+ 1, :release) + return + end + + if push_slow(fiber, head, tail) + return + end + + # failed to advance head (another scheduler stole fibers), + # the queue isn't full, now the push above must succeed + end + end + + private def push_slow(fiber : Fiber, head : UInt32, tail : UInt32) : Bool + # ported from Go: runqputslow + n = (tail &- head) // 2 + raise "BUG: queue is not full" if n != N // 2 + + # first, try to grab half of the fibers from local queue + batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile + n.times do |i| + batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N] + end + _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return false unless success + + # append fiber to the batch + batch.to_unsafe[n] = fiber + + # link the fibers + n.times do |i| + batch.to_unsafe[i].list_next = batch.to_unsafe[i &+ 1] + end + list = Fiber::List.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) + + # now put the batch on global queue (grabs the global lock) + @global_queue.bulk_push(pointerof(list)) + + true + end + + # Tries to enqueue all the fibers in *list* into the local queue. If the + # local queue is full, the overflow will be pushed to the global queue; in + # that case this will temporarily acquire the global queue lock. + # + # Executed only by the owner. + def bulk_push(list : Fiber::List*) : Nil + # ported from Go: runqputbatch + head = @head.get(:acquire) # sync with other consumers + tail = @tail.get(:relaxed) + + while !list.value.empty? && (tail &- head) < N + fiber = list.value.pop + @buffer.to_unsafe[tail % N] = fiber + tail &+= 1 + end + + # make the fibers available for consumption + @tail.set(tail, :release) + + # put any overflow on global queue + @global_queue.bulk_push(list) unless list.value.empty? + end + + # Dequeues the next runnable fiber from the local queue. + # + # Executed only by the owner. + def shift? : Fiber? + # ported from Go: runqget + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:relaxed) + return if tail == head + + fiber = @buffer.to_unsafe[head % N] + head, success = @head.compare_and_set(head, head &+ 1, :acquire_release, :acquire) + return fiber if success + end + end + + # Steals half the fibers from the local queue of `src` and puts them onto + # the local queue. Returns one of the stolen fibers, or `nil` on failure. + # + # Executed only by the owner (when the local queue is empty). + def steal_from(src : Runnables(N)) : Fiber? + # ported from Go: runqsteal + + tail = @tail.get(:relaxed) + n = src.grab(@buffer.to_unsafe, tail) + return if n == 0 + + # 'dequeue' last fiber from @buffer + n &-= 1 + fiber = @buffer.to_unsafe[(tail &+ n) % N] + return fiber if n == 0 + + head = @head.get(:acquire) # sync with consumers + if tail &- head &+ n >= N + raise "BUG: local queue overflow" + end + + # make the fibers available for consumption + @tail.set(tail &+ n, :release) + + fiber + end + + # Grabs a batch of fibers from local queue into `buffer` of size N (normally + # the ring buffer of another `Runnables`) starting at `buffer_head`. Returns + # number of grabbed fibers. + # + # Can be executed by any scheduler. 
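  # For illustration (matching the "#steal_from" spec above): grabbing from a
  # `Runnables(16)` that holds 6 fibers copies `6 - 6 // 2 = 3` of them; the
  # stealing queue returns the last copied fiber from `#steal_from` and keeps
  # the other two, leaving three behind in the victim queue.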
+ protected def grab(buffer : Fiber*, buffer_head : UInt32) : UInt32 + # ported from Go: runqgrab + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:acquire) # sync with the producer + + n = tail &- head + n -= n // 2 + return 0_u32 if n == 0 # queue is empty + + if n > N // 2 + # read inconsistent head and tail + head = @head.get(:acquire) + next + end + + n.times do |i| + fiber = @buffer.to_unsafe[(head &+ i) % N] + buffer[(buffer_head &+ i) % N] = fiber + end + + # try to mark the fibers as consumed + head, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return n if success + end + end + + @[AlwaysInline] + def empty? : Bool + @head.get(:relaxed) == @tail.get(:relaxed) + end + + @[AlwaysInline] + def size : UInt32 + @tail.get(:relaxed) &- @head.get(:relaxed) + end + end +end diff --git a/src/fiber/list.cr b/src/fiber/list.cr new file mode 100644 index 000000000000..4918d0c0b92f --- /dev/null +++ b/src/fiber/list.cr @@ -0,0 +1,89 @@ +# The list is modeled after Go's `gQueue`, distributed under a BSD-like +# license: + +class Fiber + # :nodoc: + # + # Singly-linked list of `Fiber`. + # Last-in, first-out (LIFO) semantics. + # A fiber can only exist within a single `List` at any time. + # + # This list is simpler than `Crystal::PointerLinkedList` which is a doubly + # linked list. It's meant to maintain a queue of runnable fibers, or to + # quickly collect an arbitrary number of fibers; situations where we don't + # need arbitrary deletions from anywhere in the list. + # + # Thread unsafe! An external lock is required for concurrent accesses. + struct List + getter size : Int32 + + def initialize(@head : Fiber? = nil, @tail : Fiber? = nil, @size = 0) + end + + # Appends *fiber* to the head of the list. + def push(fiber : Fiber) : Nil + fiber.list_next = @head + @head = fiber + @tail = fiber if @tail.nil? + @size += 1 + end + + # Appends all the fibers from *other* to the tail of the list. + def bulk_unshift(other : List*) : Nil + return unless last = other.value.@tail + last.list_next = nil + + if tail = @tail + tail.list_next = other.value.@head + else + @head = other.value.@head + end + @tail = last + + @size += other.value.size + end + + # Removes a fiber from the head of the list. Raises `IndexError` when + # empty. + @[AlwaysInline] + def pop : Fiber + pop { raise IndexError.new } + end + + # Removes a fiber from the head of the list. Returns `nil` when empty. + @[AlwaysInline] + def pop? : Fiber? + pop { nil } + end + + private def pop(&) + if fiber = @head + @head = fiber.list_next + @tail = nil if @head.nil? + @size -= 1 + fiber.list_next = nil + fiber + else + yield + end + end + + @[AlwaysInline] + def empty? : Bool + @head == nil + end + + def clear : Nil + @size = 0 + @head = @tail = nil + end + + def each(&) : Nil + cursor = @head + while cursor + yield cursor + cursor = cursor.list_next + end + end + end +end
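# A minimal usage sketch (not part of the patch) for the Thread::WaitGroup
# helper added in src/crystal/system/thread_wait_group.cr: the calling thread
# blocks in #wait until every worker thread has called #done. Thread,
# Thread::Mutex and Thread::ConditionVariable are Crystal's internal threading
# primitives, used here the same way the specs above use them.

require "crystal/system/thread_wait_group"

wg = Thread::WaitGroup.new(4)

4.times do |i|
  Thread.new("worker-#{i}") do
    # ... do some work on the worker thread ...
    wg.done # decrement the counter; broadcasts when it reaches zero
  end
end

wg.wait # returns only after all four workers have called #done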