Skip to content

Commit

Permalink
Merge pull request #1935 from Expensify/tyler-hctree-2
Browse files Browse the repository at this point in the history
Improve startup/shutdown speed
  • Loading branch information
johnmlee101 authored Nov 21, 2024
2 parents 00c50b2 + 7d1ee38 commit 58f14cb
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libstuff/libstuff.d
libstuff/libstuff.h.gch
.idea
.clangd
.clang-tidy
.nfs*
.cache
compile_commands.json
Expand Down
17 changes: 14 additions & 3 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "BedrockServer.h"

#include <arpa/inet.h>
#include <csignal>
#include <cstring>
#include <fstream>
#include <sys/resource.h>
Expand Down Expand Up @@ -217,15 +218,15 @@ void BedrockServer::sync()
_syncNode->beginShutdown();

// This will cause us to skip the next `poll` iteration which avoids a 1 second wait.
_notifyDone.push(true);
_notifyDoneSync.push(true);
}

// The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for
// activity. Once any of them has activity (or the timeout ends), poll will return.
fd_map fdm;

// Pre-process any sockets the sync node is managing (i.e., communication with peer nodes).
_notifyDone.prePoll(fdm);
_notifyDoneSync.prePoll(fdm);
_syncNode->prePoll(fdm);

// Add our command queues to our fd_map.
Expand All @@ -236,6 +237,9 @@ void BedrockServer::sync()
{
AutoTimerTime pollTime(pollTimer);
S_poll(fdm, max(nextActivity, now) - now);
if (SCheckSignal(SIGTERM)) {
_notifyDone.push(true);
}
}

// And set our next timeout for 1 second from now.
Expand All @@ -251,7 +255,7 @@ void BedrockServer::sync()
AutoTimerTime postPollTime(postPollTimer);
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
_notifyDone.postPoll(fdm);
_notifyDoneSync.postPoll(fdm);
}

// Ok, let the sync node do its updating for as many iterations as it requires. We'll update the replication
Expand Down Expand Up @@ -350,6 +354,9 @@ void BedrockServer::sync()
committingCommand = true;
_syncNode->startCommit(SQLiteNode::QUORUM);
_lastQuorumCommandTime = STimeNow();

// This interrupts the next poll loop immediately. This prevents a 1-second wait when running as a single server.
_notifyDoneSync.push(true);
SDEBUG("Finished sending distributed transaction for db upgrade.");

