Skip to content

Commit

Permalink
Review feedback from @bthomee and @vvysokikh1:
Browse files Browse the repository at this point in the history
- Rewrite the locking in DatabaseRotatingImp::rotateWithLock to use
  a shared_lock, and write a unit test to show (as much as possible)
  that it won't deadlock.
  • Loading branch information
ximinez committed Feb 7, 2025
1 parent 9f564bc commit 13fb47c
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 33 deletions.
116 changes: 116 additions & 0 deletions src/test/app/SHAMapStore_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
#include <test/jtx.h>
#include <test/jtx/envconfig.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/main/NodeStoreScheduler.h>
#include <xrpld/app/misc/SHAMapStore.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpld/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/protocol/jss.h>
#include <boost/lexical_cast.hpp>
#include <thread>

namespace ripple {
namespace test {
Expand Down Expand Up @@ -518,12 +522,124 @@ class SHAMapStore_test : public beast::unit_test::suite
lastRotated = ledgerSeq - 1;
}

// Builds and opens a NodeStore backend for the rotation test.
//
// Starts from a copy of the environment's node-database config section,
// overrides its "path" entry with `path`, and asks the NodeStore manager to
// construct a backend from that section.
//
// @param env        Test environment supplying the config and logs.
// @param scheduler  Scheduler handed to the new backend.
// @param path       Value stored in the section's "path" entry; must be
//                   non-empty.
// @return The opened backend, or an empty pointer (with a test failure
//         recorded) when `path` is empty.
std::unique_ptr<NodeStore::Backend>
makeBackendRotating(
    jtx::Env& env,
    NodeStoreScheduler& scheduler,
    std::string path)
{
    // Nothing sensible can be built without a path; record the failure
    // and hand back an empty pointer.
    if (!BEAST_EXPECT(!path.empty()))
        return {};

    auto& app = env.app();

    // Work on a private copy so the environment's own config is untouched.
    Section section{app.config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path const newPath(path);
    section.set("path", newPath.string());

    auto const burstSize = megabytes(
        app.config().getValueFor(SizedItem::burstSize, std::nullopt));

    auto backend = NodeStore::Manager::instance().make_Backend(
        section, burstSize, scheduler, app.logs().journal("NodeStoreTest"));
    backend->open();
    return backend;
}
// Launches several concurrent calls to DatabaseRotatingImp::rotateWithLock
// and checks that exactly one of them performs the rotation while the
// others report failure, i.e. that contention does not deadlock.
void
testRotateWithLockContention()
{
    // Concurrent rotation is not something that should ever happen; this
    // test only exists to show that, if it does, nothing deadlocks.
    testcase("rotate with lock contention");

    using namespace jtx;
    using namespace std::chrono_literals;

    Env env(*this, envconfig(onlineDelete));

    /////////////////////////////////////////////////////////////
    // Assemble the rotating database by hand. SHAMapStoreImp normally
    // takes care of all of this.
    auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
    nscfg.set(
        NodeStore::DatabaseRotatingImp::unitTestFlag, std::to_string(true));

    // Supply a default for any cache setting the section does not define.
    auto const setDefault = [&](char const* key, SizedItem item) {
        if (!nscfg.exists(key))
            nscfg.set(
                key,
                std::to_string(
                    env.app().config().getValueFor(item, std::nullopt)));
    };
    setDefault("cache_size", SizedItem::treeCacheSize);
    setDefault("cache_age", SizedItem::treeCacheAge);

    NodeStoreScheduler scheduler(env.app().getJobQueue());

    std::string const writableDb = "write";
    std::string const archiveDb = "archive";
    auto writableBackend = makeBackendRotating(env, scheduler, writableDb);
    auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);

    // Two backends are needed so that data can be deleted online.
    constexpr int readThreads = 4;
    auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
        scheduler,
        readThreads,
        std::move(writableBackend),
        std::move(archiveBackend),
        nscfg,
        env.app().logs().journal("NodeStoreTest"));

    /////////////////////////////////////////////////////////////
    // Provoke the "impossible" case: several rotateWithLock calls in
    // flight at once, each given a callback that simply stalls.
    std::atomic<int> threadNum = 0;
    auto const cb = [&](std::string const& writableBackendName) {
        BEAST_EXPECT(writableBackendName == "write");
        // Each invocation names its backend after the invocation count.
        auto newBackend =
            makeBackendRotating(env, scheduler, std::to_string(++threadNum));
        std::this_thread::sleep_for(5s);
        return newBackend;
    };

