Skip to content

Commit

Permalink
Most of the junk is cleaned up.
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jan 14, 2025
1 parent 8bc0851 commit 05aa8c9
Showing 1 changed file with 2 additions and 36 deletions.
38 changes: 2 additions & 36 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ void SQLiteNode::_replicate() {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false));
SQLite& db = dbScope.db();

bool skipNext = false;
uint64_t commitNumber = 0;

while (true) {
unique_lock<mutex> lock(_replicateMutex);
while (!_replicateThreadShouldExit && _replicateQueue.empty()) {
Expand All @@ -190,44 +187,15 @@ void SQLiteNode::_replicate() {
_replicateQueue.pop();
uint64_t dequeueTime = STimeNow();

if (skipNext) {
skipNext = false;
continue;
}

if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
uint64_t messageCommitCount = command.calcU64("newCount");
uint64_t myCommitCount = db.getCommitCount();
if (myCommitCount >= messageCommitCount) {
SALERT("Got BEGIN_TRANSACTION for commit " << messageCommitCount << " but have commit " << myCommitCount);
skipNext = true;
continue;
}
}

bool shouldGoSearchingAndExit = false;
try {
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command);
_handlePrepareTransaction(db, peer, command, dequeueTime);
auto duration = chrono::steady_clock::now() - start;
SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us.");
}
commitNumber = command.calcU64("newCount");
} catch (const SQLite::constraint_error& e) {
// A contraints error can happen in a situation where we're forked because we can try and insert the same row
// twice, violating the uniqueness of the key. This happens *before* we compute the hash of the entire trandsaction,
// so we see the constraint_error first.
SALERT("constraint_error in begin/prepare. CommitCount:" << db.getCommitCount() << ", message: " << command.serialize());
db.rollback();
}

bool shouldGoSearchingAndExit = false;
try {
if (commitNumber != command.calcU64("newCount")) {
SALERT("Instructed to commit transaction: " << command.calcU64("newCount") << " but expected " << commitNumber);
}
if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
} else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
int result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]);
if (result != SQLITE_OK) {
STHROW("commit failed");
Expand All @@ -236,7 +204,6 @@ void SQLiteNode::_replicate() {
_handleRollbackTransaction(db, peer, command);
shouldGoSearchingAndExit = true;
}

} catch (const SException& e) {
SALERT("Caught SException in replication thread. Assuming this means we want to stop following. Exception: " << e.what());
shouldGoSearchingAndExit = true;
Expand Down Expand Up @@ -1855,7 +1822,6 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
delete _replicateThread;
_replicateThread = nullptr;
while (_replicateQueue.size()) {
SALERT("Discarding: " << _replicateQueue.front().second.methodLine);
_replicateQueue.pop();
}
_replicateThreadShouldExit = false;
Expand Down

0 comments on commit 05aa8c9

Please sign in to comment.