Skip to content

Commit

Permalink
Process all replciate messages at shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jan 15, 2025
1 parent a697a4d commit fb5c90a
Showing 1 changed file with 58 additions and 28 deletions.
86 changes: 58 additions & 28 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,46 @@ void SQLiteNode::_replicate() {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false));
SQLite& db = dbScope.db();

SQLitePeer* peer = nullptr;
SData command;
while (true) {
unique_lock<mutex> lock(_replicateMutex);
while (!_replicateThreadShouldExit && _replicateQueue.empty()) {
_replicateCV.wait(lock);
}

// If _replicationThreadsShouldExit was set while we were waiting, we will exit straight away.
// This needs to be here because it's possible we won't have any work in the queue in this case.
if (_replicateThreadShouldExit) {
if (db.insideTransaction()) {
db.rollback();
bool exitWhenQueueEmpty = false;
bool queueEmpty = false;
uint64_t dequeueTime = 0;
{
unique_lock<mutex> lock(_replicateMutex);
while (!_replicateThreadShouldExit && _replicateQueue.empty()) {
_replicateCV.wait(lock);
}

exitWhenQueueEmpty = _replicateThreadShouldExit;
queueEmpty = _replicateQueue.empty();
if (!queueEmpty) {
peer = _replicateQueue.front().first;
command = move(_replicateQueue.front().second);
_replicateQueue.pop();
dequeueTime = STimeNow();
}
}

// If there was no work, we either wait again, or we can exit.
if (queueEmpty) {
// There are no commands to process.
if (exitWhenQueueEmpty) {
if (db.insideTransaction()) {
SINFO("Finished replication mid-transaction, missing COMMIT_TRANSACTION, rolling back.");
db.rollback();
}

// Done with replication.
return;
} else {
// The queue is empty but we're not exiting so go back to the top and wait.
continue;
}
return;
}

// Get the first message from the queue.
SQLitePeer* peer = _replicateQueue.front().first;
SData command = move(_replicateQueue.front().second);
_replicateQueue.pop();
uint64_t dequeueTime = STimeNow();

// At this point, we're guaranteed to have a message. Process it and then run again.
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command);
Expand Down Expand Up @@ -1550,17 +1569,22 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
SINFO("Ignoring " << message.methodLine << " in state " << stateName(_state));
return;
}
if (_replicateThreadShouldExit) {
SINFO("Discarding replication message, stopping FOLLOWING");
} else {

bool replicationRunning = false;
{
lock_guard<mutex> lock(_replicateMutex);
if (!_replicateThreadShouldExit) {
_replicateQueue.push(make_pair(peer, message));
replicationRunning = true;
}
}
if (replicationRunning) {
if (!_replicateThread) {
_replicateThread = new thread(&SQLiteNode::_replicate, this);
}
{
lock_guard<mutex> lock(_replicateMutex);
_replicateQueue.push(make_pair(peer, message));
}
_replicateCV.notify_one();
} else {
SINFO("Discarding replication message, stopping FOLLOWING");
}
} else if (SIEquals(message.methodLine, "APPROVE_TRANSACTION") || SIEquals(message.methodLine, "DENY_TRANSACTION")) {
// APPROVE_TRANSACTION: Sent to the leader by a follower when it confirms it was able to begin a transaction and
Expand Down Expand Up @@ -1796,15 +1820,21 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance

// If we were following, and now we're not, we give up an any replications.
if (_state == SQLiteNodeState::FOLLOWING) {
if (_replicateThread) {
{
lock_guard<mutex> lock(_replicateMutex);
_replicateThreadShouldExit = true;
}
if (_replicateThread) {
_replicateCV.notify_one();
_replicateThread->join();
delete _replicateThread;
_replicateThread = nullptr;
while (_replicateQueue.size()) {
_replicateQueue.pop();
}
}
if (_replicateQueue.size()) {
SWARN("Replicate queue contains " << _replicateQueue.size() << " messages at thread shutdown.");
}
{
lock_guard<mutex> lock(_replicateMutex);
_replicateThreadShouldExit = false;
}

Expand Down

0 comments on commit fb5c90a

Please sign in to comment.