diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 199448be124..3e07e2a11af 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -265,8 +265,8 @@ class SharedState { Status Restore(DB* db) { return expected_state_manager_->Restore(db); } - Status GetExpectedState(DB* db, std::unique_ptr& state) { - return expected_state_manager_->GetExpectedState(db, state); + Status SetSecondaryExpectedState(DB* db) { + return expected_state_manager_->SetSecondaryExpectedState(db); } // Requires external locking covering all keys in `cf`. @@ -299,6 +299,10 @@ class SharedState { return expected_state_manager_->Get(cf, key); } + ExpectedValue GetSecondary(int cf, int64_t key) { + return expected_state_manager_->GetSecondary(cf, key); + } + // Prepare a Delete that will be started but not finish yet // This is useful for crash-recovery testing when the process may crash // before updating the corresponding expected value diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index b228a3ff642..8b0b98d583f 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -208,7 +208,6 @@ Status FileSnapshotExpectedState::Open(bool create) { if (create) { return Status::NotSupported(); } - size_t expected_values_size = GetValuesLen(); Env* default_env = Env::Default(); Status status = default_env->NewMemoryMappedFileBuffer( @@ -217,7 +216,7 @@ Status FileSnapshotExpectedState::Open(bool create) { return status; } - assert(expected_state_mmap_buffer_->GetLen() == expected_values_size); + assert(expected_state_mmap_buffer_->GetLen() == GetValuesLen()); values_ = static_cast*>( expected_state_mmap_buffer_->GetBase()); @@ -711,9 +710,9 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, } // anonymous namespace -Status FileExpectedStateManager::GetExpectedState( - DB* db, std::unique_ptr& state) { - std::cout << "Enter FileExpectedStateManager::GetExpectedState" << std::endl; +Status FileExpectedStateManager::SetSecondaryExpectedState(DB* db) { + std::cout << "Enter FileExpectedStateManager::SetSecondaryExpectedState" + << std::endl; assert(HasHistory()); SequenceNumber seqno = db->GetLatestSequenceNumber(); if (seqno < saved_seqno_) { @@ -722,6 +721,7 @@ Status FileExpectedStateManager::GetExpectedState( return Status::Corruption("DB is older than any restorable expected state"); } + // Create the trace reader. The trace file must exist. std::string trace_filename = std::to_string(saved_seqno_) + kTraceFilenameSuffix; std::string trace_file_path = GetPathForFilename(trace_filename); @@ -744,26 +744,45 @@ Status FileExpectedStateManager::GetExpectedState( return s; } + // Create an expected state by replaying the trace std::string state_filename = std::to_string(saved_seqno_) + kStateFilenameSuffix; std::string state_file_path = GetPathForFilename(state_filename); + std::cout << "state_file_path = " << state_file_path << std::endl; + + std::string verification_file_temp_path = GetTempPathForFilename( + "verification_" + std::to_string(seqno) + kStateFilenameSuffix); + if (s.ok()) { + s = CopyFile(FileSystem::Default(), state_file_path, Temperature::kUnknown, + verification_file_temp_path, Temperature::kUnknown, + 0 /* size */, false /* use_fsync */, nullptr /* io_tracer */); + } + + if (!s.ok()) { + return s; + } + + std::cout << "verification_file_temp_path: " << verification_file_temp_path + << std::endl; std::unique_ptr replay_state( - new FileSnapshotExpectedState(state_file_path, max_key_, + new FileSnapshotExpectedState(verification_file_temp_path, max_key_, num_column_families_)); s = replay_state->Open(false /* create */); if (!s.ok()) { std::cout << "Error opening FileSnapshotExpectedState" << std::endl; return s; } - - s = ReplayTrace(db, std::move(trace_reader), seqno - saved_seqno_, - replay_state.get()); + uint64_t write_ops = seqno - saved_seqno_; + std::cout << "Replaying " << write_ops << " operations from trace from " + << saved_seqno_ << " to " << seqno << std::endl; + s = ReplayTrace(db, std::move(trace_reader), write_ops, replay_state.get()); if (!s.ok()) { std::cout << "Error replaying trace" << std::endl; return s; } - state = std::move(replay_state); - std::cout << "Successful exit from GetExpectedState" << std::endl; + secondary_expected_state_ = std::move(replay_state); + assert(db->GetLatestSequenceNumber() == seqno); + std::cout << "Successful exit from SetSecondaryExpectedState" << std::endl; return s; } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index 6c05d6115ec..a4a3b48380a 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -246,8 +246,7 @@ class ExpectedStateManager { // is executing. virtual Status Restore(DB* db) = 0; - virtual Status GetExpectedState(DB* db, - std::unique_ptr& state) = 0; + virtual Status SetSecondaryExpectedState(DB* db) = 0; // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); } @@ -266,6 +265,10 @@ class ExpectedStateManager { // See ExpectedState::Get() ExpectedValue Get(int cf, int64_t key) { return latest_->Get(cf, key); } + ExpectedValue GetSecondary(int cf, int64_t key) { + return secondary_expected_state_->Get(cf, key); + } + // See ExpectedState::PrepareDelete() PendingExpectedValue PrepareDelete(int cf, int64_t key) { return latest_->PrepareDelete(cf, key); @@ -343,8 +346,7 @@ class FileExpectedStateManager : public ExpectedStateManager { // file into "LATEST.state". Status Restore(DB* db) override; - Status GetExpectedState(DB* db, - std::unique_ptr& state) override; + Status SetSecondaryExpectedState(DB* db) override; private: // Requires external locking preventing concurrent execution with any other @@ -391,8 +393,7 @@ class AnonExpectedStateManager : public ExpectedStateManager { // currently have a need to keep history of expected state within a process. Status Restore(DB* /* db */) override { return Status::NotSupported(); } - Status GetExpectedState( - DB* /* db */, std::unique_ptr& /* state */) override { + Status SetSecondaryExpectedState(DB* /* db */) override { return Status::NotSupported(); } diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 4fe958bbd5f..708b230c64b 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -128,7 +128,8 @@ class NonBatchedOpsStressTest : public StressTest { s = Status::NotFound(); } - VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, + VerifyOrSyncValue(static_cast(cf), i, options, shared, + shared->Get(cf, i), from_db, /* msg_prefix */ "Iterator verification", s); if (!from_db.empty()) { @@ -147,7 +148,8 @@ class NonBatchedOpsStressTest : public StressTest { Status s = db_->Get(options, column_families_[cf], key, &from_db); - VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, + VerifyOrSyncValue(static_cast(cf), i, options, shared, + shared->Get(cf, i), from_db, /* msg_prefix */ "Get verification", s); if (!from_db.empty()) { @@ -182,7 +184,8 @@ class NonBatchedOpsStressTest : public StressTest { } } - VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, + VerifyOrSyncValue(static_cast(cf), i, options, shared, + shared->Get(cf, i), from_db, /* msg_prefix */ "GetEntity verification", s); if (!from_db.empty()) { @@ -217,7 +220,8 @@ class NonBatchedOpsStressTest : public StressTest { const std::string from_db = values[j].ToString(); VerifyOrSyncValue(static_cast(cf), i + j, options, shared, - from_db, /* msg_prefix */ "MultiGet verification", + shared->Get(cf, i + j), from_db, + /* msg_prefix */ "MultiGet verification", statuses[j]); if (!from_db.empty()) { @@ -268,9 +272,10 @@ class NonBatchedOpsStressTest : public StressTest { } } - VerifyOrSyncValue( - static_cast(cf), i + j, options, shared, from_db, - /* msg_prefix */ "MultiGetEntity verification", statuses[j]); + VerifyOrSyncValue(static_cast(cf), i + j, options, shared, + shared->Get(cf, i + j), from_db, + /* msg_prefix */ "MultiGetEntity verification", + statuses[j]); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i + j), @@ -321,7 +326,8 @@ class NonBatchedOpsStressTest : public StressTest { from_db = values[number_of_operands - 1].ToString(); } - VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, + VerifyOrSyncValue(static_cast(cf), i, options, shared, + shared->Get(cf, i), from_db, /* msg_prefix */ "GetMergeOperands verification", s); @@ -339,24 +345,52 @@ class NonBatchedOpsStressTest : public StressTest { return; } + assert(cmp_db_); + assert(!cmp_cfhs_.empty()); + Status s = cmp_db_->TryCatchUpWithPrimary(); + if (!s.ok()) { + assert(false); + exit(1); + } + + auto* shared = thread->shared; + assert(shared); + const int64_t max_key = shared->GetMaxKey(); + ReadOptions read_opts(FLAGS_verify_checksum, true); + if (thread->shared->HasHistory()) { - std::unique_ptr state; - Status getExpectedStateStatus = - thread->shared->GetExpectedState(db_, state); - if (!getExpectedStateStatus.ok()) { + uint64_t start_get_expected_state = clock_->NowMicros(); + Status setSecondaryExpectedStateStatus = + thread->shared->SetSecondaryExpectedState(cmp_db_); + if (!setSecondaryExpectedStateStatus.ok()) { std::cout << "[NonBatchedOpsStressTest::ContinuouslyVerifyDb]: Failed " "to get expected state" << std::endl; assert(false); } - } + uint64_t end_get_expected_state = clock_->NowMicros(); + std::cout << "Retrieved expected state in " + << end_get_expected_state - start_get_expected_state + << " microseconds" << std::endl; - assert(cmp_db_); - assert(!cmp_cfhs_.empty()); - Status s = cmp_db_->TryCatchUpWithPrimary(); - if (!s.ok()) { - assert(false); - exit(1); + uint64_t start_secondary_scan = clock_->NowMicros(); + for (int64_t i = 0; i < max_key; ++i) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } + const std::string key = Key(i); + std::string from_db; + s = cmp_db_->Get(read_opts, column_families_[0], key, &from_db); + if (!VerifyOrSyncValue( + 0, i, read_opts, shared, shared->GetSecondary(0, i), from_db, + /* msg_prefix */ "Secondary get verification", s)) { + std::cout << "Failed on key i=" << i << std::endl; + } + } + uint64_t end_secondary_scan = clock_->NowMicros(); + std::cout << "Scanned all of secondary db in " + << end_secondary_scan - start_secondary_scan << " microseconds" + << std::endl; } const auto checksum_column_family = [](Iterator* iter, @@ -371,10 +405,6 @@ class NonBatchedOpsStressTest : public StressTest { return iter->status(); }; - auto* shared = thread->shared; - assert(shared); - const int64_t max_key = shared->GetMaxKey(); - ReadOptions read_opts(FLAGS_verify_checksum, true); std::string ts_str; Slice ts; if (FLAGS_user_timestamp_size > 0) { @@ -1644,9 +1674,11 @@ class NonBatchedOpsStressTest : public StressTest { std::string from_db; Status s = db_->Get(read_opts, cfh, k, &from_db); - bool res = VerifyOrSyncValue( - rand_column_family, rand_key, read_opts, shared, - /* msg_prefix */ "Pre-Put Get verification", from_db, s); + // looks like these arguments for msg_prefix and from_db were swapped + bool res = + VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared, + shared->Get(rand_column_family, rand_key), from_db, + /* msg_prefix */ "Pre-Put Get verification", s); // Enable back error injection disabled for preparation if (fault_fs_guard) { @@ -2729,21 +2761,13 @@ class NonBatchedOpsStressTest : public StressTest { } bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& opts, - SharedState* shared, const std::string& value_from_db, + SharedState* shared, + const ExpectedValue& expected_value, + const std::string& value_from_db, std::string msg_prefix, const Status& s) const { if (shared->HasVerificationFailedYet()) { return false; } - if (shared->HasHistory()) { - std::unique_ptr state; - Status getExpectedStateStatus = shared->GetExpectedState(db_, state); - if (!getExpectedStateStatus.ok()) { - std::cout << "[VerifyOrSyncValue]: Failed to get expected state" - << std::endl; - return false; - } - } - const ExpectedValue expected_value = shared->Get(cf, key); if (expected_value.PendingWrite() || expected_value.PendingDelete()) { if (s.ok()) {