From 95780b458995b32e75d8256faffe94838510e46e Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 Jan 2025 14:52:06 -0800 Subject: [PATCH 1/2] Fix fork when COMMIT_TRANSACTION is dropped --- sqlitecluster/SQLiteNode.cpp | 53 ++++++++++++++++++++++++++---------- sqlitecluster/SQLiteNode.h | 2 ++ sqlitecluster/SQLitePeer.cpp | 3 +- sqlitecluster/SQLitePeer.h | 1 + 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 047a0fd78..d0c4e0427 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2563,6 +2563,37 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { return socket; } +void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) { + try { + size_t messagesDeqeued = 0; + if (unlimited) { + if (peer->socket) { + string recvBuffer = peer->socket->recvBuffer.c_str(); + if (recvBuffer.size()) { + SINFO("TYLER: peer recv buffer " << peer->socket->recvBuffer); + } + } else { + SINFO("TYLER: no socket"); + } + } + while (true) { + SData message = peer->popMessage(); + _onMESSAGE(peer, message); + if (unlimited) { + SINFO("TYLER: processed message with unlimited set " << message.methodLine); + } + messagesDeqeued++; + if (messagesDeqeued >= 100 && !unlimited) { + // We should run again immediately, we have more to do. + nextActivity = STimeNow(); + break; + } + } + } catch (const out_of_range& e) { + // Ok, just no messages. + } +} + void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { unique_lock uniqueLock(_stateMutex); @@ -2633,15 +2664,18 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // Now check established peer connections. for (SQLitePeer* peer : _peerList) { auto result = peer->postPoll(fdm, nextActivity); + switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { _onConnect(peer); _sendPING(peer); + _processPeerMessages(nextActivity, peer); } break; case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR: { + _processPeerMessages(nextActivity, peer, true); SData reconnect("RECONNECT"); reconnect["Reason"] = "socket error"; _sendToPeer(peer, reconnect); @@ -2650,6 +2684,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { break; case SQLitePeer::PeerPostPollStatus::SOCKET_CLOSED: { + _processPeerMessages(nextActivity, peer, true); + peer->reset(); _onDisconnect(peer); } break; @@ -2666,21 +2702,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { _sendPING(peer); } } - try { - size_t messagesDeqeued = 0; - while (true) { - SData message = peer->popMessage(); - _onMESSAGE(peer, message); - messagesDeqeued++; - if (messagesDeqeued >= 100) { - // We should run again immediately, we have more to do. - nextActivity = STimeNow(); - break; - } - } - } catch (const out_of_range& e) { - // Ok, just no messages. - } + + _processPeerMessages(nextActivity, peer); } break; } diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 6377c9511..5cb8365b8 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -278,6 +278,8 @@ class SQLiteNode : public STCPManager { void _dieIfForkedFromCluster(); + void _processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited = false); + const string _commandAddress; const string _name; const vector _peerList; diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index a9636ca1d..0053f8fc2 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -121,7 +121,8 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } else { SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } - reset(); + // Can't reset until the buffer is cleared. + // reset(); nextReconnect = STimeNow() + delay; nextActivity = min(nextActivity, nextReconnect.load()); return PeerPostPollStatus::SOCKET_CLOSED; diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 48bc3433c..7f98a8734 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -104,6 +104,7 @@ class SQLitePeer { mutable recursive_mutex peerMutex; // Not named with an underscore because it's only sort-of private (see friend class declaration above). + public: STCPManager::Socket* socket = nullptr; }; From 5f74f581fce6dcdb61af9013909a8b0db491fcb6 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 Jan 2025 15:06:18 -0800 Subject: [PATCH 2/2] Cleanup --- sqlitecluster/SQLiteNode.cpp | 14 -------------- sqlitecluster/SQLitePeer.cpp | 2 -- sqlitecluster/SQLitePeer.h | 3 ++- 3 files changed, 2 insertions(+), 17 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d0c4e0427..87edfd469 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2566,22 +2566,9 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) { try { size_t messagesDeqeued = 0; - if (unlimited) { - if (peer->socket) { - string recvBuffer = peer->socket->recvBuffer.c_str(); - if (recvBuffer.size()) { - SINFO("TYLER: peer recv buffer " << peer->socket->recvBuffer); - } - } else { - SINFO("TYLER: no socket"); - } - } while (true) { SData message = peer->popMessage(); _onMESSAGE(peer, message); - if (unlimited) { - SINFO("TYLER: processed message with unlimited set " << message.methodLine); - } messagesDeqeued++; if (messagesDeqeued >= 100 && !unlimited) { // We should run again immediately, we have more to do. @@ -2664,7 +2651,6 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // Now check established peer connections. for (SQLitePeer* peer : _peerList) { auto result = peer->postPoll(fdm, nextActivity); - switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 0053f8fc2..abc50b1d2 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -121,8 +121,6 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } else { SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } - // Can't reset until the buffer is cleared. - // reset(); nextReconnect = STimeNow() + delay; nextActivity = min(nextActivity, nextReconnect.load()); return PeerPostPollStatus::SOCKET_CLOSED; diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 7f98a8734..a11d5a075 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -45,6 +45,8 @@ class SQLitePeer { // If there are no messages, throws `std::out_of_range`. SData popMessage(); + // NOTE: If this returns PeerPostPollStatus::SOCKET_CLOSED then the caller must call `reset` on this peer. + // This is not done internally becuase we need to expose the outstanding data on the socket before deleting it. PeerPostPollStatus postPoll(fd_map& fdm, uint64_t& nextActivity); // Send a message to this peer. Thread-safe. @@ -104,7 +106,6 @@ class SQLitePeer { mutable recursive_mutex peerMutex; // Not named with an underscore because it's only sort-of private (see friend class declaration above). - public: STCPManager::Socket* socket = nullptr; };