Skip to content

Commit

Permalink
stream: Check that block end can not be in the future
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 7, 2024
1 parent 0413aa5 commit 5aed4e7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
41 changes: 41 additions & 0 deletions python/test/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,3 +873,44 @@ async def test_client(asyncloop, tmp_path):
assert (m.type, m.seq) == (m.Type.Data, 10)
m = await c.recv()
assert (m.type, m.seq) == (m.Type.Control, 10)

@asyncloop_run
async def test_block_in_future(asyncloop, tmp_path):
common = f'stream+pub+tcp://{tmp_path}/stream.sock;request=tcp://{tmp_path}/request.sock;dump=frame;storage.dump=frame'
s = asyncloop.Channel(f'{common};storage=file://{tmp_path}/storage.dat;name=server;mode=server;blocks=blocks://{tmp_path}/blocks.yaml')
c = asyncloop.Channel(f'{common};name=client;mode=client;peer=test')

assert [x.name for x in s.children] == ['server/stream', 'server/request', 'server/storage', 'server/blocks']
blocks = s.children[-1]

s.open()
assert s.state == s.State.Active # No need to wait

for i in range(10):
s.post(b'xxx', msgid=10, seq=i)
s.post({'type': 'default'}, name='Block', type=s.Type.Control)

assert yaml.safe_load(open(tmp_path / 'blocks.yaml')) == [{'seq': 9, 'type':'default'}]
s.close()
s.open()

c.open(mode='block', block='0')

m = await c.recv()
assert m.seq == 9

c.close()

blocks.post(b'', seq=15)
blocks.post({'type': 'default'}, name='Block', type=s.Type.Control)

assert blocks.config['info.seq'] == '15'

c.open(mode='block', block='0')

assert (await c.recv_state()) == c.State.Error

c.close()
s.close()

with pytest.raises(TLLError): s.open()
7 changes: 6 additions & 1 deletion src/channel/stream-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ int StreamServer::_open(const ConstConfig &url)
auto seq = _blocks->config().getT<long long>("info.seq");
if (!seq)
return _log.fail(EINVAL, "Blocks channel last seq invalid: {}", seq.error());
if (*seq != _seq) {
if (*seq > _seq) {
return _log.fail(EINVAL, "Blocks channel last seq in the future: {}, last storage seq {}", *seq, _seq);
} else if (*seq < _seq) {
_log.info("Blocks seq is behind storage seq: {} < {}, feed from storage", *seq, _seq);
auto url = _storage_url.copy();
url.set("autoclose", "yes");
Expand Down Expand Up @@ -503,6 +505,9 @@ tll::result_t<int> StreamServer::Client::init(const tll_msg_t *msg)
_log.info("Translated block type '{}' number {} to seq {}, storage seq {}", block, req.get_seq(), seq, block_end);
}

if (block_end != -1 && block_end > parent->_seq + 1)
return error(fmt::format("Error in storage: block end {} in the future, last seq {}", block_end - 1, parent->_seq));

storage = parent->context().channel(parent->_storage_url, parent->_storage.get());
if (!storage)
return error("Failed to create storage channel");
Expand Down

0 comments on commit 5aed4e7

Please sign in to comment.