Skip to content

Commit

Permalink
wip: stream: init message
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 8, 2023
1 parent c524dd1 commit 098a965
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
39 changes: 35 additions & 4 deletions src/channel/stream-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ int StreamServer::_init(const Channel::Url &url, tll::Channel *master)

_autoseq.enable = reader.getT("autoseq", false);

_init_message = reader.getT("init-message", std::string());
_init_seq = reader.getT<unsigned long>("init-seq", 0);
_init_block = reader.getT("init-block", std::string(url.sub("blocks") ? "default" : ""));

if (!reader)
return _log.fail(EINVAL, "Invalid url: {}", reader.error());

Expand Down Expand Up @@ -142,10 +146,32 @@ int StreamServer::_open(const ConstConfig &url)
if (!last)
return _log.fail(EINVAL, "Storage has invalid 'seq' config value: {}", last.error());
_seq = *last;
_autoseq.reset(_seq);
config_info().set_ptr("seq", &_seq);
_log.info("Last seq in storage: {}", _seq);

const bool empty_storage = _seq == -1;
if (_init_message.size() && empty_storage) {
_log.info("Init empty storage with message {} seq {}", _init_message, _init_seq);
auto scheme = _storage->scheme();
if (!scheme)
return _log.fail(EINVAL, "Can not initialize storage without scheme");
auto message = scheme->lookup(_init_message);
if (!message)
return _log.fail(EINVAL, "Message '{}' not found in scheme", _init_message);
std::vector<char> buf;
buf.resize(message->size);
tll_msg_t msg = {};
msg.msgid = message->msgid;
msg.seq = _init_seq;
msg.data = buf.data();
msg.size = buf.size();
if (auto r = _storage->post(&msg); r)
return _log.fail(EINVAL, "Failed to post initial message {} to storage", _init_message);
_seq = _init_seq;
}

_autoseq.reset(_seq);

if (_blocks) {
if (_blocks->open())
return _log.fail(EINVAL, "Failed to open blocks channel");
Expand All @@ -172,6 +198,11 @@ int StreamServer::_open(const ConstConfig &url)
_child_open = url.copy();
return 0;
}

if (empty_storage && _init_message.size() && _init_block.size()) {
if (!_control_blocks)
return _log.fail(EINVAL, "Blocks storage has no control scheme, can not initialize");
}
}

if (_request->open())
Expand Down Expand Up @@ -527,12 +558,12 @@ int StreamServer::_post(const tll_msg_t * msg, int flags)
msg = _autoseq.update(msg);
if (msg->seq <= _seq)
return _log.fail(EINVAL, "Non monotonic seq: {} < last posted {}", msg->seq, _seq);
if (auto r = _storage->post(msg); r)
return _log.fail(r, "Failed to store message {}", msg->seq);
if (_blocks) {
if (auto r = _blocks->post(msg); r)
return _log.fail(r, "Failed to send Block control message");
return _log.fail(r, "Failed to post message into block storage");
}
if (auto r = _storage->post(msg); r)
return _log.fail(r, "Failed to store message {}", msg->seq);
_seq = msg->seq;
_last_seq_tx(msg->seq);
return _child->post(msg);
Expand Down
4 changes: 4 additions & 0 deletions src/channel/stream-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class StreamServer : public tll::channel::LastSeqTx<StreamServer, tll::channel::

tll::Config _child_open;

std::string _init_message;
long long _init_seq;
std::string _init_block;

public:
static constexpr std::string_view channel_protocol() { return "stream+"; }

Expand Down

0 comments on commit 098a965

Please sign in to comment.