Skip to content

Commit

Permalink
api: fix INIT state stuck
Browse files Browse the repository at this point in the history
Sometimes, instance could enter the queue initialization
while still not running (for example, left in the orphan mode).
This resulted in "lazy start". But Tarantool does not call
`box.cfg {}` after leaving orphan mode, so queue could stuck in the
`INIT` state.

Now we wait in the background for instances, that are not running.
It is similar to lazy init for read-only instances.

Note that this fix works only for Tarantool versions >= 2.10.0.
This is because of used watchers.

Closes #226
  • Loading branch information
DerekBum committed Apr 9, 2024
1 parent 5f2b145 commit d4bf56c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.

## [1.3.3] - 2023-09-13

### Fixed
Expand Down
33 changes: 29 additions & 4 deletions queue/init.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
local fiber = require('fiber')

local abstract = require('queue.abstract')
local queue_state = require('queue.abstract.queue_state')
local qc = require('queue.compat')
local queue = nil

-- load all core drivers
Expand All @@ -11,6 +14,10 @@ local core_drivers = {
limfifottl = require('queue.abstract.driver.limfifottl')
}

-- since:
-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4
local watchers_supported = qc.check_version({2, 10, 0})

local function register_driver(driver_name, tube_ctr)
if type(tube_ctr.create_space) ~= 'function' or
type(tube_ctr.new) ~= 'function' then
Expand Down Expand Up @@ -62,6 +69,19 @@ local orig_call = nil

local wrapper_impl

local function running_waiter()
fiber.name('queue running waiter')
local wait_cond = fiber.cond()
local w = box.watch('box.status', function(_, new_status)
if new_status.status == 'running' then
wait_cond:signal()
end
end)
wait_cond:wait()
w:unregister()
return wrapper_impl()
end

local function cfg_wrapper(...)
box.cfg = orig_cfg
return wrapper_impl(...)
Expand All @@ -79,10 +99,15 @@ local function wrap_box_cfg()
orig_cfg = box.cfg
box.cfg = cfg_wrapper
elseif type(box.cfg) == 'table' then
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
if watchers_supported and box.info.status ~= 'running' then
-- Wait for the running state and initialize the queue.
fiber.new(running_waiter)
else
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
end
else
error('The box.cfg type is unexpected: ' .. type(box.cfg))
end
Expand Down

0 comments on commit d4bf56c

Please sign in to comment.