-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add fibered_connection_pool Sequel extension. (#36)
* add fibered_connection_pool extension. * Hotfix readme links * Hotfix readme links * Hotfix readme links * Spec * More spec fixes. * Hotfix old ruby compatibility. * Hotfix old ruby compatibility. * Hotfix old ruby compatibility. * Add ruby 2.7 issue link * Add TODO to dev dependency gem. --------- Co-authored-by: ash <[email protected]>
- Loading branch information
1 parent
376c8d8
commit 600ea51
Showing
6 changed files
with
248 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# frozen_string_literal: true | ||
|
||
require "async" | ||
require "async/notification" | ||
|
||
class Sequel::FiberedConnectionPool < Sequel::ConnectionPool | ||
def initialize(db, opts = Sequel::OPTS) | ||
super(db, opts) | ||
|
||
@max_connections = opts[:max_connections] | ||
@available_connections = [] | ||
@notification = Async::Notification.new | ||
@size = 0 | ||
end | ||
|
||
def hold(*) | ||
connection = wait_for_connection | ||
return connection unless block_given? | ||
|
||
begin | ||
yield connection | ||
rescue Sequel::DatabaseDisconnectError, *@error_classes => error | ||
if disconnect_error?(error) | ||
disconnect_connection(connection) | ||
connection = nil | ||
@size -= 1 | ||
end | ||
raise | ||
ensure | ||
if connection | ||
@available_connections.push(connection) | ||
@notification.signal if Async::Task.current? | ||
end | ||
end | ||
end | ||
|
||
def disconnect(*) | ||
@available_connections.each(&:close) | ||
@available_connections.clear | ||
|
||
@size = 0 | ||
end | ||
|
||
def size | ||
@size | ||
end | ||
|
||
private | ||
|
||
def wait_for_connection | ||
until (connection = find_or_create_connection) | ||
@notification.wait | ||
end | ||
|
||
connection | ||
end | ||
|
||
def find_or_create_connection | ||
if (connection = @available_connections.shift) | ||
return connection | ||
end | ||
|
||
if @max_connections.nil? || @size < @max_connections | ||
connection = make_new(:default) | ||
@size += 1 | ||
|
||
return connection | ||
end | ||
|
||
nil | ||
end | ||
end | ||
|
||
module Sequel::ConnectionPoolPatch | ||
def connection_pool_class(*) | ||
Sequel.current.is_a?(Fiber) ? Sequel::FiberedConnectionPool : super | ||
end | ||
end | ||
|
||
# NOTE: Ruby 2.7 DOES NOT SUPPORT class methods prepend in this way | ||
# https://bugs.ruby-lang.org/issues/17423 | ||
if RUBY_VERSION > "3" | ||
Sequel::ConnectionPool::ClassMethods.prepend(Sequel::ConnectionPoolPatch) | ||
else | ||
class Sequel::ConnectionPool | ||
class << self | ||
prepend Sequel::ConnectionPoolPatch | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# frozen_string_literal: true | ||
|
||
RSpec.describe Sequel::FiberedConnectionPool do | ||
describe "#initialize" do | ||
it "creates pool with options" do | ||
pool = described_class.new(DB, DB.opts) | ||
expect(pool.size).to eq(0) | ||
end | ||
end | ||
|
||
describe "#hold" do | ||
let(:pool) { described_class.new(DB, DB.opts) } | ||
|
||
it "creates connection" do | ||
pool.hold { 1 + 1 } | ||
expect(pool.size).to eq(1) | ||
end | ||
|
||
it "return connection if block not given" do | ||
expect(pool.hold).to be_a(Sequel::Postgres::Adapter) | ||
end | ||
|
||
it "drops connection on Sequel::DatabaseDisconnectError" do | ||
pool.hold { 1 + 1 } | ||
expect { pool.hold { raise Sequel::DatabaseDisconnectError } }.to \ | ||
raise_error(Sequel::DatabaseDisconnectError) | ||
expect(pool.size).to eq(0) | ||
end | ||
|
||
it "drops connection if connection is closed" do | ||
pool.hold { 1 + 1 } | ||
|
||
expect do | ||
pool.hold do |connection| | ||
connection.close | ||
raise Sequel::DatabaseDisconnectError | ||
end | ||
end.to raise_error(Sequel::DatabaseDisconnectError) | ||
|
||
expect(pool.size).to eq(0) | ||
end | ||
|
||
it "does not drop connection on PG::Error" do | ||
pool.hold { 1 + 1 } | ||
expect { pool.hold { raise PG::Error } }.to raise_error(PG::Error) | ||
expect(pool.size).to eq(1) | ||
end | ||
end | ||
|
||
describe "#disconnect" do | ||
let(:pool) { described_class.new(DB, DB.opts) } | ||
|
||
it "close each connection" do | ||
pool.hold { 1 + 1 } | ||
expect(pool.size).to eq(1) | ||
|
||
pool.disconnect | ||
expect(pool.size).to eq(0) | ||
end | ||
end | ||
|
||
describe "#wait_for_connection" do | ||
let(:pool) do | ||
opts = DB.opts.dup | ||
opts[:max_connections] = 0 | ||
|
||
described_class.new(DB, opts) | ||
end | ||
|
||
it "waits for connection" do | ||
Async do |task| | ||
task.async { pool.hold { expect(1 + 1).to eq(2) } } | ||
task.async do | ||
pool.instance_variable_set(:@max_connections, 1) | ||
pool.instance_variable_get(:@notification).signal | ||
end | ||
end | ||
end | ||
end | ||
|
||
describe "#find_or_create_connection" do | ||
let(:pool) do | ||
opts = Sequel::DATABASES.first.opts.dup | ||
opts[:max_connections] = 0 | ||
|
||
described_class.new(Sequel::DATABASES.first, opts) | ||
end | ||
|
||
it "does not create more connections" do | ||
expect(pool.send(:find_or_create_connection)).to eq(nil) | ||
end | ||
end | ||
end | ||
|
||
RSpec.describe Sequel::ConnectionPool do | ||
before { allow(Sequel).to receive(:current).and_return(Fiber.current) } | ||
|
||
it "return Sequel::FiberedConnectionPool if Sequel.current is a Fiber" do | ||
expect(described_class.connection_pool_class("test")).to eq(Sequel::FiberedConnectionPool) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters