Skip to content

Commit

Permalink
boostification of req session
Browse files Browse the repository at this point in the history
  • Loading branch information
sima-fastly committed Jan 22, 2024
1 parent d184d71 commit 7a3cc29
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 60 deletions.
25 changes: 11 additions & 14 deletions src/cpp/proxy/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class Engine::Private : public QObject
Connection socketReadyConnection;
Connection iRequestReadyConnection;
Connection inspectErrorConnection;
Connection inspectedConnection;
Connection finConnection;
Connection finishedByAcceptConnection;
map<ProxySession*, Connection> addNotAllowedConnection;
map<ProxySession*, Connection> finishedConnection;
map<ProxySession*, Connection> reqSessionDestroyedConnection;
Expand Down Expand Up @@ -592,10 +595,10 @@ class Engine::Private : public QObject
rs->setAutoShare(autoShare);

// TODO: use callbacks for performance
connect(rs, &RequestSession::inspected, this, &Private::rs_inspected);
inspectedConnection = rs->inspected.connect(boost::bind(&Private::rs_inspected, this, boost::placeholders::_1, rs));
inspectErrorConnection = rs->inspectError.connect(boost::bind(&Private::rs_inspectError, this, rs));
connect(rs, &RequestSession::finished, this, &Private::rs_finished);
connect(rs, &RequestSession::finishedByAccept, this, &Private::rs_finishedByAccept);
finConnection = rs->finished.connect(boost::bind(&Private::rs_finished, this, rs));
finishedByAcceptConnection = rs->finishedByAccept.connect(boost::bind(&Private::rs_finishedByAccept, this, rs));

requestSessions += rs;

Expand Down Expand Up @@ -729,22 +732,17 @@ class Engine::Private : public QObject
doProxy(rs);
}

private slots:
void rs_inspected(const InspectData &idata)
void rs_inspected(const InspectData &idata, RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

// if we get here, then the request must be proxied. if it was to be directly
// accepted, then finishedByAccept would have been emitted instead
assert(idata.doProxy);

doProxy(rs, &idata);
}

void rs_finished()
void rs_finished(RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

if(!rs->isSockJs())
logFinished(rs);

Expand All @@ -754,10 +752,8 @@ private slots:
tryTakeNext();
}

void rs_finishedByAccept()
void rs_finishedByAccept(RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

logFinished(rs, true);

requestSessions.remove(rs);
Expand All @@ -766,6 +762,7 @@ private slots:
tryTakeNext();
}

private slots:
void ps_addNotAllowed(ProxySession *ps)
{
ProxyItem *i = proxyItemsBySession.value(ps);
Expand Down Expand Up @@ -832,6 +829,7 @@ private slots:
tryTakeNext();
}

private:
void handler_retry_in_readyRead(const QList<QByteArray> &message)
{
if(message.count() != 1)
Expand Down Expand Up @@ -910,7 +908,6 @@ private slots:
}
}

private:
void stats_connMax(const StatsPacket &packet)
{
if(accept->canWriteImmediately())
Expand Down
45 changes: 19 additions & 26 deletions src/cpp/proxy/proxysession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ class ProxySession::Private : public QObject
Connection writeBytesChangedConnection;
Connection errorConnection;
Connection finishedConnection;
Connection bytesConnection;
Connection errConnection;
Connection pausedConneciton;
Connection headerConneciton;
Connection bodyConnection;

Private(ProxySession *_q, ZRoutes *_zroutes, ZrpcManager *_acceptManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager) :
QObject(_q),
Expand Down Expand Up @@ -238,12 +243,13 @@ class ProxySession::Private : public QObject

sessionItems += si;
sessionItemsBySession.insert(rs, si);
connect(rs, &RequestSession::bytesWritten, this, &Private::rs_bytesWritten);
connect(rs, &RequestSession::errorResponding, this, &Private::rs_errorResponding);
connect(rs, &RequestSession::finished, this, &Private::rs_finished);
connect(rs, &RequestSession::paused, this, &Private::rs_paused);
connect(rs, &RequestSession::headerBytesSent, this, &Private::rs_headerBytesSent);
connect(rs, &RequestSession::bodyBytesSent, this, &Private::rs_bodyBytesSent);

bytesConnection = rs->bytesWritten.connect(boost::bind(&Private::rs_bytesWritten, this, boost::placeholders::_1, rs));
errConnection = rs->errorResponding.connect(boost::bind(&Private::rs_errorResponding, this, rs));
finishedConnection = rs->finished.connect(boost::bind(&Private::rs_finished, this, rs));
pausedConneciton = rs->paused.connect(boost::bind(&Private::rs_paused, this, rs));
headerConneciton = rs->headerBytesSent.connect(boost::bind(&Private::rs_headerBytesSent, this, boost::placeholders::_1, rs));
bodyConnection = rs->bodyBytesSent.connect(boost::bind(&Private::rs_bodyBytesSent, this, boost::placeholders::_1, rs));

HttpRequestData rsRequestData = rs->requestData();

Expand Down Expand Up @@ -1167,11 +1173,9 @@ class ProxySession::Private : public QObject
}
}

public slots:
void rs_bytesWritten(int count)
public:
void rs_bytesWritten(int count, RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

log_debug("proxysession: %p response bytes written id=%s: %d", q, rs->rid().second.data(), count);

SessionItem *si = sessionItemsBySession.value(rs);
Expand All @@ -1187,10 +1191,8 @@ public slots:
tryResponseRead();
}

