Skip to content

Commit

Permalink
Verification for secondary test fails but at least it is checking som…
Browse files Browse the repository at this point in the history
…ething
  • Loading branch information
archang19 committed Jan 7, 2025
1 parent d84b10b commit de7f11c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 57 deletions.
8 changes: 6 additions & 2 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ class SharedState {

Status Restore(DB* db) { return expected_state_manager_->Restore(db); }

Status GetExpectedState(DB* db, std::unique_ptr<ExpectedState>& state) {
return expected_state_manager_->GetExpectedState(db, state);
Status SetSecondaryExpectedState(DB* db) {
return expected_state_manager_->SetSecondaryExpectedState(db);
}

// Requires external locking covering all keys in `cf`.
Expand Down Expand Up @@ -299,6 +299,10 @@ class SharedState {
return expected_state_manager_->Get(cf, key);
}

ExpectedValue GetSecondary(int cf, int64_t key) {
return expected_state_manager_->GetSecondary(cf, key);
}

// Prepare a Delete that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
Expand Down
41 changes: 30 additions & 11 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ Status FileSnapshotExpectedState::Open(bool create) {
if (create) {
return Status::NotSupported();
}
size_t expected_values_size = GetValuesLen();

Env* default_env = Env::Default();
Status status = default_env->NewMemoryMappedFileBuffer(
Expand All @@ -217,7 +216,7 @@ Status FileSnapshotExpectedState::Open(bool create) {
return status;
}

assert(expected_state_mmap_buffer_->GetLen() == expected_values_size);
assert(expected_state_mmap_buffer_->GetLen() == GetValuesLen());

values_ = static_cast<std::atomic<uint32_t>*>(
expected_state_mmap_buffer_->GetBase());
Expand Down Expand Up @@ -711,9 +710,9 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,

} // anonymous namespace

Status FileExpectedStateManager::GetExpectedState(
DB* db, std::unique_ptr<ExpectedState>& state) {
std::cout << "Enter FileExpectedStateManager::GetExpectedState" << std::endl;
Status FileExpectedStateManager::SetSecondaryExpectedState(DB* db) {
std::cout << "Enter FileExpectedStateManager::SetSecondaryExpectedState"
<< std::endl;
assert(HasHistory());
SequenceNumber seqno = db->GetLatestSequenceNumber();
if (seqno < saved_seqno_) {
Expand All @@ -722,6 +721,7 @@ Status FileExpectedStateManager::GetExpectedState(
return Status::Corruption("DB is older than any restorable expected state");
}

// Create the trace reader. The trace file must exist.
std::string trace_filename =
std::to_string(saved_seqno_) + kTraceFilenameSuffix;
std::string trace_file_path = GetPathForFilename(trace_filename);
Expand All @@ -744,26 +744,45 @@ Status FileExpectedStateManager::GetExpectedState(
return s;
}

// Create an expected state by replaying the trace
std::string state_filename =
std::to_string(saved_seqno_) + kStateFilenameSuffix;
std::string state_file_path = GetPathForFilename(state_filename);
std::cout << "state_file_path = " << state_file_path << std::endl;

std::string verification_file_temp_path = GetTempPathForFilename(
"verification_" + std::to_string(seqno) + kStateFilenameSuffix);
if (s.ok()) {
s = CopyFile(FileSystem::Default(), state_file_path, Temperature::kUnknown,
verification_file_temp_path, Temperature::kUnknown,
0 /* size */, false /* use_fsync */, nullptr /* io_tracer */);
}

if (!s.ok()) {
return s;
}

std::cout << "verification_file_temp_path: " << verification_file_temp_path
<< std::endl;
std::unique_ptr<FileSnapshotExpectedState> replay_state(
new FileSnapshotExpectedState(state_file_path, max_key_,
new FileSnapshotExpectedState(verification_file_temp_path, max_key_,
num_column_families_));
s = replay_state->Open(false /* create */);
if (!s.ok()) {
std::cout << "Error opening FileSnapshotExpectedState" << std::endl;
return s;
}

s = ReplayTrace(db, std::move(trace_reader), seqno - saved_seqno_,
replay_state.get());
uint64_t write_ops = seqno - saved_seqno_;
std::cout << "Replaying " << write_ops << " operations from trace from "
<< saved_seqno_ << " to " << seqno << std::endl;
s = ReplayTrace(db, std::move(trace_reader), write_ops, replay_state.get());
if (!s.ok()) {
std::cout << "Error replaying trace" << std::endl;
return s;
}
state = std::move(replay_state);
std::cout << "Successful exit from GetExpectedState" << std::endl;
secondary_expected_state_ = std::move(replay_state);
assert(db->GetLatestSequenceNumber() == seqno);
std::cout << "Successful exit from SetSecondaryExpectedState" << std::endl;
return s;
}

Expand Down
13 changes: 7 additions & 6 deletions db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ class ExpectedStateManager {
// is executing.
virtual Status Restore(DB* db) = 0;

virtual Status GetExpectedState(DB* db,
std::unique_ptr<ExpectedState>& state) = 0;
virtual Status SetSecondaryExpectedState(DB* db) = 0;

// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }
Expand All @@ -266,6 +265,10 @@ class ExpectedStateManager {
// See ExpectedState::Get()
ExpectedValue Get(int cf, int64_t key) { return latest_->Get(cf, key); }

ExpectedValue GetSecondary(int cf, int64_t key) {
return secondary_expected_state_->Get(cf, key);
}

// See ExpectedState::PrepareDelete()
PendingExpectedValue PrepareDelete(int cf, int64_t key) {
return latest_->PrepareDelete(cf, key);
Expand Down Expand Up @@ -343,8 +346,7 @@ class FileExpectedStateManager : public ExpectedStateManager {
// file into "LATEST.state".
Status Restore(DB* db) override;

Status GetExpectedState(DB* db,
std::unique_ptr<ExpectedState>& state) override;
Status SetSecondaryExpectedState(DB* db) override;

private:
// Requires external locking preventing concurrent execution with any other
Expand Down Expand Up @@ -391,8 +393,7 @@ class AnonExpectedStateManager : public ExpectedStateManager {
// currently have a need to keep history of expected state within a process.
Status Restore(DB* /* db */) override { return Status::NotSupported(); }

Status GetExpectedState(
DB* /* db */, std::unique_ptr<ExpectedState>& /* state */) override {
Status SetSecondaryExpectedState(DB* /* db */) override {
return Status::NotSupported();
}

Expand Down
102 changes: 64 additions & 38 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ class NonBatchedOpsStressTest : public StressTest {
s = Status::NotFound();
}

VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared,
shared->Get(static_cast<int>(cf), i), from_db,
/* msg_prefix */ "Iterator verification", s);

if (!from_db.empty()) {
Expand All @@ -147,7 +148,8 @@ class NonBatchedOpsStressTest : public StressTest {

Status s = db_->Get(options, column_families_[cf], key, &from_db);

VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared,
shared->Get(static_cast<int>(cf), i), from_db,
/* msg_prefix */ "Get verification", s);

if (!from_db.empty()) {
Expand Down Expand Up @@ -182,7 +184,8 @@ class NonBatchedOpsStressTest : public StressTest {
}
}

VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared,
shared->Get(static_cast<int>(cf), i), from_db,
/* msg_prefix */ "GetEntity verification", s);

if (!from_db.empty()) {
Expand Down Expand Up @@ -217,7 +220,8 @@ class NonBatchedOpsStressTest : public StressTest {
const std::string from_db = values[j].ToString();

VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
from_db, /* msg_prefix */ "MultiGet verification",
shared->Get(static_cast<int>(cf), i + j), from_db,
/* msg_prefix */ "MultiGet verification",
statuses[j]);

if (!from_db.empty()) {
Expand Down Expand Up @@ -268,9 +272,10 @@ class NonBatchedOpsStressTest : public StressTest {
}
}

VerifyOrSyncValue(
static_cast<int>(cf), i + j, options, shared, from_db,
/* msg_prefix */ "MultiGetEntity verification", statuses[j]);
VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
shared->Get(static_cast<int>(cf), i + j), from_db,
/* msg_prefix */ "MultiGetEntity verification",
statuses[j]);

if (!from_db.empty()) {
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
Expand Down Expand Up @@ -321,7 +326,8 @@ class NonBatchedOpsStressTest : public StressTest {
from_db = values[number_of_operands - 1].ToString();
}

VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared,
shared->Get(static_cast<int>(cf), i), from_db,
/* msg_prefix */ "GetMergeOperands verification",
s);

Expand All @@ -339,18 +345,6 @@ class NonBatchedOpsStressTest : public StressTest {
return;
}

if (thread->shared->HasHistory()) {
std::unique_ptr<ExpectedState> state;
Status getExpectedStateStatus =
thread->shared->GetExpectedState(db_, state);
if (!getExpectedStateStatus.ok()) {
std::cout << "[NonBatchedOpsStressTest::ContinuouslyVerifyDb]: Failed "
"to get expected state"
<< std::endl;
assert(false);
}
}

assert(cmp_db_);
assert(!cmp_cfhs_.empty());
Status s = cmp_db_->TryCatchUpWithPrimary();
Expand All @@ -359,6 +353,48 @@ class NonBatchedOpsStressTest : public StressTest {
exit(1);
}

auto* shared = thread->shared;
assert(shared);
const int64_t max_key = shared->GetMaxKey();
ReadOptions read_opts(FLAGS_verify_checksum, true);

// If another thread calls SetSecondaryExpectedState while we are verifying
// the values, then that would mess up our verification if we are using Get
if (thread->shared->HasHistory() && thread->tid == 0) {
uint64_t start_get_expected_state = clock_->NowMicros();
Status setSecondaryExpectedStateStatus =
thread->shared->SetSecondaryExpectedState(cmp_db_);
if (!setSecondaryExpectedStateStatus.ok()) {
std::cout << "[NonBatchedOpsStressTest::ContinuouslyVerifyDb]: Failed "
"to get expected state"
<< std::endl;
assert(false);
}
uint64_t end_get_expected_state = clock_->NowMicros();
std::cout << "Retrieved expected state in "
<< end_get_expected_state - start_get_expected_state
<< " microseconds" << std::endl;

uint64_t start_secondary_scan = clock_->NowMicros();
for (int64_t i = 0; i < max_key; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
const std::string key = Key(i);
std::string from_db;
s = cmp_db_->Get(read_opts, column_families_[0], key, &from_db);
if (!VerifyOrSyncValue(
0, i, read_opts, shared, shared->GetSecondary(0, i), from_db,
/* msg_prefix */ "Secondary get verification", s)) {
std::cout << "Failed on key i=" << i << std::endl;
}
}
uint64_t end_secondary_scan = clock_->NowMicros();
std::cout << "Scanned all of secondary db in "
<< end_secondary_scan - start_secondary_scan << " microseconds"
<< std::endl;
}

const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
Expand All @@ -371,10 +407,6 @@ class NonBatchedOpsStressTest : public StressTest {
return iter->status();
};

auto* shared = thread->shared;
assert(shared);
const int64_t max_key = shared->GetMaxKey();
ReadOptions read_opts(FLAGS_verify_checksum, true);
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
Expand Down Expand Up @@ -1644,9 +1676,11 @@ class NonBatchedOpsStressTest : public StressTest {

std::string from_db;
Status s = db_->Get(read_opts, cfh, k, &from_db);
bool res = VerifyOrSyncValue(
rand_column_family, rand_key, read_opts, shared,
/* msg_prefix */ "Pre-Put Get verification", from_db, s);
// looks like these arguments for msg_prefix and from_db were swapped
bool res =
VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared,
shared->Get(rand_column_family, rand_key), from_db,
/* msg_prefix */ "Pre-Put Get verification", s);

// Enable back error injection disabled for preparation
if (fault_fs_guard) {
Expand Down Expand Up @@ -2729,21 +2763,13 @@ class NonBatchedOpsStressTest : public StressTest {
}

bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& opts,
SharedState* shared, const std::string& value_from_db,
SharedState* shared,
const ExpectedValue& expected_value,
const std::string& value_from_db,
std::string msg_prefix, const Status& s) const {
if (shared->HasVerificationFailedYet()) {
return false;
}
if (shared->HasHistory()) {
std::unique_ptr<ExpectedState> state;
Status getExpectedStateStatus = shared->GetExpectedState(db_, state);
if (!getExpectedStateStatus.ok()) {
std::cout << "[VerifyOrSyncValue]: Failed to get expected state"
<< std::endl;
return false;
}
}
const ExpectedValue expected_value = shared->Get(cf, key);

if (expected_value.PendingWrite() || expected_value.PendingDelete()) {
if (s.ok()) {
Expand Down

0 comments on commit de7f11c

Please sign in to comment.