    constexpr int numThreads = 5;
    std::atomic<int> successes = 0;
    std::atomic<int> failures = 0;

    std::vector<std::thread> workers;
    workers.reserve(numThreads);
    for (int i = 0; i < numThreads; ++i)
    {
        workers.emplace_back([&]() {
            // Tally the outcome of this thread's rotation attempt.
            auto& tally = dbr->rotateWithLock(cb) ? successes : failures;
            ++tally;
        });
    }
    for (auto& worker : workers)
        worker.join();

    BEAST_EXPECT(successes == 1);
    BEAST_EXPECT(failures == numThreads - 1);
    // Only the winning thread runs the callback, so it ran exactly once.
    BEAST_EXPECT(threadNum == 1);
    BEAST_EXPECT(dbr->getName() == "1");
}

// Overrides the test-suite base class's run(); executes this suite's test
// cases one after another in the order listed.
void
run() override
{
testClear();
testAutomatic();
testCanDelete();
// Concurrency check for DatabaseRotatingImp::rotateWithLock.
testRotateWithLockContention();
}
};

Expand Down
31 changes: 19 additions & 12 deletions src/xrpld/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,18 +366,25 @@ SHAMapStoreImp::run()

lastRotated = validatedSeq;

dbRotating_->rotateWithLock(
[&](std::string const& writableBackendName) {
SavedState savedState;
savedState.writableDb = newBackend->getName();
savedState.archiveDb = writableBackendName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);

clearCaches(validatedSeq);

return std::move(newBackend);
});
if (!dbRotating_->rotateWithLock(
[&](std::string const& writableBackendName) {
SavedState savedState;
savedState.writableDb = newBackend->getName();
savedState.archiveDb = writableBackendName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);

clearCaches(validatedSeq);

return std::move(newBackend);
}))
{
JLOG(journal_.error())
<< validatedSeq
<< " rotation failed. Discard unused new backend.";
newBackend->setDeletePath();
return;
}

JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
}
Expand Down
3 changes: 2 additions & 1 deletion src/xrpld/nodestore/DatabaseRotating.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class DatabaseRotating : public Database
@param f A function executed before the rotation
*/
virtual void
[[nodiscard]]
virtual bool
rotateWithLock(std::function<std::unique_ptr<NodeStore::Backend>(
std::string const& writableBackendName)> const& f) = 0;
};
Expand Down
61 changes: 47 additions & 14 deletions src/xrpld/nodestore/detail/DatabaseRotatingImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,88 @@ DatabaseRotatingImp::DatabaseRotatingImp(
: DatabaseRotating(scheduler, readThreads, config, j)
, writableBackend_(std::move(writableBackend))
, archiveBackend_(std::move(archiveBackend))
, unitTest_(get<bool>(config, unitTestFlag, false))
{
if (writableBackend_)
fdRequired_ += writableBackend_->fdRequired();
if (archiveBackend_)
fdRequired_ += archiveBackend_->fdRequired();
}

// Rotates the backends: calls `f` with the name of the current writable
// backend, then flags the archive backend for deletion, makes the writable
// backend the new archive, and installs the backend returned by `f` as the
// writable one.
//
// @param f  Callback that receives the current writable backend's name and
//           returns the backend to install as the new writable backend.
// @return true once the rotation has been carried out; false (without
//         calling `f`) when the `rotating` flag shows another rotation is
//         already in progress.
void
[[nodiscard]] bool
DatabaseRotatingImp::rotateWithLock(
std::function<std::unique_ptr<NodeStore::Backend>(
std::string const& writableBackendName)> const& f)
{
// backendMutex_ prevents writableBackend_ and archiveBackend_ from changing
// while the main mutex_ is released during the callback.
std::lock_guard backendLock(backendMutex_);
// This function should be the only one taking any kind of unique/write
// lock, and should only be called once at a time by its synchronous caller.
// The extra checks involving the "rotating" flag are there to ensure that,
// if that ever changes, we still avoid deadlocks and incorrect behavior.

{
std::unique_lock writeLock(mutex_);
if (!rotating)
{
// Once this flag is set, we're committed to doing the work and
// returning true.
rotating = true;
}
else
{
// This should only be reachable through unit tests.
XRPL_ASSERT(
unitTest_,
"ripple::NodeStore::DatabaseRotatingImp::rotateWithLock "
"unit testing");
return false;
}
}
// Take a copy of the writable backend pointer under the lock so that its
// name can be passed to the callback below.
auto const writableBackend = [&] {
std::lock_guard lock(mutex_);
std::shared_lock readLock(mutex_);
XRPL_ASSERT(
rotating,
"ripple::NodeStore::DatabaseRotatingImp::rotateWithLock rotating "
"flag set");

return writableBackend_;
}();

// NOTE(review): nothing visible in this function resets `rotating` if f
// throws; a throwing callback would leave the flag set and make every later
// call return false. Confirm whether f is allowed to throw.
auto newBackend = f(writableBackend->getName());

std::lock_guard lock(mutex_);
// Because of the "rotating" flag, there should be no way any other thread
// is waiting at this point. As long as they all release the shared_lock
// before taking the unique_lock (which they have to, because upgrading is
// unsupported), there can be no deadlock.
std::unique_lock writeLock(mutex_);

// The old archive is no longer needed: flag it for deletion, then shift
// writable -> archive and new -> writable.
archiveBackend_->setDeletePath();
archiveBackend_ = std::move(writableBackend_);
writableBackend_ = std::move(newBackend);

// Rotation finished; allow the next one.
rotating = false;

return true;
}

// Returns the name reported by the current writable backend. The backend
// is queried while a lock on mutex_ is held.
std::string
DatabaseRotatingImp::getName() const
{
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return writableBackend_->getName();
}

// Returns the write load reported by the current writable backend. The
// backend is queried while a lock on mutex_ is held.
std::int32_t
DatabaseRotatingImp::getWriteLoad() const
{
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return writableBackend_->getWriteLoad();
}

void
DatabaseRotatingImp::importDatabase(Database& source)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return writableBackend_;
}();