void rs_finished()
void rs_finished(RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

log_debug("proxysession: %p response finished id=%s", q, rs->rid().second.data());

SessionItem *si = sessionItemsBySession.value(rs);
Expand Down Expand Up @@ -1228,10 +1230,8 @@ public slots:
}
}

void rs_paused()
void rs_paused(RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

log_debug("proxysession: %p response paused id=%s", q, rs->rid().second.data());

SessionItem *si = sessionItemsBySession.value(rs);
Expand Down Expand Up @@ -1333,10 +1333,8 @@ public slots:
}
}

void rs_errorResponding()
void rs_errorResponding(RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

log_debug("proxysession: %p response error id=%s", q, rs->rid().second.data());

SessionItem *si = sessionItemsBySession.value(rs);
Expand All @@ -1351,29 +1349,24 @@ public slots:
// don't destroy the RequestSession here. a finished signal will arrive next.
}

void rs_headerBytesSent(int count)
void rs_headerBytesSent(int count, RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

SessionItem *si = sessionItemsBySession.value(rs);
assert(si);

if(si->countClientSentBytes)
incCounter(Stats::ClientHeaderBytesSent, count);
}

void rs_bodyBytesSent(int count)
void rs_bodyBytesSent(int count, RequestSession *rs)
{
RequestSession *rs = (RequestSession *)sender();

SessionItem *si = sessionItemsBySession.value(rs);
assert(si);

if(si->countClientSentBytes)
incCounter(Stats::ClientContentBytesSent, count);
}

public:
void acceptRequest_finished()
{
if(acceptRequest->success())
Expand Down
20 changes: 10 additions & 10 deletions src/cpp/proxy/requestsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ class RequestSession::Private : public QObject

state = WaitingForResponse;
requestData.body = in.take();
emit q->inspected(idata);
q->inspected(idata);
}
}
else if(state == ReceivingForAccept)
Expand Down Expand Up @@ -817,7 +817,7 @@ class RequestSession::Private : public QObject
if(zhttpRequest->isFinished())
{
cleanup();
emit q->finished();
q->finished();
}
}

Expand Down Expand Up @@ -867,7 +867,7 @@ class RequestSession::Private : public QObject
{
log_debug("requestsession: request error id=%s", rid.second.data());
cleanup();
emit q->finished();
q->finished();
}

void inspectRequest_finished()
Expand Down Expand Up @@ -912,7 +912,7 @@ class RequestSession::Private : public QObject
{
state = WaitingForResponse;
requestData.body = in.take();
emit q->inspected(idata);
q->inspected(idata);
}
}
}
Expand All @@ -935,7 +935,7 @@ class RequestSession::Private : public QObject
zhttpRequest = 0;

cleanup();
emit q->finishedByAccept();
q->finishedByAccept();
}
else
{
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public slots:
zhttpRequest->writeBody(body);
responseBodySize += body.size();
zhttpRequest->endBody();
emit q->errorResponding();
q->errorResponding();
return;
}

Expand Down Expand Up @@ -1051,7 +1051,7 @@ public slots:
zhttpRequest->writeBody(body);
responseBodySize += body.size();
zhttpRequest->endBody();
emit q->errorResponding();
q->errorResponding();
return;
}

Expand Down Expand Up @@ -1119,7 +1119,7 @@ public slots:

// if we error while streaming, all we can do is give up
zhttpRequest->endBody();
emit q->errorResponding();
q->errorResponding();
return;
}

Expand Down Expand Up @@ -1357,7 +1357,7 @@ void RequestSession::startResponse(int code, const QByteArray &reason, const Htt
{
assert(d->state == Private::ReceivingForAccept || d->state == Private::WaitingForResponse);

emit headerBytesSent(ZhttpManager::estimateResponseHeaderBytes(code, reason, headers));
headerBytesSent(ZhttpManager::estimateResponseHeaderBytes(code, reason, headers));

d->state = Private::RespondingStart;
d->responseData.code = code;
Expand All @@ -1372,7 +1372,7 @@ void RequestSession::writeResponseBody(const QByteArray &body)
assert(d->state == Private::RespondingStart || d->state == Private::Responding);
assert(!d->responseBodyFinished);

emit bodyBytesSent(body.size());
bodyBytesSent(body.size());

d->out += body;
d->responseUpdate();
Expand Down
19 changes: 9 additions & 10 deletions src/cpp/proxy/requestsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <boost/signals2.hpp>

using Signal = boost::signals2::signal<void()>;
using SignalInt = boost::signals2::signal<void(int)>;
using Connection = boost::signals2::scoped_connection;

class QHostAddress;
Expand Down Expand Up @@ -104,20 +105,18 @@ class RequestSession : public QObject

Signal inspectError;

signals:
void inspected(const InspectData &idata);
void finished();
void finishedByAccept();
void bytesWritten(int count);
void paused();
void headerBytesSent(int count);
void bodyBytesSent(int count);

boost::signals2::signal<void(const InspectData&)> inspected;
Signal finished;
Signal finishedByAccept;
SignalInt bytesWritten;
Signal paused;
SignalInt headerBytesSent;
SignalInt bodyBytesSent;
// this signal means some error was encountered while responding and
// that you should not attempt to call further response-related
// methods. the object remains in an active state though, and so you
// should still wait for finished()
void errorResponding();
Signal errorResponding;

private:
class Private;
Expand Down

0 comments on commit 7a3cc29

Please sign in to comment.