From 31ec2e4e032dee25d2c3178e3f1b6995c6487a7e Mon Sep 17 00:00:00 2001 From: Matthew McEachen Date: Fri, 4 Jul 2014 21:18:32 -0700 Subject: [PATCH] fix sqlite and parallel tests --- README.md | 3 +- Rakefile | 4 +- lib/with_advisory_lock/flock.rb | 6 ++ lib/with_advisory_lock/version.rb | 2 +- test/lock_test.rb | 4 +- test/minitest_helper.rb | 4 - test/parallelism_test.rb | 130 ++++++++++++++++-------------- test/simple_parallel_test.rb | 37 --------- 8 files changed, 82 insertions(+), 108 deletions(-) delete mode 100644 test/simple_parallel_test.rb diff --git a/README.md b/README.md index 33360ec..101981c 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ end ## Changelog -### 1.1.0 +### 2.0.0 * Lock timeouts of 0 now attempt the lock once, as per suggested by [Jon Leighton](https://github.com/jonleighton) and implemented by @@ -144,6 +144,7 @@ end * Added Travis tests for jruby * Dropped support for Rails 3.0, 3.1, and Ruby 1.8.7, as they are no longer receiving security patches. See http://rubyonrails.org/security/ for more information. + This required the major version bump. ### 1.0.0 diff --git a/Rakefile b/Rakefile index b0bc3a6..f791690 100644 --- a/Rakefile +++ b/Rakefile @@ -8,8 +8,8 @@ end require 'rake/testtask' Rake::TestTask.new do |t| - t.libs.push "lib" - t.libs.push "test" + t.libs.push 'lib' + t.libs.push 'test' t.pattern = 'test/**/*_test.rb' t.verbose = true end diff --git a/lib/with_advisory_lock/flock.rb b/lib/with_advisory_lock/flock.rb index f469ca3..e58a7c7 100644 --- a/lib/with_advisory_lock/flock.rb +++ b/lib/with_advisory_lock/flock.rb @@ -23,6 +23,12 @@ def try_lock 0 == file_io.flock(File::LOCK_EX|File::LOCK_NB) end + def advisory_lock_exists?(name) + acquired = true + self.class.new(nil, name, 0).yield_with_lock { acquired = false } + acquired + end + def release_lock 0 == file_io.flock(File::LOCK_UN) end diff --git a/lib/with_advisory_lock/version.rb b/lib/with_advisory_lock/version.rb index df9012e..717183b 100644 --- a/lib/with_advisory_lock/version.rb +++ b/lib/with_advisory_lock/version.rb @@ -1,3 +1,3 @@ module WithAdvisoryLock - VERSION = Gem::Version.new('1.1.0') + VERSION = Gem::Version.new('2.0.0') end diff --git a/test/lock_test.rb b/test/lock_test.rb index 8cff114..9ec7fbe 100644 --- a/test/lock_test.rb +++ b/test/lock_test.rb @@ -1,7 +1,6 @@ require 'minitest_helper' describe 'class methods' do - let(:lock_name) { "test lock #{rand(1024)}" } let(:expected_lock_name) { "#{ENV['WITH_ADVISORY_LOCK_PREFIX']}#{lock_name}" } @@ -38,5 +37,4 @@ block_was_yielded.must_be_true end end - -end if test_lock_exists? +end diff --git a/test/minitest_helper.rb b/test/minitest_helper.rb index 17a03db..c66d131 100644 --- a/test/minitest_helper.rb +++ b/test/minitest_helper.rb @@ -20,10 +20,6 @@ def env_db Thread.abort_on_exception = true -def test_lock_exists? - [:mysql, :postgresql].include? env_db -end - class MiniTest::Spec before do ENV['FLOCK_DIR'] = Dir.mktmpdir diff --git a/test/parallelism_test.rb b/test/parallelism_test.rb index 5fcb946..0e19929 100644 --- a/test/parallelism_test.rb +++ b/test/parallelism_test.rb @@ -1,58 +1,73 @@ require 'minitest_helper' -parallelism_is_broken = begin - # Rails < 3.2 has known bugs with parallelism - (ActiveRecord::VERSION::MAJOR <= 3 && ActiveRecord::VERSION::MINOR < 2) || - # SQLite doesn't support parallel writes - ENV["DB"] =~ /sqlite/ -end - describe "parallelism" do - def find_or_create_at(run_at, with_advisory_lock) - ActiveRecord::Base.connection.reconnect! - sleep(run_at - Time.now.to_f) - name = run_at.to_s - task = lambda do - Tag.transaction do - Tag.find_by_name(name) || Tag.create(:name => name) + class WorkerBase + def initialize(target, run_at, name, use_advisory_lock) + @thread = Thread.new do + ActiveRecord::Base.connection_pool.with_connection do + before_work + sleep((run_at - Time.now).to_f) + if use_advisory_lock + Tag.with_advisory_lock(name) { work(name) } + else + work(name) + end + end end end - if with_advisory_lock - Tag.with_advisory_lock(name, nil, &task) - else - task.call + + def before_work + end + + def work(name) + raise + end + + def join + @thread.join end - ActiveRecord::Base.connection.close if ActiveRecord::Base.connection.respond_to?(:close) end - def run_workers(with_advisory_lock) - skip if env_db == :sqlite - @iterations.times do - time = (Time.now.to_i + 4).to_f - threads = @workers.times.collect do - Thread.new do - find_or_create_at(time, with_advisory_lock) - end + class FindOrCreateWorker < WorkerBase + def work(name) + Tag.transaction do + Tag.find_or_create_by_name(name: name) end - threads.each { |ea| ea.join } end - puts "Created #{Tag.all.size} (lock = #{with_advisory_lock})" + end + + def run_workers(use_advisory_lock, worker_class = FindOrCreateWorker) + all_workers = [] + @names = @iterations.times.map { |iter| "iteration ##{iter}" } + @names.each do |name| + wake_time = 1.second.from_now + workers = @workers.times.map do + worker_class.new(@target, wake_time, name, use_advisory_lock) + end + workers.each(&:join) + all_workers += workers + puts name + end + # Ensure we're still connected: + ActiveRecord::Base.connection_pool.connection + all_workers end before :each do + ActiveRecord::Base.connection.reconnect! @iterations = 5 - @workers = 5 + @workers = 10 end - it "parallel threads create multiple duplicate rows" do - run_workers(with_advisory_lock = false) + it "creates multiple duplicate rows without advisory locks" do + run_workers(use_advisory_lock = false) Tag.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy. TagAudit.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy. Label.all.size.must_be :>, @iterations # <- any duplicated rows will make me happy. - end + end unless env_db == :sqlite - it "parallel threads with_advisory_lock don't create multiple duplicate rows" do - run_workers(with_advisory_lock = true) + it "doesn't create multiple duplicate rows with advisory locks" do + run_workers(use_advisory_lock = true) Tag.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy. TagAudit.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy. Label.all.size.must_equal @iterations # <- any duplicated rows will NOT make me happy. @@ -61,37 +76,32 @@ def run_workers(with_advisory_lock) it "returns false if the lock wasn't acquirable" do t1_acquired_lock = false t1_return_value = nil + lock_name = "testing 1,2,3" + t1 = Thread.new do - ActiveRecord::Base.connection.reconnect! - t1_return_value = Label.with_advisory_lock("testing 1,2,3") do - t1_acquired_lock = true - sleep(0.3) - "boom" + ActiveRecord::Base.connection_pool.with_connection do + t1_return_value = Label.with_advisory_lock(lock_name) do + t1_acquired_lock = true + sleep(0.5) + 't1 finished' + end end end - # Make sure the lock is acquired: sleep(0.1) - - # Now try to acquire the lock impatiently: - t2_acquired_lock = false - t2_return_value = nil - t2 = Thread.new do - ActiveRecord::Base.connection.reconnect! - t2_return_value = Label.with_advisory_lock("testing 1,2,3", 0.1) do - t2_acquired_lock = true - "not expected" - end + ActiveRecord::Base.connection.reconnect! + Label.with_advisory_lock(lock_name, 0) do + fail "lock should not be acquirable at this point" end - # Wait for them to finish: t1.join - t2.join - - t1_acquired_lock.must_be_true - t1_return_value.must_equal "boom" - - t2_acquired_lock.must_be_false - t2_return_value.must_be_false + t1_return_value.must_equal 't1 finished' + ActiveRecord::Base.connection.reconnect! + # We should now be able to acquire the lock immediately: + reacquired = false + Label.with_advisory_lock(lock_name, 0) do + reacquired = true + end.must_be_true + reacquired.must_be_true end -end unless parallelism_is_broken +end diff --git a/test/simple_parallel_test.rb b/test/simple_parallel_test.rb deleted file mode 100644 index c4e7851..0000000 --- a/test/simple_parallel_test.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'minitest_helper' - -describe "prevents threads from accessing a resource concurrently" do - def assert_correct_parallel_behavior(lock_name) - times = ActiveSupport::OrderedHash.new - ActiveRecord::Base.connection_pool.disconnect! - t1 = Thread.new do - ActiveRecord::Base.connection.reconnect! - ActiveRecord::Base.with_advisory_lock(lock_name) do - times[:t1_acquire] = Time.now - sleep 0.5 - end - times[:t1_release] = Time.now - end - sleep 0.1 - t2 = Thread.new do - ActiveRecord::Base.connection.reconnect! - ActiveRecord::Base.with_advisory_lock(lock_name) do - times[:t2_acquire] = Time.now - sleep 1 - end - times[:t2_release] = Time.now - end - t1.join - t2.join - times.keys.must_equal [:t1_acquire, :t1_release, :t2_acquire, :t2_release] - times[:t2_acquire].must_be :>, times[:t1_release] - end - - it "with a string lock name" do - assert_correct_parallel_behavior("example lock name") - end - - it "with a numeric lock name" do - assert_correct_parallel_behavior(1234) - end -end