Skip to content

Commit

Permalink
utube: fix slow take on busy utubes
Browse files Browse the repository at this point in the history
If some of the utube for tasks at the top of the queue were busy
most of the time, `take` would slow down for every other task.
This problem is fixed by creating a new space `space_ready`. It
contains first task with `READY` status from each utube.

This solution shows great results for the stated problem, with the cost
of slowing the `put` method (it is ~3 times slower). Thus, this workaround is
disabled by default. To enable it, user should set the `v2 = true` as an
option while creating the tube. As example:
```lua
local test_queue = queue.create_tube('test_queue', 'utube',
        {temporary = true, v2 = true})
```

Part of #228
  • Loading branch information
DerekBum committed May 9, 2024
1 parent aa7c092 commit 749eacf
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 119 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
- `v2` boolean option for creating a `utube` tube (#228). It enables the
workaround for slow takes while working with busy tubes.

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.
- Slow takes on busy `utube` tubes (#228). The workaround could be enabled by
passing the `v2 = true` option while creating the tube.

## [1.3.3] - 2023-09-13

Expand Down
6 changes: 5 additions & 1 deletion queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ function tube.drop(self)

local space_name = tube[3]

box.space[space_name]:drop()
if self.raw.drop ~= nil then
self.raw:drop()
else
box.space[space_name]:drop()
end
box.space._queue:delete{tube_name}
-- drop queue
queue.tube[tube_name] = nil
Expand Down
217 changes: 199 additions & 18 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,37 @@ function tube.create_space(space_name, opts)
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()}
})
space.v2 = opts.v2
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

local space_ready_name = space.name .. "_utube_ready"
local space_ready = box.space[space_ready_name]
if space.v2 and not space_ready then
-- Create a space for first ready tasks from each utube.
space_ready = box.schema.create_space(space_ready_name, space_opts)
space_ready:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
unique = true
})
space_ready:create_index('utube', {
type = 'tree',
parts = {2, str_type()},
unique = true
})
end

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
space_ready = space_ready,
on_task_change = on_task_change,
v2 = space.v2 or false,
}, { __index = method })
return self
end
Expand All @@ -73,37 +93,92 @@ function method.normalize_task(self, task)
return task and task:transform(3, 1)
end

local function put_ready(self, utube)
local taken = self.space.index.utube:min{state.TAKEN, utube}
if taken == nil or taken[2] ~= state.TAKEN then
local next_task = self.space.index.utube:min{state.READY, utube}
if next_task == nil or next_task[2] ~= state.READY then
return
end
pcall(self.space_ready.insert, self.space_ready, {next_task[1], utube})
end
end

-- put task in space
function method.put(self, data, opts)
local max

-- Taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local max

max = self.space.index.task_id:max()

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
if self.v2 then
put_ready(self, task[3])
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'put')
return task
end

local function take_ready(self)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())

while true do
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task_ready = self.space_ready.index.task_id:min()
if task_ready == nil then
if commit_requirements then
box.commit()
end
return
end

local id = task_ready[1]
local task = self.space:get(id)

if task[2] == state.READY then
local taken = self.space.index.utube:min{state.TAKEN, task[3]}

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(id, { { '=', 2, state.TAKEN } })

self.space_ready:delete(id)

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'take')
return task
end
end

if commit_requirements then
box.commit()
end
end
end

-- take task
function method.take(self)
if self.v2 then
return take_ready(self)
end

for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
Expand Down Expand Up @@ -141,49 +216,131 @@ function method.touch(self, id, ttr)
error('utube queue does not support touch')
end

local function delete_ready(self, id, utube)
self.space_ready:delete(id)
put_ready(self, utube)
end

-- delete task
function method.delete(self, id)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
if self.v2 then
if task[2] == state.TAKEN then
put_ready(self, task[3])
elseif task[2] == state.READY then
delete_ready(self, id, task[3])
end
end

task = task:transform(2, 1, state.DONE)

local neighbour = self.space.index.utube:min{state.READY, task[3]}

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'delete')
if neighbour then
self.on_task_change(neighbour)
end
return task
end

if commit_requirements then
box.commit()
end
return task
end

-- release task
function method.release(self, id, opts)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space:update(id, {{ '=', 2, state.READY }})
if task ~= nil then
if self.v2 then
-- We guarantee that release is called only on TAKEN tasks.
self.space_ready:insert{id, task[3]}
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'release')
return task
end

if commit_requirements then
box.commit()
end
return task
end

-- bury task
function method.bury(self, id)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local current_task = self.space:get{id}
local task = self.space:update(id, {{ '=', 2, state.BURIED }})
if task ~= nil then
if self.v2 then
local status = current_task[2]
local ready_task = self.space_ready:get{task[1]}
if ready_task ~= nil then
delete_ready(self, id, task[3])
elseif status == state.TAKEN then
put_ready(self, task[3])
end
end

local neighbour = self.space.index.utube:min{state.READY, task[3]}

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'bury')
if neighbour and neighbour[i_status] == state.READY then
self.on_task_change(neighbour)
end
else
if commit_requirements then
box.commit()
end

self.on_task_change(task, 'bury')
end
return task
end

-- unbury several tasks
function method.kick(self, count)
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())

for i = 1, count do
if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
end

local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return i - 1
Expand All @@ -193,6 +350,19 @@ function method.kick(self, count)
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.v2 then
local prev_task = self.space_ready.index.utube:get{task[3]}
if prev_task ~= nil then
delete_ready(self, prev_task[1], task[3])
else
put_ready(self, task[3])
end
end

if commit_requirements then
box.commit()
end

self.on_task_change(task, 'kick')
end
return count
Expand All @@ -210,6 +380,9 @@ end

function method.truncate(self)
self.space:truncate()
if self.v2 then
self.space_ready:truncate()
end
end

-- This driver has no background activity.
Expand All @@ -222,4 +395,12 @@ function method.stop()
return
end

function method.drop(self)
self:stop()
box.space[self.space.name]:drop()
if self.v2 then
box.space[self.space_ready.name]:drop()
end
end

return tube
Loading

0 comments on commit 749eacf

Please sign in to comment.