Expand All @@ -91,7 +124,7 @@ DatabaseRotatingImp::importDatabase(Database& source)
// Forwards to the current writable backend's sync(). The call is made while
// a lock on mutex_ is held.
void
DatabaseRotatingImp::sync()
{
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
writableBackend_->sync();
}

Expand All @@ -105,7 +138,7 @@ DatabaseRotatingImp::store(
auto nObj = NodeObject::createObject(type, std::move(data), hash);

auto const backend = [&] {
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return writableBackend_;
}();

Expand Down Expand Up @@ -159,7 +192,7 @@ DatabaseRotatingImp::fetchNodeObject(
std::shared_ptr<NodeObject> nodeObject;

auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

Expand All @@ -173,7 +206,7 @@ DatabaseRotatingImp::fetchNodeObject(
{
{
// Refresh the writable backend pointer
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
writable = writableBackend_;
}

Expand All @@ -194,7 +227,7 @@ DatabaseRotatingImp::for_each(
std::function<void(std::shared_ptr<NodeObject>)> f)
{
auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

Expand Down
18 changes: 12 additions & 6 deletions src/xrpld/nodestore/detail/DatabaseRotatingImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

#include <xrpld/nodestore/DatabaseRotating.h>

#include <mutex>
#include <shared_mutex>

namespace ripple {
namespace NodeStore {
Expand All @@ -48,7 +48,7 @@ class DatabaseRotatingImp : public DatabaseRotating
stop();
}

void
[[nodiscard]] bool
rotateWithLock(
std::function<std::unique_ptr<NodeStore::Backend>(
std::string const& writableBackendName)> const& f) override;
Expand Down Expand Up @@ -79,14 +79,20 @@ class DatabaseRotatingImp : public DatabaseRotating
void
sweep() override;

// Include the space in the name to ensure it can't be set in a file
static constexpr auto unitTestFlag = " unit_test";

private:
// backendMutex_ is only needed when the *Backend_ members are modified.
// Reads are protected by the general mutex_.
std::mutex backendMutex_;
bool const unitTest_;
bool rotating = false;
std::shared_ptr<Backend> writableBackend_;
std::shared_ptr<Backend> archiveBackend_;

mutable std::mutex mutex_;
// https://en.cppreference.com/w/cpp/thread/shared_timed_mutex/lock
// "Shared mutexes do not support direct transition from shared to unique
// ownership mode: the shared lock has to be relinquished with
// unlock_shared() before exclusive ownership may be obtained with lock()."
mutable std::shared_timed_mutex mutex_;

std::shared_ptr<NodeObject>
fetchNodeObject(
Expand Down

0 comments on commit 13fb47c

Please sign in to comment.