Skip to content

Commit

Permalink
stream: Optionally initialize empty storage
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 9, 2023
1 parent dd33563 commit 30e418e
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 4 deletions.
50 changes: 50 additions & 0 deletions python/test/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,56 @@ async def test_block(asyncloop, tmp_path, req, result):
assert m.type == m.Type.Control
assert (m.seq, c.unpack(m).SCHEME.name) == (result[-1], 'Online')

@pytest.mark.parametrize("init_seq,init_block", [
('', ''),
(10, ''),
(10, 'other'),
])
@asyncloop_run
async def test_init_message(asyncloop, tmp_path, init_seq, init_block):
SCHEME = '''yamls://
- name: Initial
id: 10
fields: [{name: i64, type: int64}]
- name: Data
id: 20
fields: [{name: i32, type: int32}]
'''
common = f'stream+pub+tcp://{tmp_path}/stream.sock;request=tcp://{tmp_path}/request.sock;dump=frame;storage.dump=frame;blocks.dump=frame'
s = asyncloop.Channel(f'{common};storage.url=file://{tmp_path}/storage.dat;name=server;mode=server;blocks.url=blocks://{tmp_path}/blocks.yaml;init-message=Initial;init-seq={init_seq};init-block={init_block}', scheme=SCHEME)
c = asyncloop.Channel(f'{common};name=client;mode=client;peer=test', scheme=SCHEME)

s.open()
assert s.state == s.State.Active # No need to wait

assert yaml.safe_load(open(tmp_path / 'blocks.yaml')) == [{'seq': init_seq or 0, 'type': init_block or 'default'}]

s.close()
s.open()

assert s.state == s.State.Active # No need to wait

assert yaml.safe_load(open(tmp_path / 'blocks.yaml')) == [{'seq': init_seq or 0, 'type': init_block or 'default'}]

c.open(f'mode=seq;seq={init_seq or 0}')

assert (await c.recv_state()) == c.State.Active

m = await c.recv()
assert (m.type, m.seq, m.msgid) == (m.Type.Data, init_seq or 0, 10)
assert c.unpack(m).as_dict() == {'i64': 0}

m = await c.recv()
assert m.type == m.Type.Control
assert (m.seq, c.unpack(m).SCHEME.name) == (init_seq or 0, 'Online')

c.close()
c.open(f'mode=block;block=0;block-type={init_block or "default"}')

m = await c.recv()
assert m.type == m.Type.Control
assert (m.seq, c.unpack(m).SCHEME.name) == (init_seq or 0, 'Online')

@asyncloop_run
async def test_autoseq(asyncloop, tmp_path):
common = f'stream+pub+tcp://{tmp_path}/stream.sock;request=tcp://{tmp_path}/request.sock;dump=frame;pub.dump=frame;request.dump=frame;storage.dump=frame'
Expand Down
73 changes: 69 additions & 4 deletions src/channel/stream-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ int StreamServer::_init(const Channel::Url &url, tll::Channel *master)

_autoseq.enable = reader.getT("autoseq", false);

_init_message = reader.getT("init-message", std::string());
_init_seq = reader.getT<unsigned long>("init-seq", 0);
_init_block = reader.getT("init-block", std::string(url.sub("blocks") ? "default" : ""));

if (!reader)
return _log.fail(EINVAL, "Invalid url: {}", reader.error());

Expand Down Expand Up @@ -150,15 +154,76 @@ int StreamServer::_open(const ConstConfig &url)
if (!last)
return _log.fail(EINVAL, "Storage has invalid 'seq' config value: {}", last.error());
_seq = *last;
_autoseq.reset(_seq);
config_info().set_ptr("seq", &_seq);
_log.info("Last seq in storage: {}", _seq);

const bool empty_storage = _seq == -1;
tll_msg_t initial_message = {};
std::vector<char> initial_buffer;

