Skip to content

Commit

Permalink
stream: Export client config for wildcard address
Browse files Browse the repository at this point in the history
  • Loading branch information
shramov committed Dec 7, 2024
1 parent d6975a4 commit c91ef91
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
23 changes: 23 additions & 0 deletions python/test/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,29 @@ async def test_export_client(asyncloop, tmp_path):

assert [m.name for m in c.scheme.messages] == ['Test']

@asyncloop_run
async def test_export_client_wildcard(asyncloop, tmp_path):
scheme = 'yamls://[{name: Test, id: 10}]'
server = asyncloop.Channel(f'stream+pub+tcp://*:0;request=tcp://*:0;dump=frame;storage=file:///{tmp_path}/storage.dat;name=server;mode=server', scheme=scheme)
server.open()
server.post(b'xxx', seq=100)

assert [m.name for m in server.scheme.messages] == ['Test']

url = server.config.get_url('client.init').copy()
assert url.proto == 'stream+pub+tcp'

assert [k for k, _ in server.config.sub('client.replace', False).browse('**')] == sorted(['host.init.tll.host.host', 'host.init.request.tll.host.host'])
for k, _ in server.config.browse('client.replace.host.init.**'):
url[k.split('.', 4)[-1]] = '::1'

c = asyncloop.Channel(url, name='client')
c.open(mode='seq', seq='100')
assert await c.recv_state() == c.State.Active
assert (await c.recv()).seq == 100

assert [m.name for m in c.scheme.messages] == ['Test']

@asyncloop_run
@pytest.mark.parametrize("mode,rseq,oseq", [
("request-only", list(range(10, 15)), []),
Expand Down
14 changes: 10 additions & 4 deletions src/channel/stream-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,21 @@ int StreamServer::_check_state(tll_state_t s)
_log.info("All sub channels are active");
if (state() == tll::state::Opening) {
auto oclient = _child->config().sub("client");
auto rclient = _request->config().sub("client.init");
if (oclient && oclient->sub("init") && rclient) {
auto rclient = _request->config().sub("client");
if (oclient && oclient->sub("init") && rclient && rclient->sub("init")) {
auto client = oclient->copy();
tll::Channel::Url url = *client.sub("init");
url.proto(std::string("stream+") + url.proto());
url.set("mode", "client");
url.set("request", rclient->copy());
url.set("request", rclient->sub("init")->copy());

for (auto &[prefix, cfg]: rclient->browse("replace.*.*", true)) {
for (auto &[k, _]: cfg.browse("**"))
client.set(fmt::format("{}.request.{}", prefix, k), "");
}

client.set("children.online", *oclient);
client.set("children.request", *_request->config().sub("client"));
client.set("children.request", *rclient);
_config.set("client", client);
}
state(tll::state::Active);
Expand Down

0 comments on commit c91ef91

Please sign in to comment.