From 8b149211bda77e6b91edd31145545d52ae70edee Mon Sep 17 00:00:00 2001 From: DerekBum Date: Sun, 28 Apr 2024 22:25:29 +0300 Subject: [PATCH] utube: fix slow take on busy utubes If some of the utube for tasks at the top of the queue were busy most of the time, `take` would slow down for every other task. This problem is fixed by creating a new space `space_ready`. It contains first task with `READY` status from each utube. This solution shows great results for the stated problem, with the cost of slowing the `put` method (it is ~3 times slower). Thus, this workaround is disabled by default. To enable it, user should set the `v2 = true` as an option while creating the tube. As example: ```lua local test_queue = queue.create_tube('test_queue', 'utube', {temporary = true, v2 = true}) ``` Part of #228 --- CHANGELOG.md | 6 ++ queue/abstract/driver/utube.lua | 101 ++++++++++++++++++++++++++++++++ t/030-utube.t | 72 ++++++++++++----------- t/benchmark/busy_utubes.lua | 94 +++++++++++++++++++++++++++++ t/benchmark/many_utubes.lua | 86 +++++++++++++++++++++++++++ 5 files changed, 325 insertions(+), 34 deletions(-) create mode 100644 t/benchmark/busy_utubes.lua create mode 100644 t/benchmark/many_utubes.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c39331a..5e3b79b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- `v2` boolean option for creating a `utube` tube (#228). It enables the + workaround for slow takes while working with busy tubes. + ### Fixed - Stuck in `INIT` state if an instance failed to enter the `running` mode in time (#226). This fix works only for Tarantool versions >= 2.10.0. +- Slow takes on busy `utube` tubes (#228). The workaround could be enabled by + passing the `v2 = true` option while creating the tube. ## [1.3.3] - 2023-09-13 diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index d3b7e006..eac3b2b1 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -53,6 +53,7 @@ function tube.create_space(space_name, opts) type = 'tree', parts = {2, str_type(), 3, str_type(), 1, num_type()} }) + space.v2 = opts.v2 return space end @@ -60,10 +61,27 @@ end function tube.new(space, on_task_change) validate_space(space) + local space_ready_name = space.name .. "_utube_ready" + local space_ready = box.space[space_ready_name] + if space.v2 and not space_ready then + -- Create a space for first ready tasks from each utube. + space_ready = box.schema.create_space(space_ready_name, space_opts) + space_ready:create_index('task_id', { + type = 'tree', + parts = {1, num_type()} + }) + space_ready:create_index('utube', { + type = 'tree', + parts = {2, str_type()} + }) + end + on_task_change = on_task_change or (function() end) local self = setmetatable({ space = space, + space_ready = space_ready, on_task_change = on_task_change, + v2 = space.v2 or false, }, { __index = method }) return self end @@ -73,6 +91,20 @@ function method.normalize_task(self, task) return task and task:transform(3, 1) end +local function put_ready_task(self, id, utube) + local added = self.space_ready.index.utube:get{utube} + if added == nil then + self.space_ready:insert{id, utube} + end +end + +local function put_next_ready(self, utube) + local next_task = self.space.index.utube:min{state.READY, utube} + if next_task ~= nil then + put_ready_task(self, next_task[1], utube) + end +end + -- put task in space function method.put(self, data, opts) local max @@ -98,12 +130,62 @@ function method.put(self, data, opts) local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} + if self.v2 then + put_ready_task(self, id, task[3]) + end self.on_task_change(task, 'put') return task end +local function delete_ready(self, id, utube) + self.space_ready:delete(id) + put_next_ready(self, utube) +end + +local function take_ready(self) + for s, task_ready in self.space_ready:pairs({}, { iterator = 'GE' }) do + local id = task_ready[1] + local commit_requirements = box.cfg.memtx_use_mvcc_engine and + (not box.is_in_txn()) + local task + + if commit_requirements then + box.begin({txn_isolation = 'read-committed'}) + task = self.space:get(id) + box.commit() + else + task = self.space:get(id) + end + + if task[2] == state.READY then + local taken + + if commit_requirements then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, task[3]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, task[3]} + end + + if taken == nil or taken[2] ~= state.TAKEN then + task = self.space:update(id, { { '=', 2, state.TAKEN } }) + + delete_ready(self, id, task[3]) + + self.on_task_change(task, 'take') + return task + end + end + end +end + -- take task function method.take(self) + if self.v2 then + return take_ready(self) + end + for s, task in self.space.index.status:pairs(state.READY, { iterator = 'GE' }) do if task[2] ~= state.READY then @@ -146,6 +228,10 @@ function method.delete(self, id) local task = self.space:get(id) self.space:delete(id) if task ~= nil then + if self.v2 then + delete_ready(self, id, task[3]) + end + task = task:transform(2, 1, state.DONE) local neighbour = self.space.index.utube:min{state.READY, task[3]} @@ -157,10 +243,22 @@ function method.delete(self, id) return task end +local function on_ready_status_change(self, utube) + local prev_task = self.space_ready.index.utube:get{utube} + if prev_task ~= nil then + delete_ready(self, prev_task[1], utube) + else + put_next_ready(self, utube) + end +end + -- release task function method.release(self, id, opts) local task = self.space:update(id, {{ '=', 2, state.READY }}) if task ~= nil then + if self.v2 then + on_ready_status_change(self, task[3]) + end self.on_task_change(task, 'release') end return task @@ -193,6 +291,9 @@ function method.kick(self, count) end task = self.space:update(task[1], {{ '=', 2, state.READY }}) + if self.v2 then + on_ready_status_change(self, task[3]) + end self.on_task_change(task, 'kick') end return count diff --git a/t/030-utube.t b/t/030-utube.t index ffd92136..6c56f65f 100755 --- a/t/030-utube.t +++ b/t/030-utube.t @@ -18,45 +18,49 @@ test:ok(queue, 'queue is loaded') local tube = queue.create_tube('test', 'utube', { engine = engine }) local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine }) +local tubev2 = queue.create_tube('test_stat_v2', 'utube', + { engine = engine, v2 = true }) test:ok(tube, 'test tube created') test:is(tube.name, 'test', 'tube.name') test:is(tube.type, 'utube', 'tube.type') test:test('Utube statistics', function(test) - test:plan(13) - tube2:put('stat_0') - tube2:put('stat_1') - tube2:put('stat_2') - tube2:put('stat_3') - tube2:put('stat_4') - tube2:delete(4) - tube2:take(.001) - tube2:release(0) - tube2:take(.001) - tube2:ack(0) - tube2:bury(1) - tube2:bury(2) - tube2:kick(1) - tube2:take(.001) - - local stats = queue.statistics('test_stat') - - -- check tasks statistics - test:is(stats.tasks.taken, 1, 'tasks.taken') - test:is(stats.tasks.buried, 1, 'tasks.buried') - test:is(stats.tasks.ready, 1, 'tasks.ready') - test:is(stats.tasks.done, 2, 'tasks.done') - test:is(stats.tasks.delayed, 0, 'tasks.delayed') - test:is(stats.tasks.total, 3, 'tasks.total') - - -- check function call statistics - test:is(stats.calls.delete, 1, 'calls.delete') - test:is(stats.calls.ack, 1, 'calls.ack') - test:is(stats.calls.take, 3, 'calls.take') - test:is(stats.calls.kick, 1, 'calls.kick') - test:is(stats.calls.bury, 2, 'calls.bury') - test:is(stats.calls.put, 5, 'calls.put') - test:is(stats.calls.release, 1, 'calls.release') + test:plan(26) + for _, tube_stat in ipairs({tube2, tubev2}) do + tube_stat:put('stat_0') + tube_stat:put('stat_1') + tube_stat:put('stat_2') + tube_stat:put('stat_3') + tube_stat:put('stat_4') + tube_stat:delete(4) + tube_stat:take(.001) + tube_stat:release(0) + tube_stat:take(.001) + tube_stat:ack(0) + tube_stat:bury(1) + tube_stat:bury(2) + tube_stat:kick(1) + tube_stat:take(.001) + + local stats = queue.statistics('test_stat') + + -- check tasks statistics + test:is(stats.tasks.taken, 1, 'tasks.taken') + test:is(stats.tasks.buried, 1, 'tasks.buried') + test:is(stats.tasks.ready, 1, 'tasks.ready') + test:is(stats.tasks.done, 2, 'tasks.done') + test:is(stats.tasks.delayed, 0, 'tasks.delayed') + test:is(stats.tasks.total, 3, 'tasks.total') + + -- check function call statistics + test:is(stats.calls.delete, 1, 'calls.delete') + test:is(stats.calls.ack, 1, 'calls.ack') + test:is(stats.calls.take, 3, 'calls.take') + test:is(stats.calls.kick, 1, 'calls.kick') + test:is(stats.calls.bury, 2, 'calls.bury') + test:is(stats.calls.put, 5, 'calls.put') + test:is(stats.calls.release, 1, 'calls.release') + end end) diff --git a/t/benchmark/busy_utubes.lua b/t/benchmark/busy_utubes.lua new file mode 100644 index 00000000..730367a0 --- /dev/null +++ b/t/benchmark/busy_utubes.lua @@ -0,0 +1,94 @@ +#!/usr/bin/env tarantool + +local clock = require('clock') +local os = require('os') +local fiber = require('fiber') +local queue = require('queue') + +-- Set the number of consumers. +local consumers_count = 10 +-- Set the number of tasks processed by one consumer per iteration. +local batch_size = 150000 + +local barrier = fiber.cond() +local wait_count = 0 + +box.cfg() + +local test_queue = queue.create_tube('test_queue', 'utube', + {temporary = true, v2 = true}) + +local function prepare_tasks() + local test_data = 'test data' + + for i = 1, consumers_count do + for _ = 1, batch_size do + test_queue:put(test_data, {utube = tostring(i)}) + end + end +end + +local function prepare_consumers() + local consumers = {} + + -- Make half the utubes busy. + for _ = 1, consumers_count / 2 do + test_queue:take() + end + + for i = 1, consumers_count / 2 do + consumers[i] = fiber.create(function() + wait_count = wait_count + 1 + -- Wait for all consumers to start. + barrier:wait() + + -- Ack the tasks. + for _ = 1, batch_size do + local task = test_queue:take() + test_queue:ack(task[1]) + end + + wait_count = wait_count + 1 + end) + end + + return consumers +end + +local function multi_consumer_bench() + --- Wait for all consumer fibers. + local wait_all = function() + while (wait_count ~= consumers_count / 2) do + fiber.yield() + end + wait_count = 0 + end + + fiber.set_max_slice(100) + + prepare_tasks() + + -- Wait for all consumers to start. + local consumers = prepare_consumers() + wait_all() + + -- Start timing of task confirmation. + local start_ack_time = clock.proc64() + barrier:broadcast() + -- Wait for all tasks to be acked. + wait_all() + -- Complete the timing of task confirmation. + local complete_time = clock.proc64() + + -- Print the result in milliseconds. + print(string.format("Time it takes to confirm the tasks: %i", + tonumber((complete_time - start_ack_time) / 10^6))) +end + +-- Start benchmark. +multi_consumer_bench() + +-- Cleanup. +test_queue:drop() + +os.exit(0) diff --git a/t/benchmark/many_utubes.lua b/t/benchmark/many_utubes.lua new file mode 100644 index 00000000..dbf78c69 --- /dev/null +++ b/t/benchmark/many_utubes.lua @@ -0,0 +1,86 @@ +#!/usr/bin/env tarantool + +local clock = require('clock') +local os = require('os') +local fiber = require('fiber') +local queue = require('queue') + +-- Set the number of consumers. +local consumers_count = 30000 + +local barrier = fiber.cond() +local wait_count = 0 + +box.cfg() + +local test_queue = queue.create_tube('test_queue', 'utube', + {temporary = true, v2 = true}) + +local function prepare_tasks() + local test_data = 'test data' + + for i = 1, consumers_count do + test_queue:put(test_data, {utube = tostring(i)}) + end +end + +local function prepare_consumers() + local consumers = {} + + for i = 1, consumers_count do + consumers[i] = fiber.create(function() + wait_count = wait_count + 1 + -- Wait for all consumers to start. + barrier:wait() + + -- Ack the task. + local task = test_queue:take() + test_queue:ack(task[1]) + + wait_count = wait_count + 1 + end) + end + + return consumers +end + +local function multi_consumer_bench() + --- Wait for all consumer fibers. + local wait_all = function() + while (wait_count ~= consumers_count) do + fiber.yield() + end + wait_count = 0 + end + + fiber.set_max_slice(100) + + -- Wait for all consumers to start. + local consumers = prepare_consumers() + wait_all() + + -- Start timing creation of tasks. + local start_put_time = clock.proc64() + prepare_tasks() + -- Start timing of task confirmation. + local start_ack_time = clock.proc64() + barrier:broadcast() + -- Wait for all tasks to be acked. + wait_all() + -- Complete the timing of task confirmation. + local complete_time = clock.proc64() + + -- Print results in milliseconds. + print(string.format("Time it takes to fill the queue: %i", + tonumber((start_ack_time - start_put_time) / 10^6))) + print(string.format("Time it takes to confirm the tasks: %i", + tonumber((complete_time - start_ack_time) / 10^6))) +end + +-- Start benchmark. +multi_consumer_bench() + +-- Cleanup. +test_queue:drop() + +os.exit(0)