Skip to content

Commit

Permalink
boostification of qzmq readyread (#47884)
Browse files Browse the repository at this point in the history
boostification of qzmq readyread
  • Loading branch information
sima-fastly authored Jan 22, 2024
1 parent d184d71 commit adb184e
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 100 deletions.
3 changes: 2 additions & 1 deletion src/cpp/m2adapter/m2adapterapp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ class M2AdapterApp::Private : public QObject
QTimer *refreshTimer;
Connection quitConnection;
Connection hupConnection;
map<QZmq::Socket*, Connection> rrConnection;

Private(M2AdapterApp *_q) :
QObject(_q),
Expand Down Expand Up @@ -673,7 +674,7 @@ class M2AdapterApp::Private : public QObject
sock->setShutdownWaitTime(0);
sock->setHwm(1); // queue up 1 outstanding request at most
sock->setWriteQueueEnabled(false);
connect(sock, &QZmq::Socket::readyRead, this, &Private::m2_control_readyRead);
rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this));

log_info("m2_control connect %s:%s", m2_send_idents[n].data(), qPrintable(spec));
sock->connectToAddress(spec);
Expand Down
1 change: 1 addition & 0 deletions src/cpp/m2adapter/m2adapterapp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <QObject>
#include <boost/signals2.hpp>

using std::map;
using SignalInt = boost::signals2::signal<void(int)>;
using Connection = boost::signals2::scoped_connection;

Expand Down
4 changes: 2 additions & 2 deletions src/cpp/proxy/testhttprequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public slots:
responseBody += QByteArray("request too large\n");

state = Responded;
emit q->readyRead();
q->readyRead();
return;
}

Expand Down Expand Up @@ -131,7 +131,7 @@ public slots:
}

state = Responded;
emit q->readyRead();
q->readyRead();
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/proxy/websocketoverhttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ private slots:

if(emitReadyRead)
{
emit q->readyRead();
q->readyRead();
if(!self)
return;
}
Expand Down
24 changes: 14 additions & 10 deletions src/cpp/qzmq/examples/helloclient/helloclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
#include <QCoreApplication>
#include <QTimer>
#include "qzmqsocket.h"
#include <boost/signals2.hpp>

using Connection = boost::signals2::scoped_connection;

class App : public QObject
{
Q_OBJECT

private:
QZmq::Socket sock;
Connection rrConnection;
Connection mwConnection;

public:
App() :
Expand All @@ -21,11 +26,18 @@ class App : public QObject
printf("messages written: %d\n", count);
}

void sock_readyRead()
{
QList<QByteArray> resp = sock.read();
printf("read: %s\n", resp[0].data());
emit quit();
}

public slots:
void start()
{
connect(&sock, SIGNAL(readyRead()), SLOT(sock_readyRead()));
sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
rrConnection = sock.readyRead.connect(boost::bind(&Private::sock_readyRead, this));
mwConnection = sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
sock.connectToAddress("tcp://localhost:5555");
QByteArray out = "hello";
printf("writing: %s\n", out.data());
Expand All @@ -34,14 +46,6 @@ public slots:

signals:
void quit();

private slots:
void sock_readyRead()
{
QList<QByteArray> resp = sock.read();
printf("read: %s\n", resp[0].data());
emit quit();
}
};

int main(int argc, char **argv)
Expand Down
23 changes: 11 additions & 12 deletions src/cpp/qzmq/examples/helloserver/helloserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,6 @@ class App : public QObject
printf("messages written: %d\n", count);
}

public slots:
void start()
{
connect(&sock, SIGNAL(readyRead()), SLOT(sock_readyRead()));
sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
sock.bind("tcp://*:5555");
}

signals:
void quit();

private slots:
void sock_readyRead()
{
QZmq::ReqMessage msg = sock.read();
Expand All @@ -42,6 +30,17 @@ private slots:
printf("writing: %s\n", out.data());
sock.write(msg.createReply(QList<QByteArray>() << out));
}

public slots:
void start()
{
rrConnection = sock.readyRead.connect(boost::bind(&Private::sock_readyRead, this));
mwConnection = sock.messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
sock.bind("tcp://*:5555");
}

signals:
void quit();
};

int main(int argc, char **argv)
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/qzmq/src/qzmqreprouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ class RepRouter::Private : public QObject
RepRouter *q;
Socket *sock;
Connection mWConnection;
Connection rrConnection;

Private(RepRouter *_q) :
QObject(_q),
q(_q)
{
sock = new Socket(Socket::Router, this);
connect(sock, SIGNAL(readyRead()), SLOT(sock_readyRead()));
rrConnection = sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this));
mWConnection = sock->messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
}

Expand All @@ -51,10 +52,9 @@ class RepRouter::Private : public QObject
q->messagesWritten(count);
}

public slots:
void sock_readyRead()
{
emit q->readyRead();
q->readyRead();
}
};

Expand Down
4 changes: 1 addition & 3 deletions src/cpp/qzmq/src/qzmqreprouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ class RepRouter : public QObject
ReqMessage read();
void write(const ReqMessage &message);

Signal readyRead;
SignalInt messagesWritten;

signals:
void readyRead();

private:
Q_DISABLE_COPY(RepRouter)

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/qzmq/src/qzmqsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class Socket::Private : public QObject
if(canRead)
{
QPointer<QObject> self = this;
emit q->readyRead();
q->readyRead();
if(!self)
return;
}
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/qzmq/src/qzmqsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ class Socket : public QObject
QList<QByteArray> read();
void write(const QList<QByteArray> &message);