if (_init_message.size() && empty_storage) {
_log.info("Init empty storage with message {} seq {}", _init_message, _init_seq);
auto scheme = _storage->scheme();
if (!scheme)
return _log.fail(EINVAL, "Can not initialize storage without scheme");
auto message = scheme->lookup(_init_message);
if (!message)
return _log.fail(EINVAL, "Message '{}' not found in scheme", _init_message);
initial_buffer.resize(message->size);
initial_message.msgid = message->msgid;
initial_message.seq = _init_seq;
initial_message.data = initial_buffer.data();
initial_message.size = initial_buffer.size();
if (auto r = _storage->post(&initial_message); r)
return _log.fail(EINVAL, "Failed to post initial message {} to storage", _init_message);
_seq = _init_seq;
}

_autoseq.reset(_seq);

if (_blocks) {
if (_blocks->open())
return _log.fail(EINVAL, "Failed to open blocks channel");
if (_blocks->state() != tll::state::Active)
return _log.fail(EINVAL, "Long opening blocks is not supported");

if (empty_storage && _init_message.size() && _init_block.size()) {
_log.info("Post initial message to blocks storage");
if (_blocks->post(&initial_message))
return _log.fail(EINVAL, "Failed to post initial message to blocks storage");

if (!_control_blocks)
return _log.fail(EINVAL, "Blocks storage has no control scheme, can not initialize");
auto message = _control_blocks->lookup("Block");
if (!message)
return _log.fail(EINVAL, "Blocks storage scheme has no Block message");
std::vector<char> buf;
buf.resize(message->size);
tll::scheme::Field * field = nullptr;
for (field = message->fields; field; field = field->next) {
if (field->name == std::string_view("type"))
break;
}
if (!field)
return _log.fail(EINVAL, "Block message has no 'type' field");
if (field->type == field->Bytes) {
auto view = tll::make_view(buf).view(field->offset);
if (field->size < _init_block.size())
return _log.fail(EINVAL, "Block::type size {} is not enough for init-block '{}'", field->size, _init_block);
memcpy(view.data(), _init_block.data(), _init_block.size());
} else
return _log.fail(EINVAL, "Block::type field is not fixed string: {}", field->type);
tll_msg_t msg = {};
msg.type = TLL_MESSAGE_CONTROL;
msg.msgid = message->msgid;
msg.data = buf.data();
msg.size = buf.size();

_log.info("Post initial block {}", _init_block);
if (_blocks->post(&msg))
return _log.fail(EINVAL, "Failed to post initial block '{}'", _init_block);
}

auto seq = _blocks->config().getT<long long>("info.seq");
if (!seq)
return _log.fail(EINVAL, "Blocks channel last seq invalid: {}", seq.error());
Expand Down Expand Up @@ -535,12 +600,12 @@ int StreamServer::_post(const tll_msg_t * msg, int flags)
msg = _autoseq.update(msg);
if (msg->seq <= _seq)
return _log.fail(EINVAL, "Non monotonic seq: {} < last posted {}", msg->seq, _seq);
if (auto r = _storage->post(msg); r)
return _log.fail(r, "Failed to store message {}", msg->seq);
if (_blocks) {
if (auto r = _blocks->post(msg); r)
return _log.fail(r, "Failed to send Block control message");
return _log.fail(r, "Failed to post message into block storage");
}
if (auto r = _storage->post(msg); r)
return _log.fail(r, "Failed to store message {}", msg->seq);
_seq = msg->seq;
_last_seq_tx(msg->seq);
return _child->post(msg);
Expand Down
4 changes: 4 additions & 0 deletions src/channel/stream-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class StreamServer : public tll::channel::LastSeqTx<StreamServer, tll::channel::

tll::Config _child_open;

std::string _init_message;
long long _init_seq;
std::string _init_block;

public:
static constexpr std::string_view channel_protocol() { return "stream+"; }

Expand Down

0 comments on commit 30e418e

Please sign in to comment.