Skip to content

Commit

Permalink
httpsession: simplify queue processing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Feb 1, 2025
1 parent dc33a67 commit 164cdc1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 52 deletions.
117 changes: 72 additions & 45 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class HttpSession::Private : public QObject
Priority needUpdatePriority;
UpdateAction *pendingAction;
QList<QueuedItem> publishQueue;
bool processingSendQueue;
bool inProcessPublishQueue;
QByteArray retryToAddress;
RetryRequestPacket retryPacket;
LogUtil::Config logConfig;
Expand Down Expand Up @@ -223,7 +223,7 @@ class HttpSession::Private : public QObject
retries(0),
needUpdate(false),
pendingAction(0),
processingSendQueue(false),
inProcessPublishQueue(false),
responseFilters(0),
connectionSubscriptionMax(_connectionSubscriptionMax),
connectionStatsActive(true)
Expand Down Expand Up @@ -430,15 +430,22 @@ class HttpSession::Private : public QObject

if(f.type == PublishFormat::HttpResponse)
{
if(state != Holding)
return;

assert(instruct.holdMode == Instruct::ResponseHold);

publishQueue += QueuedItem(item, exposeHeaders);
if(state == SendingQueue || state == Holding)
{
if(publishQueue.count() < PUBLISH_QUEUE_MAX)
{
publishQueue += QueuedItem(item, exposeHeaders);

state = SendingQueue;
trySendQueue();
if(state == Holding)
sendQueue();
}
else
{
log_debug("httpsession: publish queue at max, dropping");
}
}
}
else if(f.type == PublishFormat::HttpStream)
{
Expand All @@ -449,7 +456,7 @@ class HttpSession::Private : public QObject
publishQueue += QueuedItem(item);

if(state == Holding)
trySendQueue();
sendQueue();
}
else
{
Expand Down Expand Up @@ -736,23 +743,25 @@ class HttpSession::Private : public QObject
break;
}

if(!publishQueue.isEmpty())
{
state = SendingQueue;
trySendQueue();
}
else
{
sendQueueDone();
}
// if there are items to send, this will send them. if there are
// no items to send, this will end up changing state to Holding
sendQueue();
}
}

void trySendQueue()
void sendQueue()
{
processingSendQueue = true;
state = SendingQueue;

while(!publishQueue.isEmpty() && req->writeBytesAvailable() > 0 && !messageFilters)
processPublishQueue();
}

void processPublishQueue()
{
assert(!inProcessPublishQueue);
inProcessPublishQueue = true;

while(state == SendingQueue && !publishQueue.isEmpty() && req->writeBytesAvailable() > 0 && !messageFilters)
{
const QueuedItem &qi = publishQueue.first();
const PublishItem &item = qi.item;
Expand Down Expand Up @@ -835,41 +844,54 @@ class HttpSession::Private : public QObject
messageFilters->start(fc, body);
}

if(!messageFilters && instruct.holdMode == Instruct::StreamHold)
if(!messageFilters)
{
// the queue is empty or client buffer is full
// the state changed, the queue is empty, or the client buffer is full

if(state == SendingQueue)
if(state != SendingQueue || publishQueue.isEmpty())
{
if(publishQueue.isEmpty())
sendQueueDone();
// if the state changed or the queue is empty then we're done
sendQueueDone();
}
else if(state == Holding)
else
{
if(!publishQueue.isEmpty())
{
// if backlogged, turn off timers until we're able to send again
timer->stop();
updateManager->unregisterSession(q);
}
// client buffer can only be full in stream mode
assert(instruct.holdMode == Instruct::StreamHold);

// NOTE: we can end up here multiple times in a single pass
// of the queue if the client buffer becomes full multiple
// times. so, whatever happens here should be idempotent and
// cheap.

// turn off timers until we're able to send again
timer->stop();
updateManager->unregisterSession(q);
}
}

processingSendQueue = false;
inProcessPublishQueue = false;
}

void sendQueueDone()
{
// if the state changed during queue processing (e.g. to Closing),
// then we want to leave the state alone and do nothing else
if(state != SendingQueue)
return;

state = Holding;

activeChannels.clear();
if(instruct.holdMode == Instruct::StreamHold)
{
activeChannels.clear();

// start keep alive timer, if it wasn't started already
if(!timer->isActive())
setupKeepAlive();
// start keep alive timer, if it wasn't started already
if(!timer->isActive())
setupKeepAlive();

if(!nextUri.isEmpty() && instruct.nextLinkTimeout >= 0)
updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri);
if(!nextUri.isEmpty() && instruct.nextLinkTimeout >= 0)
updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri);
}

if(needUpdate)
update(needUpdatePriority);
Expand Down Expand Up @@ -1395,8 +1417,8 @@ class HttpSession::Private : public QObject
processItem(qi.item, result.sendAction, result.content, qi.exposeHeaders);

// if filters finished asynchronously then we need to resume processing
if(!processingSendQueue)
trySendQueue();
if(!inProcessPublishQueue)
processPublishQueue();
}

void processItem(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList<QByteArray> &exposeHeaders)
Expand Down Expand Up @@ -1439,7 +1461,8 @@ class HttpSession::Private : public QObject
{
activeChannels.clear();

updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri);
// all channels had activity. reset the timeout
updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri, true);
}
}
}
Expand Down Expand Up @@ -1505,9 +1528,13 @@ class HttpSession::Private : public QObject
{
tryProcessOutReq();
}
else if(state == SendingQueue || state == Holding)
else if(state == SendingQueue)
{
trySendQueue();
// in this state, the writeBytesChanged signal is only
// interesting if it indicates write bytes are available

if(req->writeBytesAvailable() > 0)
processPublishQueue();
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/handler/httpsessionupdatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class HttpSessionUpdateManager::Private : public QObject
delete bucket;
}

void registerSession(HttpSession *hs, int timeout, const QUrl &uri)
void registerSession(HttpSession *hs, int timeout, const QUrl &uri, bool resetTimeout)
{
QUrl tmp = uri;
tmp.setQuery(QString()); // remove the query part
Expand All @@ -92,9 +92,11 @@ class HttpSessionUpdateManager::Private : public QObject
{
if(bucket->sessions.contains(hs))
{
// if the session is already in this bucket, flag it
// for later processing
bucket->deferredSessions += hs;
if(resetTimeout)
{
// flag for later processing
bucket->deferredSessions += hs;
}
}
else
{
Expand Down Expand Up @@ -186,9 +188,9 @@ HttpSessionUpdateManager::~HttpSessionUpdateManager()
delete d;
}

void HttpSessionUpdateManager::registerSession(HttpSession *hs, int timeout, const QUrl &uri)
void HttpSessionUpdateManager::registerSession(HttpSession *hs, int timeout, const QUrl &uri, bool resetTimeout)
{
d->registerSession(hs, timeout, uri);
d->registerSession(hs, timeout, uri, resetTimeout);
}

void HttpSessionUpdateManager::unregisterSession(HttpSession *hs)
Expand Down
4 changes: 3 additions & 1 deletion src/handler/httpsessionupdatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class HttpSessionUpdateManager : public QObject
HttpSessionUpdateManager(QObject *parent = 0);
~HttpSessionUpdateManager();

void registerSession(HttpSession *hs, int timeout, const QUrl &uri);
// no-op if session already registered and resetTimeout=false
void registerSession(HttpSession *hs, int timeout, const QUrl &uri, bool resetTimeout = false);

void unregisterSession(HttpSession *hs);

private:
Expand Down

0 comments on commit 164cdc1

Please sign in to comment.