When I disable the broker, a client that uses set_pingresp_timeout(2s) blocks in force_disconnect() for more than 30 minutes #976

Closed
ForrestGt opened this issue May 24, 2023 · 11 comments


ForrestGt commented May 24, 2023

Hello,
I have an issue, described below.
I call set_pingresp_timeout(20s) in my MQTT client. I expected that if I pause the broker (or unplug the network cable), the client would call force_disconnect() and then invoke the error handler (on_error callback). Instead, the client blocks in force_disconnect() for more than 30 minutes.

The following code reproduces the issue.

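#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <sys/syscall.h>
#include <unistd.h>

#include <mqtt_client_cpp.hpp>

// NOTE: getDateTime() used below is an application helper that is not shown here.

using client_t = decltype(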
	MQTT_NS::make_tls_sync_client(
		std::declval<boost::asio::io_context&>(),
		std::string(),
		std::string()
	)
);


boost::asio::io_context ioc;
client_t c = nullptr ;

int main(int argc, char** argv) {

	std::string host = "10.161.93.83";
	std::string port = "8883";
	std::string cacert = "/opt/emerson_mqtt/root.pem";
	std::string cert = "/opt/emerson_mqtt/gt.pem";
	std::string private_key = "/opt/emerson_mqtt/gt.key";
	
	c = MQTT_NS::make_tls_sync_client(ioc, host, port, mqtt::protocol_version::v3_1_1);

	using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;

	char  cli_id[128]= {0};
	int i_Rand = rand();
	sprintf(cli_id, "%d", i_Rand);
	c->set_client_id(cli_id);
	c->set_clean_session(true);
	c->set_keep_alive_sec(20,std::chrono::seconds(5));    

	
	c->get_ssl_context().load_verify_file(cacert);
	c->get_ssl_context().use_certificate_chain_file(cert);
	c->get_ssl_context().use_private_key_file(private_key, boost::asio::ssl::context::file_format::pem);
	c->set_pingresp_timeout(std::chrono::seconds(20));

#if OPENSSL_VERSION_NUMBER >= 0x10101000L
		SSL_CTX_set_keylog_callback(
			c->get_ssl_context().native_handle(),
			[](SSL const*, char const* line) {
				 std::cout <<"SSL_CTX callback keylog  " << syscall(SYS_gettid) << std::endl;
			}
		);
#endif // OPENSSL_VERSION_NUMBER >= 0x10101000L
	
	// Setup handlers
	c->set_connack_handler(
		[]  // c is a global (static storage duration), so it must not appear in the capture list
		(bool sp, MQTT_NS::connect_return_code connack_return_code){
			if (connack_return_code == MQTT_NS::connect_return_code::accepted) {
				std::cout << "Connect successful" << std::endl;
			}
			return true;
		});
	c->set_close_handler(
		[]
		(){					
			std::cout << "close_handler: closed." << std::endl;
		});
	c->set_error_handler(
		[]
		(MQTT_NS::error_code ec){
			std::cout << "ec.message: " << ec.message() <<"  ec.value=" << ec.value() <<  "  " << getDateTime() << std::endl;
		});
	c->set_pingresp_handler(
		[&]
		()
		{
		  std::cout << "pingresp_handler.  "<< getDateTime() << std::endl;
		  return true;
		});

	std::this_thread::sleep_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(1000));
	std::cout << "start connect to broker and run io_context"  <<std::endl;
	c->connect();
	ioc.run();
	std::cout << "io_context has released " <<std::endl;
	
	while(1)
	{
		std::this_thread::sleep_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(1000));
	}
}


The following is the GDB backtrace after I paused the broker.

#0  0xf796fb60 in poll () from /lib/arm-linux-gnueabihf/libc.so.6
#1  0x00418ddc in boost::asio::detail::socket_ops::poll_read (s=6, state=0 '\000', msec=-1, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/socket_ops.ipp:2244
#2  0x00418510 in boost::asio::detail::socket_ops::sync_recv1 (s=6, state=18 '\022', data=0x627b88, 
    size=17408, flags=0, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/socket_ops.ipp:894
#3  0x00488fdc in boost::asio::detail::reactive_socket_service_base::receive<boost::asio::mutable_buffer> (this=0x60441c, impl=..., buffers=..., flags=0, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/reactive_socket_service_base.hpp:364
#4  0x00481108 in boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::read_some boost/asio/basic_stream_socket.hpp:978
#5  0x00554c48 in   asio/ssl/detail/io.hpp:47
#6  0x00553bd4 in boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >::shutdown (this=0x61a6f0, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/ssl/stream.hpp:562
#7  0x00550c50 in mqtt::tcp_endpoint<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >, boost::asio::strand<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> > >::shutdown_and_close_impl (this=0x61a6e8, s=..., ec=...)
    at ./mqtt/tcp_endpoint.hpp:171
#8  0x0054d2d8 in mqtt::tcp_endpoint<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >, boost::asio::strand<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> > >::clean_shutdown_and_close (this=0x61a6e8, ec=...)
    at ./mqtt/tcp_endpoint.hpp:102
#9  0x004822d8 in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::sync_shutdown (this=0x5efdd0, s=...)
    at ./mqtt/endpoint.hpp:5319
#10 0x00488624 in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::force_disconnect (this=0x5efdd0)
    at ./mqtt/endpoint.hpp:1103
#11 0x004807c0 in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::set_pingresp_timer()::{lambda(boost::system::error_code)#1}::operator()(boost::system::error_code) const::{lambda()#1}::operator()() const (
    this=0x60af80) at ./mqtt/endpoint.hpp:11543
#12 0x004a340c in std::_Function_handler<void (), mqtt::endpoint<std::mutex, std::lock_guard, 2u>::set_pingresp_timer()::{lambda(boost::system::error_code)#1}::operator()(boost::system::error_code) const::{lambda()#1}>::_M_invoke(std::_Any_data const&) (__functor=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/include/c++/8.3.0/bits/std_function.h:297

Following the backtrace, I read the Boost code (Boost 1.78.0) and found that the MQTT client is blocked at boost/asio/detail/impl/socket_ops.ipp:894:

// Wait for socket to become ready.
if (socket_ops::poll_read(s, 0, -1, ec) < 0)

Because the broker is paused and the timeout is -1 (wait forever), poll_read() never returns.
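To see why the -1 timeout matters, here is a minimal POSIX sketch (plain poll(), not Boost.Asio or mqtt_cpp code) that polls a socket whose peer never sends anything, standing in for the paused broker. With a finite timeout, poll() returns 0 once the timeout expires; with -1, as passed by sync_recv1 in the backtrace, it would wait indefinitely. The function name poll_silent_peer is purely illustrative.

```cpp
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Polls the read end of a local socketpair whose peer never writes,
// mimicking a connection to a broker that has gone silent.
// Returns poll()'s result: 0 means the timeout expired with nothing to read.
// Passing -1 (as socket_ops::poll_read does in the backtrace) would block here forever.
int poll_silent_peer(int timeout_ms) {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;

    pollfd pfd{};
    pfd.fd = fds[0];
    pfd.events = POLLIN;  // wait for readable data
    int n = ::poll(&pfd, 1, timeout_ms);

    ::close(fds[0]);
    ::close(fds[1]);
    return n;
}
```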

@redboltz
Owner

Thank you for reporting the issue.

Let me confirm your situation.

  • The client called force_disconnect() internally, which means the pingresp timeout itself works well.
  • But that force_disconnect() call never returns.
  • The backtrace indicates the blocking point: poll_read() with a -1 timeout argument.

Thanks to your analysis, I understood the root issue quickly.
In backtrace frame #7, the mqtt_cpp client calls s.shutdown(ec) at line 171 of tcp_endpoint.hpp.

#if defined(MQTT_USE_TLS)
    void shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, boost::system::error_code& ec) {
        s.shutdown(ec);
        MQTT_LOG("mqtt_impl", trace)
            << MQTT_ADD_VALUE(address, this)
            << "shutdown ec:"
            << ec.message();
        shutdown_and_close_impl(lowest_layer(), ec);
    }

The member function s.shutdown(ec) is documented here:
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/ssl__stream/shutdown/overload2.html

s is an instance of ssl::stream: https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/ssl__stream.html

Unfortunately, as far as I've checked, there is no way to set a timeout on the shutdown() member function.
If users want a timeout, they need to use async_shutdown() instead of shutdown(), together with their own timer (e.g. a steady_timer).
NOTE: In a failure situation like this, when shutdown() returns depends on the OS network settings.
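The async_shutdown()-plus-timer approach boils down to a race: whichever of "operation completed" and "timer expired" happens first decides the outcome. The sketch below shows only that decision logic, using the C++ standard library; Deadline is a hypothetical helper (not part of mqtt_cpp or Asio), and it uses a dedicated thread where an Asio program would use a steady_timer on the same strand. The operation's completion handler calls complete(); if that does not happen in time, on_timeout runs, which is where a force-close would go.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper: calls on_timeout unless complete() is called within `timeout`.
// Exactly one of "completed" and "timed out" wins; the mutex decides the race.
class Deadline {
public:
    Deadline(std::chrono::milliseconds timeout, std::function<void()> on_timeout)
        : on_timeout_(std::move(on_timeout)),
          timer_([this, timeout] {
              std::unique_lock<std::mutex> lk(m_);
              // Returns true if complete() was called before the timeout.
              if (cv_.wait_for(lk, timeout, [this] { return completed_; })) return;
              timed_out_ = true;
              lk.unlock();
              on_timeout_();  // e.g. force-close the socket
          }) {}

    // Call from the operation's completion handler.
    // Returns false if the deadline already fired, so a late result should be ignored.
    bool complete() {
        std::lock_guard<std::mutex> lk(m_);
        if (timed_out_) return false;
        completed_ = true;
        cv_.notify_one();
        return true;
    }

    ~Deadline() {
        complete();  // wakes the timer thread if it is still waiting
        timer_.join();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool completed_ = false;
    bool timed_out_ = false;
    std::function<void()> on_timeout_;
    std::thread timer_;  // declared last so it starts after the members it uses
};
```

The design choice worth noting is that the loser of the race must become a no-op; in Asio the same effect is usually achieved by cancelling the timer in the completion handler and checking the error code in the timer handler.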

If I have overlooked some timeout option, please let me know.

I guess that not providing a timeout parameter for sync functions such as shutdown() is a deliberate Boost.Asio design policy: if users want a timeout mechanism, they should use the async version.
mqtt_cpp follows the same design policy, so whether an mqtt_cpp timeout (including internal ones) can take effect depends on whether the underlying Asio function supports a timeout.

mqtt_cpp has an async version of the shutdown mechanism, and it works well with a timeout:

void async_shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, std::function<void(error_code)> handler) {
    s.async_shutdown(
        as::bind_executor(
            strand_,
            [this, &s, handler = force_move(handler)] (error_code ec) mutable {
                MQTT_LOG("mqtt_impl", trace)
                    << MQTT_ADD_VALUE(address, this)
                    << "shutdown ec:"
                    << ec.message();
                shutdown_and_close_impl(s.lowest_layer(), ec);
                force_move(handler)(ec);
            }
        )
    );
}

Your code uses the sync version. If you want a timeout, use the async APIs instead of the sync APIs, e.g. call async_connect() instead of connect().

NOTE: The connect() APIs don't have a timeout either.

You might think that even if users call sync APIs, the shutdown part should use the async version internally to support a timeout.
Unfortunately, that is potentially very harmful: hiding async function calls causes unexpected behavior in the library.
See https://github.com/redboltz/mqtt_cpp/wiki/Sync-vs-Async

I recommend using async APIs like async_connect().

By the way, could you reformat your post? It's hard to tell the code apart from the text.

@ForrestGt
Author

ForrestGt commented May 25, 2023

Thank you for such a quick reply.
I have reformatted my post.
I understand what you say: if I switch from sync to async mode, this problem could be solved. But changing our app's architecture and code, and then testing it, would take too much time (we have been using mqtt_cpp's sync mode in our project for two years).

So the following is my workaround. Could you give me some advice?
(1) Start a timer in my app, for example with a 30 s timeout.
(2) Start a thread that checks whether the timer has expired.
(3) Register a set_pingresp_handler callback:

     c->set_pingresp_handler(
         [&]
         () {
             // reset the timer here
             return true;
         });

(4) If the client doesn't receive a PINGRESP for 30 s (the timer expires), call c->socket()->force_shutdown_and_close(ec) to release the mqtt_client's thread and resources.

I found that only force_shutdown_and_close(boost::system::error_code&) can stop the mqtt_client's thread when it is blocked in connect() or shutdown(ec). I guess this is not a graceful disconnection, but I have to do it when there are network problems (for example, an unplugged network cable).
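Steps (1) to (4) describe a resettable watchdog. The following is a minimal, standard-library-only sketch of that idea; Watchdog is a hypothetical class, not part of mqtt_cpp or Asio. reset() would be called from the pingresp handler, and on_timeout is where the app would unblock the client. Because on_timeout runs on the watchdog's own thread, whatever it does to the socket must be safe to call from another thread.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical watchdog: if reset() is not called for `timeout`,
// on_timeout runs once on the watchdog's own thread.
class Watchdog {
public:
    Watchdog(std::chrono::milliseconds timeout, std::function<void()> on_timeout)
        : timeout_(timeout),
          on_timeout_(std::move(on_timeout)),
          deadline_(std::chrono::steady_clock::now() + timeout),
          thread_([this] { loop(); }) {}

    ~Watchdog() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stop_ = true;
        }
        cv_.notify_one();
        thread_.join();
    }

    // Intended to be called from the pingresp handler.
    void reset() {
        std::lock_guard<std::mutex> lk(m_);
        deadline_ = std::chrono::steady_clock::now() + timeout_;
    }

private:
    void loop() {
        std::unique_lock<std::mutex> lk(m_);
        while (!stop_) {
            // Sleep until the deadline as it was when we started waiting;
            // reset() may have pushed it later in the meantime, so re-check.
            auto wake_at = deadline_;
            cv_.wait_until(lk, wake_at, [this] { return stop_; });
            if (stop_) return;
            if (std::chrono::steady_clock::now() >= deadline_) {
                lk.unlock();
                on_timeout_();  // e.g. shut down the underlying TCP socket
                return;         // one-shot: stop watching after firing
            }
        }
    }

    std::chrono::milliseconds timeout_;
    std::function<void()> on_timeout_;
    std::mutex m_;
    std::condition_variable cv_;
    std::chrono::steady_clock::time_point deadline_;
    bool stop_ = false;
    std::thread thread_;  // declared last so it starts after everything it uses
};
```

Wiring it up would look roughly like `Watchdog wd(std::chrono::seconds(30), [&] { /* unblock the client */ });` plus `c->set_pingresp_handler([&] { wd.reset(); return true; });` (an untested sketch against mqtt_cpp).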

@redboltz
Owner

force_shutdown_and_close is defined as follows:

MQTT_ALWAYS_INLINE void force_shutdown_and_close(boost::system::error_code& ec) override final {
    tcp_.lowest_layer().shutdown(as::ip::tcp::socket::shutdown_both, ec);
    tcp_.lowest_layer().close(ec);
}

lowest_layer() refers to the underlying TCP socket: https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/ip__tcp/socket.html

At the bottom of that page, you can see the following description:


Thread Safety
Distinct objects: Safe.

Shared objects: Unsafe.

Synchronous send, receive, connect, and shutdown operations are thread safe with respect to each other, if the underlying operating system calls are also thread safe. This means


"Distinct objects" means socket1, socket2, and so on; using them from different threads is obviously safe as long as they don't share static members.
A shared object is your case. Basically it is thread-unsafe, but it is thread-safe if certain conditions are satisfied. However, that exception covers only the synchronous operations listed, such as shutdown(); close() appears to be always unsafe.

So, if you believe that in your environment calling shutdown() from another thread is thread-safe, you can try it. But calling close() from another thread causes UB (I guess it would appear to work fine, but sometimes bad things would happen).

Instead of calling force_shutdown_and_close(), you can get c->socket()->lowest_layer() (an ip::tcp::socket) and call only shutdown() on it. That would probably unblock ssl::stream::shutdown().
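Whether shutdown() from another thread wakes a blocked read is OS-specific, as the quoted documentation says. On Linux it does (verify this on your own target). The sketch below demonstrates it with raw POSIX calls rather than Asio, using a local socketpair as a stand-in for the TCP connection: one thread blocks in recv() on a peer that never writes, the main thread calls only ::shutdown(), and close() is deferred until the reader has been joined, mirroring the advice above. The function name is illustrative only.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <chrono>
#include <thread>

// Returns what a blocked recv() reports after another thread calls shutdown().
// A return value of 0 means recv() woke up and saw end-of-stream.
long recv_unblocked_by_shutdown() {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -2;

    long result = -3;
    std::thread reader([&] {
        char buf[64];
        // Blocks: fds[1] never writes, like a paused broker.
        result = static_cast<long>(::recv(fds[0], buf, sizeof(buf), 0));
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    ::shutdown(fds[0], SHUT_RDWR);  // only shutdown() is called from this thread...
    reader.join();
    ::close(fds[0]);                // ...close() waits until nobody else uses the socket
    ::close(fds[1]);
    return result;
}
```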

@ForrestGt
Author

ForrestGt commented May 26, 2023

Thank you for your advice.
I will try calling c->socket()->lowest_layer().shutdown(as::ip::tcp::socket::shutdown_both, ec) in our app to unblock ssl::stream::shutdown().

By the way, I changed my demo from sync mode to async mode and ran it, but the result is the same as with the old version.

Here is the code of my async mode demo:

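#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <type_traits>

#include <mqtt_client_cpp.hpp>

// NOTE: getDateTime() used below is an application helper that is not shown here.

int main(int argc, char** argv) {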
	std::cout << "version 4 8 3"  << std::endl;
	srand(time(NULL));
	std::string host = "10.161.93.83";
	std::string port = "8883";
	std::string cacert = "/opt/emerson_mqtt/root.pem";
	std::string cert = "/opt/emerson_mqtt/gt.pem";
	std::string private_key = "/opt/emerson_mqtt/gt.key";

    MQTT_NS::setup_log();

    boost::asio::io_context ioc;

    std::uint16_t pid_sub1;
    std::uint16_t pid_sub2;

    int count = 0;
    // Create a TLS client
    auto c = MQTT_NS::make_tls_async_client(ioc,host, port, mqtt::protocol_version::v3_1_1);
    using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;

    auto disconnect = [&] {
        if (++count == 5) {
            c->async_disconnect(
                // [optional] checking async_disconnect completion code
                []
                (MQTT_NS::error_code ec){
                    std::cout << "async_disconnect callback: " << ec.message() << std::endl;
                }
            );
        }
    };

	char  cli_id[128]= {0};
	int i_Rand = rand();
	sprintf(cli_id, "%d", i_Rand);
	c->set_client_id(cli_id);
	c->set_clean_session(true);
	c->set_keep_alive_sec(20,std::chrono::seconds(5));    

	
	c->get_ssl_context().load_verify_file(cacert);
	c->get_ssl_context().use_certificate_chain_file(cert);
	c->get_ssl_context().use_private_key_file(private_key, boost::asio::ssl::context::file_format::pem);
	c->set_pingresp_timeout(std::chrono::seconds(20));

#if OPENSSL_VERSION_NUMBER >= 0x10101000L
		SSL_CTX_set_keylog_callback(
			c->get_ssl_context().native_handle(),
			[](SSL const*, char const* line) {
				 std::cout <<"SSL_CTX callback keylog "  << std::endl;
			}
		);
#endif // OPENSSL_VERSION_NUMBER >= 0x10101000L

	// Setup handlers
	c->set_connack_handler(
		[&c]
		(bool sp, MQTT_NS::connect_return_code connack_return_code){
			if (connack_return_code == MQTT_NS::connect_return_code::accepted) {
				std::cout << "Connect successful" << std::endl;
			}
			return true;
		});
	c->set_close_handler(
		[]
		(){					
			std::cout << "close_handler: closed." << std::endl;
		});
	c->set_error_handler(
		[]
		(MQTT_NS::error_code ec){
			std::cout << "ec.message: " << ec.message() <<"  ec.value=" << ec.value() <<  "  " << getDateTime() << std::endl;
		});
	c->set_pingresp_handler(
		[&]
		()
		{
		  std::cout << "pingresp_handler.  "<< getDateTime() << std::endl;
		  return true;
		});


    // Connect
    c->async_connect(
        // [optional] checking underlying layer completion code
        []
        (MQTT_NS::error_code ec){
            std::cout << "async_connect callback: " << ec.message() << std::endl;
        }
    );

    ioc.run();
}

Here is the backtrace:

#0  0xf796fb9c in poll () from /lib/arm-linux-gnueabihf/libc.so.6
#1  0x004196dc in boost::asio::detail::socket_ops::poll_read (s=6, state=0 '\000', msec=-1, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/socket_ops.ipp:2244
#2  0x00418e10 in boost::asio::detail::socket_ops::sync_recv1 (s=6, state=18 '\022', data=0x632b88, size=17408, flags=0, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/socket_ops.ipp:894
#3  0x005614d4 in boost::asio::detail::reactive_socket_service_base::receive<boost::asio::mutable_buffer> (this=0x60f41c, impl=..., buffers=..., flags=0, 
    ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/reactive_socket_service_base.hpp:364
#4  0x005604ec in boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::read_some<boost::asio::mutable_buffer> (
    this=0x6256f0, buffers=..., ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/basic_stream_socket.hpp:978
#5  0x0055f820 in boost::asio::ssl::detail::io<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, boost::asio::ssl::detail::shutdown_op> (next_layer=..., core=..., op=..., ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/ssl/detail/io.hpp:47
#6  0x0055e64c in boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >::shutdown (this=0x6256f0, 
    ec=...) at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/ssl/stream.hpp:562
#7  0x0055b48c in mqtt::tcp_endpoint<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >, boost::asio::strand<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> > >::shutdown_and_close_impl (this=0x6256e8, s=..., ec=...)
    at ./mqtt/tcp_endpoint.hpp:171
#8  0x005578a4 in mqtt::tcp_endpoint<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor> >, boost::asio::strand<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> > >::clean_shutdown_and_close (this=0x6256e8, ec=...)
    at ./mqtt/tcp_endpoint.hpp:102
#9  0x004926d8 in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::sync_shutdown (this=0x5fadd0, s=...) at ./mqtt/endpoint.hpp:5319
#10 0x004984c0 in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::force_disconnect (this=0x5fadd0) at ./mqtt/endpoint.hpp:1103
#11 0x00490cfc in mqtt::endpoint<std::mutex, std::lock_guard, 2u>::set_pingresp_timer()::{lambda(boost::system::error_code)#1}::operator()(boost::system::error_code) const::{lambda()#1}::operator()() const (this=0x616050) at ./mqtt/endpoint.hpp:11543
#12 0x004aff24 in std::_Function_handler<void (), mqtt::endpoint<std::mutex, std::lock_guard, 2u>::set_pingresp_timer()::{lambda(boost::system::error_code)#1}::operator()(boost::system::error_code) const::{lambda()#1}>::_M_invoke(std::_Any_data const&) (__functor=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/include/c++/8.3.0/bits/std_function.h:297
#13 0x004ca764 in std::function<void ()>::operator()() const (this=0xfffef6d4)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/include/c++/8.3.0/bits/std_function.h:687
#14 0x00568b34 in boost::asio::detail::binder0<std::function<void ()> >::operator()() (this=0xfffef6d4)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/bind_handler.hpp:60
#15 0x00567cc4 in boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) (function=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/handler_invoke_hook.hpp:88
#16 0x00566fec in boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) (function=..., context=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/handler_invoke_helpers.hpp:54
#17 0x0056522c in boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) (function=..., this_handler=0xfffef6d4)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/bind_handler.hpp:111
#18 0x005642d0 in boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) (function=..., context=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/handler_invoke_helpers.hpp:54
#19 0x00565420 in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned int) (owner=0x625020, base=0x61ffd0)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/executor_op.hpp:70
#20 0x00412608 in boost::asio::detail::scheduler_operation::complete (this=0x61ffd0, owner=0x625020, ec=..., bytes_transferred=0)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/scheduler_operation.hpp:40
#21 0x0041ce78 in boost::asio::detail::strand_executor_service::run_ready_handlers (impl=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/strand_executor_service.ipp:150
#22 0x00568b88 in boost::asio::detail::strand_executor_service::invoker<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> const, void>::operator() (this=0xfffef7a8)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/strand_executor_service.hpp:127
#23 0x00567d14 in boost::asio::asio_handler_invoke<boost::asio::detail::strand_executor_service::invoker<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> const, void> > (function=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/handler_invoke_hook.hpp:88
#24 0x005670b8 in boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::strand_executor_service::invoker<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> const, void>, boost::asio::detail::strand_executor_service::invoker<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> const, void> > (function=..., context=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/handler_invoke_helpers.hpp:54
#25 0x00567f14 in boost::asio::detail::executor_op<boost::asio::detail::strand_executor_service::invoker<boost::asio::io_context::basic_executor_type<std::allocator<void>, 0u> const, void>, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete (owner=0x5fad10, base=0x60cee0)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/executor_op.hpp:70
#26 0x00412608 in boost::asio::detail::scheduler_operation::complete (this=0x60cee0, owner=0x5fad10, ec=..., bytes_transferred=0)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/scheduler_operation.hpp:40
#27 0x00417120 in boost::asio::detail::scheduler::do_run_one (this=0x5fad10, lock=..., this_thread=..., ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/scheduler.ipp:492
#28 0x00416a54 in boost::asio::detail::scheduler::run (this=0x5fad10, ec=...)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/detail/impl/scheduler.ipp:210
#29 0x00417eb0 in boost::asio::io_context::run (this=0xfffef978)
    at /opt/arm-buildroot-linux-gnueabihf_sdk-buildroot/arm-buildroot-linux-gnueabihf/sysroot/usr/include/boost/asio/impl/io_context.ipp:63
#30 0x0040c47c in main (argc=1, argv=0xfffefc54) at main.cpp:122

And I checked the mqtt_cpp code for set_pingresp_timer():

void set_pingresp_timer() {
      if (pingresp_timeout_ == std::chrono::steady_clock::duration::zero()) return;
      if (tim_pingresp_set_) return;
      tim_pingresp_set_ = true;

      LockGuard<Mutex> lck (mtx_tim_pingresp_);
      tim_pingresp_.expires_after(pingresp_timeout_);
      std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
      tim_pingresp_.async_wait(
          [wp = force_move(wp)](error_code ec) {
              if (auto sp = wp.lock()) {
                  sp->tim_pingresp_set_ = false;
                  if (!ec) {
                      sp->socket().post(
                          [sp] {
                              sp->force_disconnect();
                          }
                      );
                  }
              }
          }
      );
  }

I found that regardless of whether the user uses sync or async mode, only the sync API force_disconnect() is called here. I think that in async mode it should call the async API async_force_disconnect() instead.

@ForrestGt
Copy link
Author

So this is why the sync API shutdown() is still called even when I use async mode.

@redboltz
Copy link
Owner

Please try #977 for your async version example.

@ForrestGt
Author

ForrestGt commented May 26, 2023

Thank you. I have tested the code from the #977 branch, and it works well with my demo: it quickly closes the connection after the pingresp timeout.

@redboltz
Owner

Thank you for testing. I merged #977. It solves the pingresp timeout issue for the async version.
The fix affects only the async APIs, so the sync version issue still remains, but that is a Boost.Asio API limitation.
I hope your sync version problem can be solved by calling the sync asio::ip::tcp::socket::shutdown() from a different thread (your timer thread).

NOTE:
It is off topic. I'm developing a new MQTT library, based on my mqtt_cpp experience, that supports only async APIs. You said that you have no plan to move to mqtt_cpp's async APIs. That is good. I don't recommend migrating to mqtt_cpp's async APIs immediately, because I will release the new async MQTT library soon (but I can't promise when).

@ForrestGt
Author

ForrestGt commented Jun 1, 2023

Hello,
thank you for the heads-up.
But I have already changed my MQTT client from sync mode to async mode in my free time.
At first I didn't plan to move to mqtt_cpp's async APIs, because I thought the migration would take a lot of time.
Actually it took me 3 days to code and 2 days to test. It runs perfectly at present, and the logic and structure are clearer after the refactoring.
So I talked about this migration with my leader, and she told me that if it passes verification by our testers, I can check in this change.

I'd like to ask you a few questions:

  1. About the io_context lifecycle:
io_context ioc;
ioc.run();

After calling ioc.run(), the thread blocks there, and it is not released until ioc.run() returns.
It is very important that the io_context can be released successfully in my MQTT client, so I care a lot about this point.
(1) I found that sometimes I call async_connect() (with some invalid configuration), but the async_connect() callback is never called.
So in this situation I have to call async_force_disconnect() to release the io_context.
(2) The on_close callback is called after I close the connection with async_disconnect(), but the io_context is only released automatically after a minute or two.
Why does it take 1 or 2 minutes to be released?
Is it possible that it will never be released?
(3) Do you know of any cases where the io_context can never be released?

  2. My MQTT client currently uses mqtt_cpp v12.0.0. I want to upgrade to a version that includes "Fixed async_force_disconnect() call. #977", because it supports set_pingresp_timeout.
    I know you will soon release a new async MQTT library based on your mqtt_cpp experience. Are there any problems if I keep using mqtt_cpp instead of the new async MQTT library?

thank you.

@redboltz
Owner

redboltz commented Jun 1, 2023

After calling ioc.run(), the thread blocks there, and it is not released until ioc.run() returns.
It is very important that the io_context can be released successfully in my MQTT client, so I care a lot about this point.

Yes, I agree. It is very important not only that ioc.run() finishes correctly but also that it doesn't finish unexpectedly. If there are no events in the ioc queue, ioc.run() returns unexpectedly. To avoid that, you can use a work_guard:
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/make_work_guard.html
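The rule that run() keeps going only while there is work can be illustrated with a toy event loop. This is a simplified model written for illustration, not how Boost.Asio implements io_context: ToyLoop, add_work() and remove_work() are hypothetical names, with the work counter playing the role of a work guard. In real Asio, an outstanding async operation such as a pending async_read also counts as work, which is why a read that never completes keeps run() from returning.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Toy model of the io_context::run() rule; NOT Asio's implementation.
class ToyLoop {
public:
    void post(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push_back(std::move(f));
        }
        cv_.notify_one();
    }
    // Analogue of creating / resetting a work guard.
    void add_work() {
        std::lock_guard<std::mutex> lk(m_);
        ++work_;
    }
    void remove_work() {
        {
            std::lock_guard<std::mutex> lk(m_);
            --work_;
        }
        cv_.notify_one();
    }
    // Runs handlers; returns the number executed once the queue is empty
    // and no work is outstanding.
    int run() {
        int executed = 0;
        for (;;) {
            std::function<void()> f;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return !q_.empty() || work_ == 0; });
                if (q_.empty()) return executed;  // nothing queued, no work: return
                f = std::move(q_.front());
                q_.pop_front();
            }
            f();
            ++executed;
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> q_;
    int work_ = 0;
};

// With "work" outstanding, run() waits for a handler posted later from another
// thread instead of returning immediately on an empty queue.
int demo_with_work_guard() {
    ToyLoop loop;
    loop.add_work();
    std::thread other([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        loop.post([] {});
        loop.remove_work();
    });
    int n = loop.run();
    other.join();
    return n;
}
```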

(3) Do you know of any cases where the io_context can never be released?

When mqtt_cpp calls Asio's async_read internally but no packet is ever received, the ioc is never released.

Now, you are interested in how to make ioc.run() finish properly.
async_connect() doesn't have a timeout parameter by design. This is similar to https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/async_connect.html

To get a timeout, you can set up a timer yourself, e.g. a steady_timer.
When the timer fires, you need to call async_force_disconnect(). In these error cases, async_force_disconnect() is the one to call. async_disconnect() sends an MQTT DISCONNECT packet and then waits for the broker to close the underlying socket, so it is not the right choice here.
The name disconnect is a little misleading. async_publish() means sending an MQTT PUBLISH packet asynchronously; similarly, async_disconnect() means sending an MQTT DISCONNECT packet asynchronously. If the broker doesn't close the socket, the socket stays connected.
#977 affects async_force_disconnect(), so you need to use the updated version. I will release it soon (maybe this weekend).

I know you will soon release a new async MQTT library based on your mqtt_cpp experience. Are there any problems if I keep using mqtt_cpp instead of the new async MQTT library?

mqtt_cpp doesn't have serious problem.

I just published my new library named async_mqtt. It has not had its first release yet, but all functionality is implemented and the test coverage is 89.17%.

https://github.com/redboltz/async_mqtt
https://github.com/redboltz/async_mqtt/tree/main/example
https://github.com/redboltz/async_mqtt/tree/main/test/system

What is the difference between mqtt_cpp and async_mqtt ?

  1. mqtt_cpp has a sync part and an async part. It is difficult to maintain both (an opinion from the mqtt_cpp developer side).

The following parts are about async part of mqtt_cpp.

  1. mqtt_cpp supports only callback APIs, which easily leads to callback hell. Boost.Asio provides a very nice CompletionToken mechanism, but mqtt_cpp can't use it; supporting CompletionToken would require changing the basic design. Comp token base moh #959, Add move only handler #957, Asio comp token support #954, and Introduced move_only_function. #953 are the results of my struggle, and I finally gave up ;)
  2. mqtt_cpp's underlying layers (TCP, TLS, WS, WSS) are limited.
  3. Combining mqtt_cpp's async functions with other Asio-based async functions is difficult, because when an mqtt_cpp callback returns, the next read happens automatically. That is sometimes annoying. For example, you receive the 1st packet and the callback is called; in the callback, you call a Redis async_write API. You want to receive the next MQTT packet only after the Redis API callback is invoked. However, the next receive happens anyway and that packet is processed concurrently, so a later packet can overtake an earlier one. To avoid this, mqtt_cpp provides functionality to delay the next read:
    void set_mqtt_message_processed_handler(

    It is pretty complicated.
  4. mqtt_cpp's endpoint is difficult to get its type. decltype is required.

async_mqtt solves these problems.

@ForrestGt
Author

Hello, I have updated the MQTT library to the latest version and changed from sync to async mode.
It is running perfectly, and the new feature has passed testing and been merged into the main branch of our project.
So I will close the issue. Thanks for your help.