// As it's a quorum commit, we'll need to read from peers. Let's start the next loop iteration.
Expand Down Expand Up @@ -378,6 +385,7 @@ void BedrockServer::sync()
_upgradeInProgress = false;
_upgradeCompleted = true;
SINFO("UpgradeDB succeeded, done.");
_notifyDone.push(true);
} else {
SINFO("UpgradeDB failed, trying again.");
}
Expand Down Expand Up @@ -1224,6 +1232,9 @@ BedrockServer::BedrockServer(const SData& args_)
{
_version = VERSION;

// This allows the signal thread to notify us when a signal is received to interrupt the current poll loop.
SSIGNAL_NOTIFY_INTERRUPT = &_notifyDoneSync;

// Enable the requested plugins, and update our version string if required.
list<string> pluginNameList = SParseList(args["-plugins"]);
SINFO("Loading plugins: " << args["-plugins"]);
Expand Down
4 changes: 2 additions & 2 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,9 @@ class BedrockServer : public SQLiteServer {
// We call this method whenever a node changes state
void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override;

// This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down.
// to wait up to a full second for them.
// These are just here to allow `poll` in main.cpp to get interrupted when the server shuts down.
SSynchronizedQueue<bool> _notifyDone;
SSynchronizedQueue<bool> _notifyDoneSync;

atomic<size_t> _maxSocketThreads{3'000};
atomic<size_t> _dbPoolSize{25'000};
Expand Down
7 changes: 7 additions & 0 deletions libstuff/SSignal.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "libstuff.h"
#include "SSynchronizedQueue.h"
#include <sqlitecluster/SQLiteNode.h>
#include <cxxabi.h>
#include <execinfo.h>
Expand All @@ -17,6 +18,8 @@ void SSetSignalHandlerDieFunc(function<void()>&& func) {
constexpr auto sigStackSize{1024*64};
char __SIGSTACK[sigStackSize];

// Type-erased pointer to an `SSynchronizedQueue<bool>`. When non-null, the signal handler thread function casts it
// back to that type and pushes `true` onto it, which lets a `poll` loop watching that queue be interrupted.
void* SSIGNAL_NOTIFY_INTERRUPT;

// The function to call in our thread that handles signals.
void _SSignal_signalHandlerThreadFunc();

Expand Down Expand Up @@ -163,6 +166,10 @@ void _SSignal_signalHandlerThreadFunc() {
_SSignal_pendingSignalBitMask.fetch_or(1 << signum);
}
}

if (SSIGNAL_NOTIFY_INTERRUPT) {
static_cast<SSynchronizedQueue<bool>*>(SSIGNAL_NOTIFY_INTERRUPT)->push(true);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions libstuff/SSynchronizedQueue.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <libstuff/libstuff.h>
#include <unistd.h>
#include <fcntl.h>

Expand Down
2 changes: 2 additions & 0 deletions libstuff/libstuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ using namespace std;
// Global indicating whether we're running the server on dev or production.
extern atomic<bool> GLOBAL_IS_LIVE;

// Optional hook used by the signal handling thread to interrupt a poll loop. It is stored as `void*`, but the signal
// code casts it to `SSynchronizedQueue<bool>*` before pushing to it, so it must only ever point at such a queue.
extern void* SSIGNAL_NOTIFY_INTERRUPT;

// Initialize libstuff on every thread before calling any of its functions
void SInitialize(string threadName = "", const char* processName = 0);

Expand Down
6 changes: 3 additions & 3 deletions test/lib/BedrockTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ string BedrockTester::startServer(bool wait) {
return result[0].content;
}
// This will happen if the server's not up yet. We'll just try again.
usleep(100000); // 0.1 seconds.
usleep(50'000); // 0.05 seconds.
continue;
}
}
Expand All @@ -293,7 +293,7 @@ string BedrockTester::executeWaitVerifyContent(SData request, const string& expe
uint64_t start = STimeNow();
vector<SData> results;
do {
results = executeWaitMultipleData({request}, 1, control);
results = BedrockTester::executeWaitMultipleData({request}, 1, control);

if (results.size() > 0 && SStartsWith(results[0].methodLine, expectedResult)) {
// good, got the result we wanted
Expand Down Expand Up @@ -608,7 +608,7 @@ bool BedrockTester::waitForStatusTerm(const string& term, const string& testValu
uint64_t start = STimeNow();
while (STimeNow() < start + timeoutUS) {
try {
string result = SParseJSONObject(executeWaitVerifyContent(SData("Status"), "200", true))[term];
string result = SParseJSONObject(BedrockTester::executeWaitVerifyContent(SData("Status"), "200", true))[term];

// if the value matches, return, otherwise wait
if (result == testValue) {
Expand Down
8 changes: 4 additions & 4 deletions test/lib/BedrockTester.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BedrockTester {
atomic<uint64_t>* alternateCounter = nullptr);

// Destructor.
~BedrockTester();
virtual ~BedrockTester();

// Start the server. If `wait` is specified, wait until the server is fully up with the command port open and
// accepting requests. Otherwise, returns as soon as the control port is open and can return `Status`.
Expand Down Expand Up @@ -65,17 +65,17 @@ class BedrockTester {
// Takes a list of requests, and returns a corresponding list of responses.
// Uses `connections` parallel connections to the server to send the requests.
// If `control` is set, sends the message to the control port.
vector<SData> executeWaitMultipleData(vector<SData> requests, int connections = 10, bool control = false, bool returnOnDisconnect = false, int* errorCode = nullptr);
virtual vector<SData> executeWaitMultipleData(vector<SData> requests, int connections = 10, bool control = false, bool returnOnDisconnect = false, int* errorCode = nullptr);

// Sends a single request, returning the response content.
// If the response method line doesn't begin with the expected result, throws.
// Convenience wrapper around executeWaitMultipleData.
string executeWaitVerifyContent(SData request, const string& expectedResult = "200 OK", bool control = false, uint64_t retryTimeoutUS = 0);
virtual string executeWaitVerifyContent(SData request, const string& expectedResult = "200 OK", bool control = false, uint64_t retryTimeoutUS = 0);

// Sends a single request, returning the response content as a STable.
// If the response method line doesn't begin with the expected result, throws.
// Convenience wrapper around executeWaitMultipleData.
STable executeWaitVerifyContentTable(SData request, const string& expectedResult = "200 OK");
virtual STable executeWaitVerifyContentTable(SData request, const string& expectedResult = "200 OK");

// Read from the DB file, without going through the bedrock server. Two interfaces are provided to maintain
// compatibility with the `SQLite` class.
Expand Down

0 comments on commit 58f14cb

Please sign in to comment.