From adb184eb33f49082e581a8a1b4c8d27cb63ef659 Mon Sep 17 00:00:00 2001 From: Sima <64804941+sima-fastly@users.noreply.github.com> Date: Mon, 22 Jan 2024 09:40:51 -0800 Subject: [PATCH] boostification of qzmq readyread (#47884) boostification of qzmq readyread --- src/cpp/m2adapter/m2adapterapp.cpp | 3 +- src/cpp/m2adapter/m2adapterapp.h | 1 + src/cpp/proxy/testhttprequest.cpp | 4 +- src/cpp/proxy/websocketoverhttp.cpp | 2 +- .../qzmq/examples/helloclient/helloclient.cpp | 24 ++-- .../qzmq/examples/helloserver/helloserver.cpp | 23 ++-- src/cpp/qzmq/src/qzmqreprouter.cpp | 6 +- src/cpp/qzmq/src/qzmqreprouter.h | 4 +- src/cpp/qzmq/src/qzmqsocket.cpp | 2 +- src/cpp/qzmq/src/qzmqsocket.h | 4 +- src/cpp/qzmq/src/qzmqvalve.cpp | 5 +- src/cpp/zhttpmanager.cpp | 125 +++++++++--------- 12 files changed, 103 insertions(+), 100 deletions(-) diff --git a/src/cpp/m2adapter/m2adapterapp.cpp b/src/cpp/m2adapter/m2adapterapp.cpp index 0c8d6624..478c649c 100644 --- a/src/cpp/m2adapter/m2adapterapp.cpp +++ b/src/cpp/m2adapter/m2adapterapp.cpp @@ -452,6 +452,7 @@ class M2AdapterApp::Private : public QObject QTimer *refreshTimer; Connection quitConnection; Connection hupConnection; + map rrConnection; Private(M2AdapterApp *_q) : QObject(_q), @@ -673,7 +674,7 @@ class M2AdapterApp::Private : public QObject sock->setShutdownWaitTime(0); sock->setHwm(1); // queue up 1 outstanding request at most sock->setWriteQueueEnabled(false); - connect(sock, &QZmq::Socket::readyRead, this, &Private::m2_control_readyRead); + rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this)); log_info("m2_control connect %s:%s", m2_send_idents[n].data(), qPrintable(spec)); sock->connectToAddress(spec); diff --git a/src/cpp/m2adapter/m2adapterapp.h b/src/cpp/m2adapter/m2adapterapp.h index f2c846cc..fe765543 100644 --- a/src/cpp/m2adapter/m2adapterapp.h +++ b/src/cpp/m2adapter/m2adapterapp.h @@ -26,6 +26,7 @@ #include #include +using std::map; using SignalInt = boost::signals2::signal; using Connection = boost::signals2::scoped_connection; diff --git a/src/cpp/proxy/testhttprequest.cpp b/src/cpp/proxy/testhttprequest.cpp index 83e6bace..daafcccd 100644 --- a/src/cpp/proxy/testhttprequest.cpp +++ b/src/cpp/proxy/testhttprequest.cpp @@ -74,7 +74,7 @@ public slots: responseBody += QByteArray("request too large\n"); state = Responded; - emit q->readyRead(); + q->readyRead(); return; } @@ -131,7 +131,7 @@ public slots: } state = Responded; - emit q->readyRead(); + q->readyRead(); } }; diff --git a/src/cpp/proxy/websocketoverhttp.cpp b/src/cpp/proxy/websocketoverhttp.cpp index 753fc90c..640661cc 100644 --- a/src/cpp/proxy/websocketoverhttp.cpp +++ b/src/cpp/proxy/websocketoverhttp.cpp @@ -845,7 +845,7 @@ private slots: if(emitReadyRead) { - emit q->readyRead(); + q->readyRead(); if(!self) return; } diff --git a/src/cpp/qzmq/examples/helloclient/helloclient.cpp b/src/cpp/qzmq/examples/helloclient/helloclient.cpp index b0c39e3c..10c0ab12 100644 --- a/src/cpp/qzmq/examples/helloclient/helloclient.cpp +++ b/src/cpp/qzmq/examples/helloclient/helloclient.cpp @@ -2,6 +2,9 @@ #include #include #include "qzmqsocket.h" +#include + +using Connection = boost::signals2::scoped_connection; class App : public QObject { @@ -9,6 +12,8 @@ class App : public QObject private: QZmq::Socket sock; + Connection rrConnection; + Connection mwConnection; public: App() : @@ -21,11 +26,18 @@ class App : public QObject printf("messages written: %d\n", count); } + void sock_readyRead() + { + QList resp = sock.read(); + printf("read: %s\n", resp[0].data()); + emit quit(); + } + public slots: void start() { - connect(&sock, SIGNAL(readyRead()), SLOT(sock_readyRead())); - sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1)); + rrConnection = sock.readyRead.connect(boost::bind(&Private::sock_readyRead, this)); + mwConnection = sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1)); sock.connectToAddress("tcp://localhost:5555"); QByteArray out = "hello"; printf("writing: %s\n", out.data()); @@ -34,14 +46,6 @@ public slots: signals: void quit(); - -private slots: - void sock_readyRead() - { - QList resp = sock.read(); - printf("read: %s\n", resp[0].data()); - emit quit(); - } }; int main(int argc, char **argv) diff --git a/src/cpp/qzmq/examples/helloserver/helloserver.cpp b/src/cpp/qzmq/examples/helloserver/helloserver.cpp index cca6f140..b5bf59a0 100644 --- a/src/cpp/qzmq/examples/helloserver/helloserver.cpp +++ b/src/cpp/qzmq/examples/helloserver/helloserver.cpp @@ -16,18 +16,6 @@ class App : public QObject printf("messages written: %d\n", count); } -public slots: - void start() - { - connect(&sock, SIGNAL(readyRead()), SLOT(sock_readyRead())); - sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1)); - sock.bind("tcp://*:5555"); - } - -signals: - void quit(); - -private slots: void sock_readyRead() { QZmq::ReqMessage msg = sock.read(); @@ -42,6 +30,17 @@ private slots: printf("writing: %s\n", out.data()); sock.write(msg.createReply(QList() << out)); } + +public slots: + void start() + { + rrConnection = sock.readyRead.connect(boost::bind(&Private::sock_readyRead, this)); + mwConnection = sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1)); + sock.bind("tcp://*:5555"); + } + +signals: + void quit(); }; int main(int argc, char **argv) diff --git a/src/cpp/qzmq/src/qzmqreprouter.cpp b/src/cpp/qzmq/src/qzmqreprouter.cpp index 69754616..ca215a8a 100644 --- a/src/cpp/qzmq/src/qzmqreprouter.cpp +++ b/src/cpp/qzmq/src/qzmqreprouter.cpp @@ -36,13 +36,14 @@ class RepRouter::Private : public QObject RepRouter *q; Socket *sock; Connection mWConnection; + Connection rrConnection; Private(RepRouter *_q) : QObject(_q), q(_q) { sock = new Socket(Socket::Router, this); - connect(sock, SIGNAL(readyRead()), SLOT(sock_readyRead())); + rrConnection = sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)); mWConnection = sock->messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1)); } @@ -51,10 +52,9 @@ class RepRouter::Private : public QObject q->messagesWritten(count); } -public slots: void sock_readyRead() { - emit q->readyRead(); + q->readyRead(); } }; diff --git a/src/cpp/qzmq/src/qzmqreprouter.h b/src/cpp/qzmq/src/qzmqreprouter.h index 2edd560b..8234fec3 100644 --- a/src/cpp/qzmq/src/qzmqreprouter.h +++ b/src/cpp/qzmq/src/qzmqreprouter.h @@ -53,11 +53,9 @@ class RepRouter : public QObject ReqMessage read(); void write(const ReqMessage &message); + Signal readyRead; SignalInt messagesWritten; -signals: - void readyRead(); - private: Q_DISABLE_COPY(RepRouter) diff --git a/src/cpp/qzmq/src/qzmqsocket.cpp b/src/cpp/qzmq/src/qzmqsocket.cpp index a51fcb09..edd38b88 100644 --- a/src/cpp/qzmq/src/qzmqsocket.cpp +++ b/src/cpp/qzmq/src/qzmqsocket.cpp @@ -587,7 +587,7 @@ class Socket::Private : public QObject if(canRead) { QPointer self = this; - emit q->readyRead(); + q->readyRead(); if(!self) return; } diff --git a/src/cpp/qzmq/src/qzmqsocket.h b/src/cpp/qzmq/src/qzmqsocket.h index 69102373..b890d061 100644 --- a/src/cpp/qzmq/src/qzmqsocket.h +++ b/src/cpp/qzmq/src/qzmqsocket.h @@ -103,11 +103,9 @@ class Socket : public QObject QList read(); void write(const QList &message); + Signal readyRead; SignalInt messagesWritten; -signals: - void readyRead(); - private: Q_DISABLE_COPY(Socket) diff --git a/src/cpp/qzmq/src/qzmqvalve.cpp b/src/cpp/qzmq/src/qzmqvalve.cpp index 87a3f898..871aa172 100644 --- a/src/cpp/qzmq/src/qzmqvalve.cpp +++ b/src/cpp/qzmq/src/qzmqvalve.cpp @@ -38,6 +38,7 @@ class Valve::Private : public QObject bool isOpen; bool pendingRead; int maxReadsPerEvent; + boost::signals2::scoped_connection rrConnection; Private(Valve *_q) : QObject(_q), @@ -52,7 +53,7 @@ class Valve::Private : public QObject void setup(QZmq::Socket *_sock) { sock = _sock; - connect(sock, SIGNAL(readyRead()), SLOT(sock_readyRead())); + rrConnection = sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)); } void queueRead() @@ -90,7 +91,6 @@ class Valve::Private : public QObject } } -private slots: void sock_readyRead() { if(pendingRead) @@ -99,6 +99,7 @@ private slots: tryRead(); } +private slots: void queuedRead() { pendingRead = false; diff --git a/src/cpp/zhttpmanager.cpp b/src/cpp/zhttpmanager.cpp index c0dae917..b22fb0c7 100644 --- a/src/cpp/zhttpmanager.cpp +++ b/src/cpp/zhttpmanager.cpp @@ -108,6 +108,7 @@ class ZhttpManager::Private : public QObject Connection cosConnection; Connection cossConnection; Connection sosConnection; + Connection rrConnection; Private(ZhttpManager *_q) : QObject(_q), @@ -234,7 +235,7 @@ class ZhttpManager::Private : public QObject delete client_in_sock; client_req_sock = new QZmq::Socket(QZmq::Socket::Dealer, this); - connect(client_req_sock, &QZmq::Socket::readyRead, this, &Private::client_req_readyRead); + rrConnection = client_req_sock->readyRead.connect(boost::bind(&Private::client_req_readyRead, this)); client_req_sock->setHwm(OUT_HWM); client_req_sock->setShutdownWaitTime(CLIENT_WAIT_TIME); @@ -492,6 +493,67 @@ class ZhttpManager::Private : public QObject Q_UNUSED(count); } + void client_req_readyRead() + { + QPointer self = this; + + while(client_req_sock->canRead()) + { + QList msg = client_req_sock->read(); + if(msg.count() != 2) + { + log_warning("zhttp/zws client req: received message with parts != 2, skipping"); + continue; + } + + QByteArray dataRaw = msg[1]; + if(dataRaw.length() < 1 || dataRaw[0] != 'T') + { + log_warning("zhttp/zws client req: received message with invalid format (missing type), skipping"); + continue; + } + + QVariant data = TnetString::toVariant(dataRaw.mid(1)); + if(data.isNull()) + { + log_warning("zhttp/zws client req: received message with invalid format (tnetstring parse failed), skipping"); + continue; + } + + if(log_outputLevel() >= LOG_LEVEL_DEBUG) + LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client req: IN"); + + ZhttpResponsePacket p; + if(!p.fromVariant(data)) + { + log_warning("zhttp/zws client req: received message with invalid format (parse failed), skipping"); + continue; + } + + if(p.ids.count() != 1) + { + log_warning("zhttp/zws client req: received message with multiple ids, skipping"); + return; + } + + const ZhttpResponsePacket::Id &id = p.ids.first(); + + ZhttpRequest *req = clientReqsByRid.value(ZhttpRequest::Rid(instanceId, id.id)); + if(req) + { + req->handle(id.id, id.seq, p); + if(!self) + return; + + continue; + } + + log_debug("zhttp/zws client req: received message for unknown request id"); + + // NOTE: we don't respond with a cancel message in req mode + } + } + public slots: void client_in_readyRead(const QList &msg) { @@ -670,67 +732,6 @@ public slots: } } - void client_req_readyRead() - { - QPointer self = this; - - while(client_req_sock->canRead()) - { - QList msg = client_req_sock->read(); - if(msg.count() != 2) - { - log_warning("zhttp/zws client req: received message with parts != 2, skipping"); - continue; - } - - QByteArray dataRaw = msg[1]; - if(dataRaw.length() < 1 || dataRaw[0] != 'T') - { - log_warning("zhttp/zws client req: received message with invalid format (missing type), skipping"); - continue; - } - - QVariant data = TnetString::toVariant(dataRaw.mid(1)); - if(data.isNull()) - { - log_warning("zhttp/zws client req: received message with invalid format (tnetstring parse failed), skipping"); - continue; - } - - if(log_outputLevel() >= LOG_LEVEL_DEBUG) - LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client req: IN"); - - ZhttpResponsePacket p; - if(!p.fromVariant(data)) - { - log_warning("zhttp/zws client req: received message with invalid format (parse failed), skipping"); - continue; - } - - if(p.ids.count() != 1) - { - log_warning("zhttp/zws client req: received message with multiple ids, skipping"); - return; - } - - const ZhttpResponsePacket::Id &id = p.ids.first(); - - ZhttpRequest *req = clientReqsByRid.value(ZhttpRequest::Rid(instanceId, id.id)); - if(req) - { - req->handle(id.id, id.seq, p); - if(!self) - return; - - continue; - } - - log_debug("zhttp/zws client req: received message for unknown request id"); - - // NOTE: we don't respond with a cancel message in req mode - } - } - void server_in_stream_readyRead(const QList &msg) { if(msg.count() != 3)