Skip to content

Commit

Permalink
fix sqlite and parallel tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mceachen committed Jul 5, 2014
1 parent fe86c63 commit 31ec2e4
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 108 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ end

## Changelog

### 1.1.0
### 2.0.0

* Lock timeouts of 0 now attempt the lock once, as per suggested by
[Jon Leighton](https://github.com/jonleighton) and implemented by
Expand All @@ -144,6 +144,7 @@ end
* Added Travis tests for jruby
* Dropped support for Rails 3.0, 3.1, and Ruby 1.8.7, as they are no longer
receiving security patches. See http://rubyonrails.org/security/ for more information.
This required the major version bump.

### 1.0.0

Expand Down
4 changes: 2 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ end
require 'rake/testtask'

Rake::TestTask.new do |t|
t.libs.push "lib"
t.libs.push "test"
t.libs.push 'lib'
t.libs.push 'test'
t.pattern = 'test/**/*_test.rb'
t.verbose = true
end
Expand Down
6 changes: 6 additions & 0 deletions lib/with_advisory_lock/flock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def try_lock
0 == file_io.flock(File::LOCK_EX|File::LOCK_NB)
end

def advisory_lock_exists?(name)
acquired = true
self.class.new(nil, name, 0).yield_with_lock { acquired = false }
acquired
end

def release_lock
0 == file_io.flock(File::LOCK_UN)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/with_advisory_lock/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module WithAdvisoryLock
VERSION = Gem::Version.new('1.1.0')
VERSION = Gem::Version.new('2.0.0')
end
4 changes: 1 addition & 3 deletions test/lock_test.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
require 'minitest_helper'

describe 'class methods' do

let(:lock_name) { "test lock #{rand(1024)}" }
let(:expected_lock_name) { "#{ENV['WITH_ADVISORY_LOCK_PREFIX']}#{lock_name}" }

Expand Down Expand Up @@ -38,5 +37,4 @@
block_was_yielded.must_be_true
end
end

end if test_lock_exists?
end
4 changes: 0 additions & 4 deletions test/minitest_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ def env_db

Thread.abort_on_exception = true

def test_lock_exists?
[:mysql, :postgresql].include? env_db
end

class MiniTest::Spec
before do
ENV['FLOCK_DIR'] = Dir.mktmpdir
Expand Down
130 changes: 70 additions & 60 deletions test/parallelism_test.rb
Original file line number Diff line number Diff line change
@@ -1,58 +1,73 @@
require 'minitest_helper'

parallelism_is_broken = begin
# Rails < 3.2 has known bugs with parallelism
(ActiveRecord::VERSION::MAJOR <= 3 && ActiveRecord::VERSION::MINOR < 2) ||
# SQLite doesn't support parallel writes
ENV["DB"] =~ /sqlite/
end

describe "parallelism" do
def find_or_create_at(run_at, with_advisory_lock)
ActiveRecord::Base.connection.reconnect!
sleep(run_at - Time.now.to_f)
name = run_at.to_s
task = lambda do
Tag.transaction do
Tag.find_by_name(name) || Tag.create(:name => name)
class WorkerBase
def initialize(target, run_at, name, use_advisory_lock)
@thread = Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
before_work
sleep((run_at - Time.now).to_f)
if use_advisory_lock
Tag.with_advisory_lock(name) { work(name) }
else
work(name)
end
end
end
end
if with_advisory_lock
Tag.with_advisory_lock(name, nil, &task)
else
task.call

def before_work
end

def work(name)
raise
end

def join
@thread.join
end
ActiveRecord::Base.connection.close if ActiveRecord::Base.connection.respond_to?(:close)
end

def run_workers(with_advisory_lock)
skip if env_db == :sqlite
@iterations.times do
time = (Time.now.to_i + 4).to_f
threads = @workers.times.collect do
Thread.new do
find_or_create_at(time, with_advisory_lock)
end
class FindOrCreateWorker < WorkerBase
def work(name)
Tag.transaction do
Tag.find_or_create_by_name(name: name)
end
threads.each { |ea| ea.join }
end
puts "Created #{Tag.all.size} (lock = #{with_advisory_lock})"
end

def run_workers(use_advisory_lock, worker_class = FindOrCreateWorker)
all_workers = []
@names = @iterations.times.map { |iter| "iteration ##{iter}" }
@names.each do |name|
wake_time = 1.second.from_now
workers = @workers.times.map do
worker_class.new(@target, wake_time, name, use_advisory_lock)
end
workers.each(&:join)
all_workers += workers
puts name
end
# Ensure we're still connected:
ActiveRecord::Base.connection_pool.connection
all_workers
end

before :each do
ActiveRecord::Base.connection.reconnect!
@iterations = 5
@workers = 5
@workers = 10
end

it "parallel threads create multiple duplicate rows" do
run_workers(with_advisory_lock = false)
it "creates multiple duplicate rows without advisory locks" do
run_workers(use_advisory_lock = false)
Tag.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy.
TagAudit.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy.
Label.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy.
end
end unless env_db == :sqlite

it "parallel threads with_advisory_lock don't create multiple duplicate rows" do
run_workers(with_advisory_lock = true)
it "doesn't create multiple duplicate rows with advisory locks" do
run_workers(use_advisory_lock = true)
Tag.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy.
TagAudit.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy.
Label.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy.
Expand All @@ -61,37 +76,32 @@ def run_workers(with_advisory_lock)
it "returns false if the lock wasn't acquirable" do
t1_acquired_lock = false
t1_return_value = nil
lock_name = "testing 1,2,3"

t1 = Thread.new do
ActiveRecord::Base.connection.reconnect!
t1_return_value = Label.with_advisory_lock("testing 1,2,3") do
t1_acquired_lock = true
sleep(0.3)
"boom"
ActiveRecord::Base.connection_pool.with_connection do
t1_return_value = Label.with_advisory_lock(lock_name) do
t1_acquired_lock = true
sleep(0.5)
't1 finished'
end
end
end

# Make sure the lock is acquired:
sleep(0.1)

# Now try to acquire the lock impatiently:
t2_acquired_lock = false
t2_return_value = nil
t2 = Thread.new do
ActiveRecord::Base.connection.reconnect!
t2_return_value = Label.with_advisory_lock("testing 1,2,3", 0.1) do
t2_acquired_lock = true
"not expected"
end
ActiveRecord::Base.connection.reconnect!
Label.with_advisory_lock(lock_name, 0) do
fail "lock should not be acquirable at this point"
end

# Wait for them to finish:
t1.join
t2.join

t1_acquired_lock.must_be_true
t1_return_value.must_equal "boom"

t2_acquired_lock.must_be_false
t2_return_value.must_be_false
t1_return_value.must_equal 't1 finished'
ActiveRecord::Base.connection.reconnect!
# We should now be able to acquire the lock immediately:
reacquired = false
Label.with_advisory_lock(lock_name, 0) do
reacquired = true
end.must_be_true
reacquired.must_be_true
end
end unless parallelism_is_broken
end
37 changes: 0 additions & 37 deletions test/simple_parallel_test.rb

This file was deleted.

0 comments on commit 31ec2e4

Please sign in to comment.