Skip to content

Commit

Permalink
Merge pull request #2064 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
pecanoro authored Jan 15, 2025
2 parents 9660ab5 + 543adaa commit 27ff9f4
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 16 deletions.
39 changes: 24 additions & 15 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2563,6 +2563,24 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() {
return socket;
}

void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) {
try {
size_t messagesDeqeued = 0;
while (true) {
SData message = peer->popMessage();
_onMESSAGE(peer, message);
messagesDeqeued++;
if (messagesDeqeued >= 100 && !unlimited) {
// We should run again immediately, we have more to do.
nextActivity = STimeNow();
break;
}
}
} catch (const out_of_range& e) {
// Ok, just no messages.
}
}

void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
unique_lock<decltype(_stateMutex)> uniqueLock(_stateMutex);

Expand Down Expand Up @@ -2638,10 +2656,12 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
{
_onConnect(peer);
_sendPING(peer);
_processPeerMessages(nextActivity, peer);
}
break;
case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR:
{
_processPeerMessages(nextActivity, peer, true);
SData reconnect("RECONNECT");
reconnect["Reason"] = "socket error";
_sendToPeer(peer, reconnect);
Expand All @@ -2650,6 +2670,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
break;
case SQLitePeer::PeerPostPollStatus::SOCKET_CLOSED:
{
_processPeerMessages(nextActivity, peer, true);
peer->reset();
_onDisconnect(peer);
}
break;
Expand All @@ -2666,21 +2688,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
_sendPING(peer);
}
}
try {
size_t messagesDeqeued = 0;
while (true) {
SData message = peer->popMessage();
_onMESSAGE(peer, message);
messagesDeqeued++;
if (messagesDeqeued >= 100) {
// We should run again immediately, we have more to do.
nextActivity = STimeNow();
break;
}
}
} catch (const out_of_range& e) {
// Ok, just no messages.
}

_processPeerMessages(nextActivity, peer);
}
break;
}
Expand Down
2 changes: 2 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ class SQLiteNode : public STCPManager {

void _dieIfForkedFromCluster();

void _processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited = false);

const string _commandAddress;
const string _name;
const vector<SQLitePeer*> _peerList;
Expand Down
1 change: 0 additions & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
} else {
SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms");
}
reset();
nextReconnect = STimeNow() + delay;
nextActivity = min(nextActivity, nextReconnect.load());
return PeerPostPollStatus::SOCKET_CLOSED;
Expand Down
2 changes: 2 additions & 0 deletions sqlitecluster/SQLitePeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class SQLitePeer {
// If there are no messages, throws `std::out_of_range`.
SData popMessage();

// NOTE: If this returns PeerPostPollStatus::SOCKET_CLOSED then the caller must call `reset` on this peer.
// This is not done internally becuase we need to expose the outstanding data on the socket before deleting it.
PeerPostPollStatus postPoll(fd_map& fdm, uint64_t& nextActivity);

// Send a message to this peer. Thread-safe.
Expand Down

0 comments on commit 27ff9f4

Please sign in to comment.