diff --git a/benchmark/respond_to_vs_hash.rb b/benchmark/respond_to_vs_hash.rb new file mode 100644 index 00000000..f26c177b --- /dev/null +++ b/benchmark/respond_to_vs_hash.rb @@ -0,0 +1,108 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Copyright, 2019, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require 'benchmark/ips' + +GC.disable + +class FakeBody + def each + yield "Fake Body" + end +end + +object = FakeBody.new + +class Stream + def write(chunk) + end +end + +stream = Stream.new + +proc = Proc.new do |stream| + stream.write "Fake Body" +end + +hash = Hash.new +10.times do |i| + hash['fake-header-i'] = i +end + +hijack_hash = hash.dup +hijack_hash['rack.hijack'] = proc + +Benchmark.ips do |benchmark| + benchmark.time = 1 + benchmark.warmup = 1 + + benchmark.report("object") do |count| + while count > 0 + if object.respond_to?(:call) + object.call(stream) + else + object.each{|x| stream.write(x)} + end + + count -= 1 + end + end + + benchmark.report("proc") do |count| + while count > 0 + if proc.respond_to?(:call) + proc.call(stream) + else + object.each{|x| stream.write(x)} + end + + count -= 1 + end + end + + benchmark.report("hash") do |count| + while count > 0 + if hijack = hash['rack.hijack'] + hijack.call(stream) + else + object.each{|x| stream.write(x)} + end + + count -= 1 + end + end + + benchmark.report("hijack-hash") do |count| + while count > 0 + if hijack = hijack_hash['rack.hijack'] + hijack.call(stream) + else + object.each{|x| stream.write(x)} + end + + count -= 1 + end + end + + benchmark.compare! +end diff --git a/examples/duration.rb b/examples/duration.rb new file mode 100644 index 00000000..83f0685b --- /dev/null +++ b/examples/duration.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +class Duration + def initialize(value) + @value = value + end + + attr :value + + SUFFIX = ["s", "ms", "μs", "ns"] + + def zero? + @value.zero? + end + + def to_s + return "0" if self.zero? + + unit = 0 + value = @value.to_f + + while value < 1.0 && unit < SUFFIX.size + value = value * 1000.0 + unit = unit + 1 + end + + return "#{value.round(2)}#{SUFFIX[unit]}" + end + + def / factor + self.class.new(@value / factor) + end + + def self.time + t = Process.times + return t.stime + t.utime + t.cstime + t.cutime + end + + def self.measure + t = self.time + + yield + + return self.new(self.time - t) + end +end diff --git a/examples/scheduler/config.ru b/examples/scheduler/config.ru new file mode 100644 index 00000000..11387666 --- /dev/null +++ b/examples/scheduler/config.ru @@ -0,0 +1,10 @@ + +require 'net/http' + +run ->(env) do + response = Net::HTTP.get(URI "https://www.google.com/search?q=ruby") + + count = response.scan("ruby").count + + [200, [], ["Found ruby #{count} times!"]] +end diff --git a/examples/scheduler/http.rb b/examples/scheduler/http.rb new file mode 100755 index 00000000..2d48645c --- /dev/null +++ b/examples/scheduler/http.rb @@ -0,0 +1,26 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' +require_relative '../../lib/async/barrier' + +require 'net/http' + +terms = ['cats', 'dogs', 'sheep', 'cows'] + +Async do + barrier = Async::Barrier.new + + terms.each do |term| + barrier.async do + Console.logger.info "Searching for #{term}" + + response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}") + + count = response.scan(term).count + + Console.logger.info "Found #{term} #{count} times!" + end + end + + barrier.wait +end diff --git a/examples/scheduler/ko1.rb b/examples/scheduler/ko1.rb new file mode 100755 index 00000000..55ccde0e --- /dev/null +++ b/examples/scheduler/ko1.rb @@ -0,0 +1,143 @@ +#!/usr/bin/env ruby + +require 'fiber' +require 'io/nonblock' +require 'open-uri' + +class Scheduler + def initialize + @ready = [] + @waiting = [] # [[f, type, opts], ...] + end + + def now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def wait_readable_fd(fd) + wait_readable(::IO.for_fd(fd, autoclose: false)) + end + + def wait_readable(io) + p wait_readable: io + Fiber.yield :wait_readable, io + + true + end + + def wait_any(io, events, timeout) + p [:wait_any, io, events, timeout] + case events + when IO::WAIT_READABLE + Fiber.yield :wait_readable, io + when IO::WAIT_WRITABLE + Fiber.yield :wait_writable, io + when IO::WAIT_READABLE | IO::WAIT_WRITABLE + Fiber.yield :wait_any, io + end + + true + end + + # Wrapper for rb_wait_for_single_fd(int) C function. + def wait_for_single_fd(fd, events, duration) + wait_any(::IO.for_fd(fd, autoclose: false), events, duration) + end + + # Sleep the current task for the specified duration, or forever if not + # specified. + # @param duration [#to_f] the amount of time to sleep. + def wait_sleep(duration = nil) + Fiber.yield :sleep, self.now + duration + end + + def fiber + @ready << f = Fiber.new(blocking: false){ + yield + :exit + } + f + end + + def schedule_ready + while f = @ready.shift + wait, opts = f.resume + case wait + when :exit + # ok + else + @waiting << [f, wait, opts] + end + end + end + + def run + until @ready.empty? && @waiting.empty? + schedule_ready + next if @waiting.empty? + p @waiting + wakeup_time = nil + wakeup_fiber = nil + rs = [] + ws = [] + now = self.now + @waiting.each{|f, type, opt| + case type + when :sleep + t = opt + if !wakeup_time || wakeup_time > t + wakeup_time = t + wakeup_fiber = f + end + when :wait_readable + io = opt + rs << io + when :wait_writable + io = opt + ws << io + when :wait_any + io = opt + rs << io + ws << io + end + } + if wakeup_time + if wakeup_time > now + dur = wakeup_time - self.now + else + @ready << wakeup_fiber + @waiting.delete_if{|f,| f == wakeup_fiber} + end + end + pp dur + rs, ws, es = IO.select rs, ws, nil, dur + pp selected: [rs, ws, es] + [*rs, *ws].each{|io| + @waiting.delete_if{|f, type, opt| + if opt == io + p [:ready, f, io] + @ready << f + true + end + } + } + pp after: @waiting + end + end + + def enter_blocking_region + #pp caller(0) + end + + def exit_blocking_region + #pp caller(0) + end +end + +Thread.current.scheduler = Scheduler.new + +Fiber do + URI.open('http://www.ruby-lang.org/'){|f| p f.gets} +end + +Thread.current.scheduler.run diff --git a/examples/scheduler/pipe.rb b/examples/scheduler/pipe.rb new file mode 100755 index 00000000..43fb0804 --- /dev/null +++ b/examples/scheduler/pipe.rb @@ -0,0 +1,46 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +puts "Starting..." + +require_relative '../../lib/async' +require_relative '../../lib/async/scheduler' + +require 'io/nonblock' + +thread = Thread.current + +abort "Require Thread\#selector patch" unless thread.respond_to?(:selector) + +MESSAGE = "Helloooooo World!" + +Async do |task| + scheduler = Async::Scheduler.new(task.reactor) + + thread.selector = scheduler + + input, output = IO.pipe + input.nonblock = true + output.nonblock = true + + task.async do + MESSAGE.each_char do |character| + puts "Writing: #{character}" + output.write(character) + sleep(1) + end + + output.close + end + + input.each_char do |character| + puts "#{Async::Clock.now}: #{character}" + end + + puts "Closing" + input.close +ensure + thread.selector = nil +end + +puts "Done" \ No newline at end of file diff --git a/examples/scheduler/sample/config.ru b/examples/scheduler/sample/config.ru new file mode 100644 index 00000000..9b93065a --- /dev/null +++ b/examples/scheduler/sample/config.ru @@ -0,0 +1,12 @@ + +require 'net/http' + +run ->(env) do + term = "ruby" + + response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}") + + count = response.scan(term).size + + [200, [], ["Found #{count} times.\n"]] +end diff --git a/examples/scheduler/sample/http.rb b/examples/scheduler/sample/http.rb new file mode 100755 index 00000000..a195b2c2 --- /dev/null +++ b/examples/scheduler/sample/http.rb @@ -0,0 +1,25 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' +require_relative '../../lib/async/barrier' + +require 'net/http' + +terms = ["ruby", "rust", "python", "basic", "clojure"] + +Async do + barrier = Async::Barrier.new + + terms.each do |term| + barrier.async do |task| + Console.logger.info(task, "Fetching #{term}") + + response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}") + + term_count = response.scan(term).size + Console.logger.info(task, "Found #{term_count} times") + end + end + + barrier.wait +end diff --git a/examples/scheduler/sqlite3.rb b/examples/scheduler/sqlite3.rb new file mode 100755 index 00000000..48bdd261 --- /dev/null +++ b/examples/scheduler/sqlite3.rb @@ -0,0 +1,11 @@ +#!/usr/bin/env ruby + +require 'sqlite3' + +require_relative '../../lib/async' + +Async do + s = "SELECT 1;"*500000 + db = SQLite3::Database.new(':memory:') + db.execute_batch2(s) +end diff --git a/examples/scheduler/starvation/client.rb b/examples/scheduler/starvation/client.rb new file mode 100755 index 00000000..8a35833e --- /dev/null +++ b/examples/scheduler/starvation/client.rb @@ -0,0 +1,51 @@ +require_relative '../../../lib/async' +require_relative 'resource_pool' + +POOL = ResourcePool.new(pool_size: 1, timeout: 0.1) +WORKER_COUNT = 3 +MAX_TEST_DURATION = 2.0 +LOG_COLORS = [:light_blue, :light_magenta, :light_green, :light_red, :light_cyan, :light_yellow, + :blue, :magenta, :green, :red, :cyan, :yellow] + +class Logger + def self.debug(message) + task = Async::Task.current + fiber = Fiber.current + color = Thread.current[:log_color] + Console.logger.info(task, message) + end +end + +Async do + clock = Async::Clock.new + clock.start! + + WORKER_COUNT.times do |n| + Async(annotation: "worker-#{n}") do + Thread.current[:log_color] = LOG_COLORS[n] + + begin + while clock.total < MAX_TEST_DURATION do + POOL.with_resource do + Logger.debug('Sleep with resource #1') + sleep(0.001) # simulates a DB call + end + + POOL.with_resource do + Logger.debug('Sleep with resource #2') + sleep(0.001) # simulates a DB call + end + + Logger.debug('Sleep without resource') + sleep(0.001) # simulates some other IO + end + rescue ResourcePool::TimeoutError => e + Logger.debug("Timed out. Aborting test after #{clock.total} seconds") + puts "#{e.class} #{e.message}" + puts e.backtrace + STDOUT.flush + Kernel.exit! + end + end + end +end \ No newline at end of file diff --git a/examples/scheduler/starvation/resource_pool.rb b/examples/scheduler/starvation/resource_pool.rb new file mode 100644 index 00000000..cb9bf636 --- /dev/null +++ b/examples/scheduler/starvation/resource_pool.rb @@ -0,0 +1,84 @@ +# Uses the same acquire/release flow as Sequel::ThreadedConnectionPool +class ResourcePool + class TimeoutError < StandardError; end + + def initialize(pool_size:, timeout:) + @available_resources = pool_size.times.map { |n| "resource-#{n}" } + @timeout = timeout + @mutex = Mutex.new + @waiter = ConditionVariable.new + end + + def with_resource + resource = acquire + yield resource + ensure + if resource + release(resource) + end + end + + private + + def acquire + if resource = sync_next_available + Logger.debug('Pool: Acquired resource without waiting') + return resource + end + + timeout = @timeout + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + @mutex.synchronize do + Logger.debug('Pool: Waiting') + @waiter.wait(@mutex, timeout) + if resource = next_available + Logger.debug('Pool: Acquired resource after waiting') + return resource + end + end + + until resource = sync_next_available + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time + + if elapsed > timeout + raise TimeoutError, "Unable to acquire resource after #{elapsed} seconds" + end + + # We get here when the resource was released and this fiber was unblocked by the signal, + # but the resource was immediately re-acquired by the fiber that sent the signal before + # this fiber could be resumed. Effectively a race condition. + @mutex.synchronize do + Logger.debug('Pool: Woken by signal but resource unavailable. Waiting again.') + @waiter.wait(@mutex, timeout - elapsed) + if resource = next_available + Logger.debug('Pool: Acquired resource after multiple waits') + return resource + end + end + end + + Logger.debug('Pool: Acquired resource after waiting') + resource + end + + def release(resource) + @mutex.synchronize do + @available_resources << resource + Logger.debug('Pool: Released resource. Signaling.') + @waiter.signal + end + + sleep(0) + end + + def sync_next_available + @mutex.synchronize do + next_available + end + end + + def next_available + @available_resources.pop + end +end diff --git a/examples/scheduler/wait.rb b/examples/scheduler/wait.rb new file mode 100755 index 00000000..0636892a --- /dev/null +++ b/examples/scheduler/wait.rb @@ -0,0 +1,12 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' + +Async do + 10.times do + Async do + pid = Process.spawn("echo Sleeping; sleep 1; echo Hello World") + Process.wait(pid) + end + end +end diff --git a/examples/transient/exit.rb b/examples/transient/exit.rb new file mode 100755 index 00000000..8f522065 --- /dev/null +++ b/examples/transient/exit.rb @@ -0,0 +1,22 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' + +Async do + Async do |task| + task.sleep(1) + Async.logger.info(task) {"Finished sleeping."} + end + + # When all other non-transient tasks are finished, the transient task will be stopped too. + Async(transient: true) do |task| + while true + Async.logger.info(task) {"Transient task sleeping..."} + task.reactor.print_hierarchy + task.sleep(2) + end + ensure + Async.logger.info(task) {"Transient task exiting: #{$!}"} + end +end + diff --git a/examples/transient/http.rb b/examples/transient/http.rb new file mode 100755 index 00000000..d4135903 --- /dev/null +++ b/examples/transient/http.rb @@ -0,0 +1,23 @@ +#!/usr/bin/env ruby + +require 'async' +require 'async/http/internet' + +Async do |task| + internet = Async::HTTP::Internet.new + + response = internet.get("https://www.google.com/search?q=ruby") + response.finish + + task.reactor.print_hierarchy + + Async(transient: true) do |task| + while true + task.sleep + end + ensure + internet&.close + end +end + +puts "Finished" \ No newline at end of file