
Slow subscriber generates an assert when ZMTP heartbeat is enabled #4767

Open
ottawaguy81 opened this issue Jan 15, 2025 · 0 comments


@ottawaguy81

Issue description

A slow subscriber application triggers an assertion failure when the subscriber's receive (RX) queue is full and the ZMTP heartbeat (HB) is enabled. This is the same issue as #3937.
From my observations, the assertion fires on the first socket read right after the ZMTP heartbeat has reset the connection.

Environment

  • libzmq version : 4.3.4
  • OS: Linux

Minimal test code / Steps to reproduce the issue

I have two programs, pub.c and sub.c. ZMQ_SNDHWM and ZMQ_RCVHWM are set to a very low value (2 in the code below), and the subscriber enables the ZMTP heartbeat. I send three publications and then wait 15 seconds for the heartbeat to reset the connection.
Build both programs:

gcc -g pub.c -o pub -lczmq -lzmq
gcc -g sub.c -o sub -lczmq -lzmq

Then follow these steps to reproduce the problem:

  1. Run the pub program (./pub).
  2. Run the sub program (./sub).
  3. Press Enter three times in the pub console. This sends three publications.
  4. Wait 15 seconds. The heartbeat timeout resets the TCP connection.
  5. Press Enter in the ./sub console. This triggers a zstr_recv() call, which causes an abort.
  6. If there is no abort, repeat from step 3.
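A rough timing bound suggests why the 15-second wait in step 4 is long enough. It relies on the zmq_setsockopt documentation, which describes ZMQ_HEARTBEAT_IVL as the interval between ZMTP PING commands and ZMQ_HEARTBEAT_TIMEOUT as how long to wait for any traffic after sending a PING before timing out the connection. It further assumes, as an unverified hypothesis, that once the subscriber's engine stops reading because its receive queue is full, no incoming traffic (including the publisher's PONG) is processed. If input stops at time t_stop, the next PING goes out no later than t_stop + IVL, so with the values in sub.c:

$$
t_{\text{reset}} - t_{\text{stop}} \;\le\; \text{IVL} + \text{TIMEOUT} \;=\; 5000\ \text{ms} + 4000\ \text{ms} \;=\; 9000\ \text{ms} \;<\; 15000\ \text{ms}
$$

If a timeout timer from an earlier PING is still pending when input stops, the reset can happen sooner, so the bound still holds.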

File: sub.c
#include <czmq.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

static char serverIp[100] = "127.0.0.1";

static void *sub_thread (void *arg)
{
    (void) arg;
    int rcvhwm = 2;               // receive high-water mark, in messages
    int heartbeat_timeout = 4000; // ms to wait for traffic after a PING
    int heartbeat_ivl = 5000;     // ms between ZMTP PING commands

    zsock_t *client = zsock_new (ZMQ_SUB);
    assert (client);
    void *handle = zsock_resolve (client);

    // Options are set before connecting so they apply to the new connection
    printf (" Set option ZMQ_RCVHWM %d \n",
            zmq_setsockopt (handle, ZMQ_RCVHWM, &rcvhwm, sizeof (int)));
    printf (" Set option ZMQ_HEARTBEAT_TIMEOUT %d \n",
            zmq_setsockopt (handle, ZMQ_HEARTBEAT_TIMEOUT, &heartbeat_timeout, sizeof (int)));
    printf (" Set option ZMQ_HEARTBEAT_IVL %d \n",
            zmq_setsockopt (handle, ZMQ_HEARTBEAT_IVL, &heartbeat_ivl, sizeof (int)));

    zsock_set_subscribe (client, "");
    // zsock_connect() takes a printf-style endpoint format
    zsock_connect (client, "tcp://%s:5556", serverIp);

    printf ("Press Enter to receive publication\n");
    getchar ();
    while (1) {
        char *message = zstr_recv (client);
        if (!message)
            break;                // interrupted or context terminated
        printf ("%s \n", message);
        zstr_free (&message);     // zstr_recv() returns a heap string we own
        getchar ();
    }
    zsock_destroy (&client);
    return NULL;
}

int main (int argc, char *argv[])
{
    pthread_t thread;
    if (argc == 2) {
        // Bounded copy: serverIp holds at most 99 characters plus NUL
        strncpy (serverIp, argv[1], sizeof (serverIp) - 1);
        serverIp[sizeof (serverIp) - 1] = '\0';
    }
    pthread_create (&thread, NULL, sub_thread, NULL);
    pthread_join (thread, NULL);
    return 0;
}

File: pub.c
#include <czmq.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

static void *pub_thread (void *arg)
{
    (void) arg;
    zsock_t *publisher = zsock_new (ZMQ_PUB);
    assert (publisher);
    void *handle = zsock_resolve (publisher);
    int sndhwm = 2;     // send high-water mark, in messages
    int noDropFlag = 1; // ZMQ_XPUB_NODROP: fail with EAGAIN instead of dropping
    int sendtmo = 0;    // ZMQ_SNDTIMEO 0: sends never block
    int counter = 0;
    char pub_data[100];
    int rtn = 0;

    printf (" Set option ZMQ_SNDHWM %d \n",
            zmq_setsockopt (handle, ZMQ_SNDHWM, &sndhwm, sizeof (int)));
    printf (" Set option ZMQ_XPUB_NODROP %d \n",
            zmq_setsockopt (handle, ZMQ_XPUB_NODROP, &noDropFlag, sizeof (int)));
    printf (" Set option ZMQ_SNDTIMEO %d \n",
            zmq_setsockopt (handle, ZMQ_SNDTIMEO, &sendtmo, sizeof (int)));
    zsock_bind (publisher, "tcp://0.0.0.0:5556");
    sleep (1);
    printf ("To start pub press Enter\n");
    getchar ();
    while (1) {
        snprintf (pub_data, sizeof (pub_data), "pub-hello : %d", counter++);
        rtn = zstr_send (publisher, pub_data);
        printf ("Sending %s rtn: %d\n", pub_data, rtn);
        if (rtn == -1)          // report why the send failed (e.g. EAGAIN)
            printf ("  send failed: %s\n", zmq_strerror (zmq_errno ()));
        getchar ();
    }
    zsock_destroy (&publisher);
    return NULL;
}

int main (void)
{
    pthread_t thread;
    pthread_create (&thread, NULL, pub_thread, NULL);
    pthread_join (thread, NULL);
    return 0;
}

What's the actual result? (include assertion message & call stack if applicable)

(gdb) bt
#0 __pthread_kill_implementation (threadid=<optimized out>, signo=signo@entry=6, no_tid=no_tid@entry=0) at pthread_kill.c:44
#1 0x00007f93e2c423dd in __pthread_kill_internal (threadid=<optimized out>, signo=signo@entry=6) at pthread_kill.c:78
#2 0x00007f93e2c423f4 in GI___pthread_kill (threadid=<optimized out>, signo=signo@entry=6) at pthread_kill.c:89
#3 0x00007f93e2bf7cbe in GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#4 0x00007f93e2bf8d37 in GI_abort () at abort.c:79
#5 0x00007f93e2e03bf6 in zmq::zmq_abort (errmsg=errmsg@entry=0x7f93e2e48d0e "input_stopped") at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/err.cpp:88
#6 0x00007f93e2e2a03b in zmq::stream_engine_base_t::restart_input (this=0x7f93d4001500) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/stream_engine_base.cpp:417
#7 0x00007f93e2e1ccba in zmq::session_base_t::write_activated (this=<optimized out>, pipe=<optimized out>) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/session_base.cpp:333
#8 0x00007f93e2e11e17 in zmq::pipe_t::process_activate_write (this=<optimized out>, msgs_read=<optimized out>) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/pipe.cpp:301
#9 0x00007f93e2e0de2d in zmq::object_t::process_command (this=0x7f93dc00ee10, cmd=...) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/object.cpp:79
#10 0x00007f93e2e0493d in zmq::io_thread_t::in_event (this=<optimized out>) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/io_thread.cpp:91
#11 0x00007f93e2e0334b in zmq::epoll_t::loop (this=0x7f93dc005020) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/epoll.cpp:206
#12 0x00007f93e2e16046 in zmq::worker_poller_base_t::worker_routine (arg=<optimized out>) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/poller_base.cpp:146
#13 0x00007f93e2e2e78a in thread_routine (arg=0x7f93dc005078) at /usr/src/debug/zeromq/4.3.4-r0/zeromq-4.3.4/src/thread.cpp:257
#14 0x00007f93e2c41018 in start_thread (arg=<optimized out>) at pthread_create.c:442
#15 0x00007f93e2cbe670 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

(gdb) frame 6
(gdb) print _input_stopped
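$1 = false

So restart_input() was called on an engine whose input was not stopped, and the assertion on _input_stopped at stream_engine_base.cpp:417 aborted the process.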
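One hypothesis consistent with this backtrace and with the observation that the abort follows a heartbeat reset, offered as a sketch rather than a confirmed diagnosis: the pipe toward the application fills up, so the engine stops reading and the writer side waits for an activate_write; the heartbeat timeout then replaces the engine with a fresh one whose _input_stopped is false; when the application finally reads, the delayed activate_write reaches session_base_t::write_activated() (frame #7), which calls restart_input() on the new engine. The toy model below is plain C, not libzmq code. The names model_session_t, model_in_event, model_heartbeat_reset and model_app_read are hypothetical, and the model simplifies libzmq by sending the activation on the first read instead of at the pipe's low-water mark.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified model of the engine/session interaction.
 * Names echo libzmq concepts, but this is NOT libzmq code. */
typedef struct {
    bool input_stopped;    /* engine stopped reading: pipe to the app was full */
} model_engine_t;

typedef struct {
    model_engine_t engine; /* engine currently attached to the session */
    int queued;            /* messages waiting in the pipe to the application */
    int hwm;               /* receive high-water mark */
    bool writer_blocked;   /* session hit the HWM and waits for activate_write */
} model_session_t;

/* A message arrives from the network. */
static void model_in_event (model_session_t *s)
{
    if (s->engine.input_stopped)
        return;                        /* not reading; data stays unprocessed */
    if (s->queued >= s->hwm) {
        s->engine.input_stopped = true;
        s->writer_blocked = true;
        return;
    }
    s->queued++;
}

/* Heartbeat timeout (hypothesis): the connection is dropped and a fresh
 * engine, with input running, is attached to the same session and pipe. */
static void model_heartbeat_reset (model_session_t *s)
{
    s->engine.input_stopped = false;
}

/* The application reads one message. If the writer was blocked, the pipe
 * activates the session, which calls restart_input() on whatever engine is
 * attached now. Returns false where libzmq would hit its assertion. */
static bool model_app_read (model_session_t *s)
{
    if (s->queued > 0)
        s->queued--;
    if (!s->writer_blocked)
        return true;
    s->writer_blocked = false;
    if (!s->engine.input_stopped)
        return false;                  /* stale activation hits a fresh engine */
    s->engine.input_stopped = false;   /* normal case: resume reading */
    return true;
}
```

Feeding the model three messages with hwm = 2, as in the repro, then a reset and a read makes model_app_read() return false at the point where libzmq aborts; without the reset, the same read returns true.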

What's the expected result?

I expect no abort. Once the subscriber's queue is full, further messages should be dropped on the publisher side.
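For reference, the publisher-side behaviour at the high-water mark can be pictured with the toy bounded queue below. It is a sketch, not libzmq code; pub_queue_t and pub_queue_send are hypothetical names. Per the zmq_setsockopt documentation, a PUB socket silently drops messages for a subscriber whose high-water mark is reached, while ZMQ_XPUB_NODROP = 1 (which pub.c sets) makes the send fail with EAGAIN instead; with ZMQ_SNDTIMEO = 0 that failure is returned immediately rather than after a wait.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

#define QUEUE_CAP 8

/* Hypothetical toy model of one subscriber's outbound pipe on a PUB socket. */
typedef struct {
    int items[QUEUE_CAP]; /* queued messages */
    int count;            /* number of queued messages */
    int hwm;              /* send high-water mark, must be <= QUEUE_CAP */
    bool nodrop;          /* models ZMQ_XPUB_NODROP */
    int dropped;          /* messages silently discarded at the HWM */
} pub_queue_t;

/* Returns 0 if the message was queued or silently dropped,
 * -1 with errno = EAGAIN if the HWM is reached and nodrop is set. */
static int pub_queue_send (pub_queue_t *q, int msg)
{
    assert (q->hwm <= QUEUE_CAP);
    if (q->count >= q->hwm) {
        if (q->nodrop) {
            errno = EAGAIN;   /* caller sees the failure; no waiting modelled */
            return -1;
        }
        q->dropped++;         /* default PUB behaviour: drop silently */
        return 0;
    }
    q->items[q->count++] = msg;
    return 0;
}
```

With nodrop = false, a third send into a queue with hwm = 2 still returns 0 and is counted in dropped; with nodrop = true, as pub.c configures, the third send returns -1 with errno set to EAGAIN.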