Signal readyRead;
SignalInt messagesWritten;

signals:
void readyRead();

private:
Q_DISABLE_COPY(Socket)

Expand Down
5 changes: 3 additions & 2 deletions src/cpp/qzmq/src/qzmqvalve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Valve::Private : public QObject
bool isOpen;
bool pendingRead;
int maxReadsPerEvent;
boost::signals2::scoped_connection rrConnection;

Private(Valve *_q) :
QObject(_q),
Expand All @@ -52,7 +53,7 @@ class Valve::Private : public QObject
void setup(QZmq::Socket *_sock)
{
sock = _sock;
connect(sock, SIGNAL(readyRead()), SLOT(sock_readyRead()));
rrConnection = sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this));
}

void queueRead()
Expand Down Expand Up @@ -90,7 +91,6 @@ class Valve::Private : public QObject
}
}

private slots:
void sock_readyRead()
{
if(pendingRead)
Expand All @@ -99,6 +99,7 @@ private slots:
tryRead();
}

private slots:
void queuedRead()
{
pendingRead = false;
Expand Down
125 changes: 63 additions & 62 deletions src/cpp/zhttpmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ZhttpManager::Private : public QObject
Connection cosConnection;
Connection cossConnection;
Connection sosConnection;
Connection rrConnection;

Private(ZhttpManager *_q) :
QObject(_q),
Expand Down Expand Up @@ -234,7 +235,7 @@ class ZhttpManager::Private : public QObject
delete client_in_sock;

client_req_sock = new QZmq::Socket(QZmq::Socket::Dealer, this);
connect(client_req_sock, &QZmq::Socket::readyRead, this, &Private::client_req_readyRead);
rrConnection = client_req_sock->readyRead.connect(boost::bind(&Private::client_req_readyRead, this));

client_req_sock->setHwm(OUT_HWM);
client_req_sock->setShutdownWaitTime(CLIENT_WAIT_TIME);
Expand Down Expand Up @@ -492,6 +493,67 @@ class ZhttpManager::Private : public QObject
Q_UNUSED(count);
}

void client_req_readyRead()
{
QPointer<QObject> self = this;

while(client_req_sock->canRead())
{
QList<QByteArray> msg = client_req_sock->read();
if(msg.count() != 2)
{
log_warning("zhttp/zws client req: received message with parts != 2, skipping");
continue;
}

QByteArray dataRaw = msg[1];
if(dataRaw.length() < 1 || dataRaw[0] != 'T')
{
log_warning("zhttp/zws client req: received message with invalid format (missing type), skipping");
continue;
}

QVariant data = TnetString::toVariant(dataRaw.mid(1));
if(data.isNull())
{
log_warning("zhttp/zws client req: received message with invalid format (tnetstring parse failed), skipping");
continue;
}

if(log_outputLevel() >= LOG_LEVEL_DEBUG)
LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client req: IN");

ZhttpResponsePacket p;
if(!p.fromVariant(data))
{
log_warning("zhttp/zws client req: received message with invalid format (parse failed), skipping");
continue;
}

if(p.ids.count() != 1)
{
log_warning("zhttp/zws client req: received message with multiple ids, skipping");
return;
}

const ZhttpResponsePacket::Id &id = p.ids.first();

ZhttpRequest *req = clientReqsByRid.value(ZhttpRequest::Rid(instanceId, id.id));
if(req)
{
req->handle(id.id, id.seq, p);
if(!self)
return;

continue;
}

log_debug("zhttp/zws client req: received message for unknown request id");

// NOTE: we don't respond with a cancel message in req mode
}
}

public slots:
void client_in_readyRead(const QList<QByteArray> &msg)
{
Expand Down Expand Up @@ -670,67 +732,6 @@ public slots:
}
}

void client_req_readyRead()
{
QPointer<QObject> self = this;

while(client_req_sock->canRead())
{
QList<QByteArray> msg = client_req_sock->read();
if(msg.count() != 2)
{
log_warning("zhttp/zws client req: received message with parts != 2, skipping");
continue;
}

QByteArray dataRaw = msg[1];
if(dataRaw.length() < 1 || dataRaw[0] != 'T')
{
log_warning("zhttp/zws client req: received message with invalid format (missing type), skipping");
continue;
}

QVariant data = TnetString::toVariant(dataRaw.mid(1));
if(data.isNull())
{
log_warning("zhttp/zws client req: received message with invalid format (tnetstring parse failed), skipping");
continue;
}

if(log_outputLevel() >= LOG_LEVEL_DEBUG)
LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client req: IN");

ZhttpResponsePacket p;
if(!p.fromVariant(data))
{
log_warning("zhttp/zws client req: received message with invalid format (parse failed), skipping");
continue;
}

if(p.ids.count() != 1)
{
log_warning("zhttp/zws client req: received message with multiple ids, skipping");
return;
}

const ZhttpResponsePacket::Id &id = p.ids.first();

ZhttpRequest *req = clientReqsByRid.value(ZhttpRequest::Rid(instanceId, id.id));
if(req)
{
req->handle(id.id, id.seq, p);
if(!self)
return;

continue;
}

log_debug("zhttp/zws client req: received message for unknown request id");

// NOTE: we don't respond with a cancel message in req mode
}
}

void server_in_stream_readyRead(const QList<QByteArray> &msg)
{
if(msg.count() != 3)
Expand Down

0 comments on commit adb184e

Please sign in to comment.