Skip to content

Commit

Permalink
Cleanup pass 1
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jan 14, 2025
1 parent f559240 commit 89e2a3b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 55 deletions.
71 changes: 18 additions & 53 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,18 @@ void SQLiteNode::_replicate() {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false));
SQLite& db = dbScope.db();

while (!_replicateThreadShouldExit) {
// If this happens mid-commit, we need to rollback.
while (true) {
unique_lock<mutex> lock(_replicateMutex);
while (!_replicateThreadShouldExit && _replicateQueue.empty()) {
// Note, we need to interrupt this on purpose to exit the thread.
// We'll set _replicationThreadsShouldExit and `notify_one` without adding any actual work.
_replicateCV.wait(lock);
}

// If _replicationThreadsShouldExit was set while we were waiting, we will exit straight away.
// This needs to be here because it's possible we won't have any work in the queue in this case.
if (_replicateThreadShouldExit) {
if (db.insideTransaction()) {
db.rollback();
}
return;
}

Expand All @@ -186,56 +187,19 @@ void SQLiteNode::_replicate() {
_replicateQueue.pop();
uint64_t dequeueTime = STimeNow();

// Now we are locked again, and there's work in the queue to do.
bool goSearchingOnExit = false;
{
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
uint64_t newCount = command.calcU64("NewCount");
uint64_t currentCount = newCount - 1;

// Transactions are either ASYNC or QUORUM. QUORUM transactions can only start when the DB is completely
// up-to-date. ASYNC transactions can start as soon as the DB is at `dbCountAtStart` (the same value that
// the DB was at when the transaction began on leader).
bool quorum = !SStartsWith(command["ID"], "ASYNC");
uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calcU64("dbCountAtStart") : currentCount;
SINFO("[performance] BEGIN_TRANSACTION replicate thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")");

auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command);
_handlePrepareTransaction(db, peer, command, dequeueTime);
auto duration = chrono::steady_clock::now() - start;
SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us.");
} else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
try {
int result = -1;
int commitAttemptCount = 1;
while (result != SQLITE_OK) {
if (commitAttemptCount > 1) {
SINFO("Commit attempt number " << commitAttemptCount << " for concurrent replication.");
}
SINFO("[performance] BEGIN for commit " << newCount);
bool uniqueContraintsError = false;
try {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command, commitAttemptCount > 1);

// Ok, almost ready.
// Note:: calls _sendToPeer() which is a write operation.
_handlePrepareTransaction(db, peer, command, dequeueTime);
auto duration = chrono::steady_clock::now() - start;
SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us.");
} catch (const SQLite::constraint_error& e) {
// We could `continue` immediately upon catching this exception, but instead, we wait for the
// leader commit notifier to be ready. This prevents us from spinning in an endless loop on the
// same error over and over until whatever thread we're waiting for finishes.
uniqueContraintsError = true;
}
if (uniqueContraintsError) {
SINFO("Got unique constraints error in replication, restarting.");
db.rollback();
continue;
}

// Leader says it has committed this transaction, so we can too.
++commitAttemptCount;
result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]);
if (result != SQLITE_OK) {
db.rollback();
}
int result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]);
if (result != SQLITE_OK) {
STHROW("commit failed");
}
} catch (const SException& e) {
SALERT("Caught exception in replication thread. Assuming this means we want to stop following. Exception: " << e.what());
Expand Down Expand Up @@ -1854,8 +1818,9 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
// If we were following, and now we're not, we give up an any replications.
if (_state == SQLiteNodeState::FOLLOWING) {
_replicateThreadShouldExit = true;
_replicateCV.notify_one();
_replicateThread->join();
delete(_replicateThread);
delete _replicateThread;
_replicateThread = nullptr;
_replicateThreadShouldExit = false;

Expand Down Expand Up @@ -2222,7 +2187,7 @@ bool SQLiteNode::_majoritySubscribed() const {
return (numFullFollowers * 2 >= numFullPeers);
}

void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict) {
void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message) {
// BEGIN_TRANSACTION: Sent by the LEADER to all subscribed followers to begin a new distributed transaction. Each
// follower begins a local transaction with this query and responds APPROVE_TRANSACTION. If the follower cannot start
// the transaction for any reason, it is broken somehow -- disconnect from the leader.
Expand All @@ -2235,7 +2200,7 @@ void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SDa
STHROW("already in a transaction");
}

if (!db.beginTransaction(wasConflict ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) {
if (!db.beginTransaction(SQLite::TRANSACTION_TYPE::EXCLUSIVE)) {
STHROW("failed to begin transaction");
}

Expand Down
3 changes: 1 addition & 2 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <libstuff/STCPManager.h>
#include <sqlitecluster/SQLite.h>
#include <sqlitecluster/SQLitePool.h>
#include <sqlitecluster/SQLiteSequentialNotifier.h>

#include <mutex>
#include <condition_variable>
Expand Down Expand Up @@ -229,7 +228,7 @@ class SQLiteNode : public STCPManager {
string _getLostQuorumLogMessage() const;

// Handlers for transaction messages.
void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict);
void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message);
void _handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime);
int _handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uint64_t commandCommitCount, const string& commandCommitHash);
void _handleRollbackTransaction(SQLite& db, SQLitePeer* peer, const SData& message);
Expand Down

0 comments on commit 89e2a3b

Please sign in to comment.