Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust reap interval based on timeouts #744

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/zyre.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,64 @@ zyre_test (bool verbose)
zstr_free (&command);
zmsg_destroy (&msg);

// First node should also receive ENTER and JOINs from second node
msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "ENTER"));
zstr_free (&command);
assert (zmsg_size (msg) == 4);
peerid = zmsg_popstr (msg);
name = zmsg_popstr (msg);
assert (streq (name, "node2"));
zstr_free (&peerid);
zstr_free (&name);
zmsg_destroy (&msg);

msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "JOIN"));
zstr_free (&command);
assert (zmsg_size (msg) == 3);
zmsg_destroy (&msg);

msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "JOIN"));
zstr_free (&command);
assert (zmsg_size (msg) == 3);
zmsg_destroy (&msg);

// Test evasive timeout
const int evasive_test_interval = 100;
zyre_set_evasive_timeout (node1, evasive_test_interval);
// Refresh peers to apply new timeouts immediately
zyre_shouts (node1, "GLOBAL", "Hello again");
zyre_shouts (node2, "GLOBAL", "Hello again");
msg = zyre_recv (node1);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "SHOUT"));
zstr_free (&command);
zmsg_destroy (&msg);
msg = zyre_recv (node2);
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "SHOUT"));
zstr_free (&command);
zmsg_destroy (&msg);

int64_t recv_start = zclock_mono ();
msg = zyre_recv (node1);
assert ((zclock_mono () - recv_start) < (evasive_test_interval + 100));
assert (msg);
command = zmsg_popstr (msg);
assert (streq (command, "EVASIVE"));
zstr_free (&command);
zmsg_destroy (&msg);

zyre_stop (node2);

msg = zyre_recv (node2);
Expand Down
27 changes: 18 additions & 9 deletions src/zyre_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ s_string_compare (void *item1, void *item2)
return strcmp (str1, str2);
}

static int64_t
s_reap_interval (zyre_node_t *self)
{
uint64_t interval = self->evasive_timeout;
if (self->expired_timeout < interval)
interval = self->expired_timeout;
if (interval > REAP_INTERVAL)
interval = REAP_INTERVAL;
return interval;
}

// --------------------------------------------------------------------------
// Constructor

Expand Down Expand Up @@ -1547,15 +1558,15 @@ zyre_node_ping_peer (const char *key, void *item, void *argument)
zstr_sendm (self->outbox, "EVASIVE");
zstr_sendm (self->outbox, zyre_peer_identity (peer));
zstr_send (self->outbox, zyre_peer_name (peer));
if (zclock_mono () >= zyre_peer_evasive_at (peer) + REAP_INTERVAL) {
if (zclock_mono () >= zyre_peer_evasive_at (peer) + s_reap_interval (self)) {
// Inform the calling application this peer is being silent
// despite having tried to ping it. Something is wrong with
// the connection to this peer (or with the network).
// NB: this is an improvement of the EVASIVE event which triggers
// before getting ping result and thus has poor meaning.
if (self->verbose)
zsys_info ("(%s) peer '%s' has not answered ping after %d milliseconds (silent)",
self->name, zyre_peer_name(peer), REAP_INTERVAL);
self->name, zyre_peer_name(peer), s_reap_interval (self));
zstr_sendm (self->outbox, "SILENT");
zstr_sendm (self->outbox, zyre_peer_identity (peer));
zstr_send (self->outbox, zyre_peer_name (peer));
Expand All @@ -1581,7 +1592,7 @@ zyre_node_actor (zsock_t *pipe, void *args)
zsock_signal (self->pipe, 0);

// Loop until the agent is terminated one way or another
int64_t reap_at = zclock_mono () + REAP_INTERVAL;
int64_t last_reaped_at = zclock_mono ();
while (!self->terminated) {

// Start beacon as soon as we can
Expand Down Expand Up @@ -1635,10 +1646,8 @@ zyre_node_actor (zsock_t *pipe, void *args)
zstr_free(&hostname);
}

int timeout = (int) (reap_at - zclock_mono ());
if (timeout > REAP_INTERVAL)
timeout = REAP_INTERVAL;
else
// If nothing else happens, wait until the next reap
int timeout = (int) ((last_reaped_at + s_reap_interval (self)) - zclock_mono ());
if (timeout < 0)
timeout = 0;

Expand All @@ -1661,9 +1670,9 @@ zyre_node_actor (zsock_t *pipe, void *args)
break; // Interrupted, check before expired
else
if (zpoller_expired (self->poller)) {
if (zclock_mono () >= reap_at) {
if (zclock_mono () >= (last_reaped_at + s_reap_interval (self))) {
void *item;
reap_at = zclock_mono () + REAP_INTERVAL;
last_reaped_at = zclock_mono ();
// Ping all peers and reap any expired ones
for (item = zhash_first (self->peers); item != NULL;
item = zhash_next (self->peers))
Expand Down
Loading