From f9d0de249491d34a66971eb6c87fc5f74f256661 Mon Sep 17 00:00:00 2001 From: Maciej Szeszko Date: Fri, 20 Dec 2024 01:12:21 -0800 Subject: [PATCH] First logic draft --- include/rocksdb/env.h | 2 +- include/rocksdb/utilities/backup_engine.h | 52 ++- utilities/backup/backup_engine.cc | 395 +++++++++++++++++++++- 3 files changed, 428 insertions(+), 21 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 5f299ee1a76b..fa2e1d242ac4 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -630,7 +630,7 @@ class Env : public Customizable { const EnvOptions& env_options, const ImmutableDBOptions& immutable_ops) const; - // OptimizeForCompactionTableWrite will create a new EnvOptions object that + // OptimizeForCompactionTableRead will create a new EnvOptions object that // is a copy of the EnvOptions in the parameters, but is optimized for reading // table files. virtual EnvOptions OptimizeForCompactionTableRead( diff --git a/include/rocksdb/utilities/backup_engine.h b/include/rocksdb/utilities/backup_engine.h index 204d12d6fea5..159b252bf190 100644 --- a/include/rocksdb/utilities/backup_engine.h +++ b/include/rocksdb/utilities/backup_engine.h @@ -117,8 +117,8 @@ struct BackupEngineOptions { // Default: true bool share_files_with_checksum; - // Up to this many background threads will copy files for CreateNewBackup() - // and RestoreDBFromBackup() + // Up to this many background threads will be used to copy files & compute + // checksums for CreateNewBackup() and RestoreDBFromBackup(). // Default: 1 int max_background_operations; @@ -337,6 +337,46 @@ struct CreateBackupOptions { CpuPriority background_thread_cpu_priority = CpuPriority::kNormal; }; +// Enum reflecting tiered approach to incremental restores. +// Options `kKeepLatestDbSessionIdFiles`, `kVerifyChecksum` are intended +// to be used separately and NOT to be combined with one another. 
+enum RestoreMode : uint32_t { + // Instructs restore engine to consider existing destination file and its + // backup counterpart as 'equal' IF the db session id AND size values read + // from the backup file footer match the corresponding values in existing + // destination file metadata block. + // + // NOTE: + // ===== + // + // Only applicable to backup files that preserve the notion of 'session' + // in the footer. A good way to tell if that's the case is to verify + // that backup files do follow the `kUseDbSessionId` naming scheme, ex: + // + // <file_number>_s<db_session_id>[_<file_size>].sst + // + // RISK WARNING: + // ============= + // + // Determination is made solely based on the backup & db file footer metadata. + // Technically speaking, it is possible that a backup and a db file with the very + // same db session id and size hold different data (think file corruption, + // blocks filled with zeros [trash], etc.). If you need stronger guarantees + // with no 'session' constraints, use `kVerifyChecksum` restore mode instead. + kKeepLatestDbSessionIdFiles = 1U, + + // When opted-in, restore engine will scan the db file, evaluate the checksum + // and compare it against the checksum hardened in the backup file metadata. + // If checksums match, existing file will be retained as-is. Otherwise, it + // will be deleted and replaced with its restored backup counterpart. + // If backup file doesn't have a checksum hardened in the metadata, + // we'll schedule an async task to compute it. + kVerifyChecksum = 2U, + + // Zero trust. Purge all the destination files and restore all the files. + kPurgeAllFiles = 0xffffU, +}; + struct RestoreOptions { // If true, restore won't overwrite the existing log files in wal_dir. It will // also move all log files from archive directory to wal_dir. Use this option @@ -350,8 +390,12 @@ struct RestoreOptions { // directories known to contain the required files. 
std::forward_list alternate_dirs; - explicit RestoreOptions(bool _keep_log_files = false) - : keep_log_files(_keep_log_files) {} + // Specifies the level of incremental restore. 'kPurgeAllFiles' by default. + RestoreMode mode; + + explicit RestoreOptions(bool _keep_log_files = false, + RestoreMode _mode = RestoreMode::kPurgeAllFiles) + : keep_log_files(_keep_log_files), mode(_mode) {} }; using BackupID = uint32_t; diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index ed43ff0b89e8..17640c488260 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -190,6 +190,7 @@ class BackupEngineImpl { private: void DeleteChildren(const std::string& dir, + const std::unordered_set& files_to_keep, uint32_t file_type_filter = 0) const; IOStatus DeleteBackupNoGC(BackupID backup_id); @@ -503,6 +504,14 @@ class BackupEngineImpl { BackupInfo* backup_info, bool include_file_details) const; + // Infers set of existing destination files that could be retained + // during the incremental restore procedure in accordance with user + // selected strategy (RestoreMode). File can be retained only if it's + // deemed to exist in the referenced backup set. 
+ void InferDBFilesToRetainInRestore( + const std::unique_ptr& backup, const std::string& dir, + RestoreMode mode, std::unordered_set& files_to_keep) const; + inline std::string GetAbsolutePath( const std::string& relative_path = "") const { assert(relative_path.size() == 0 || relative_path[0] != '/'); @@ -592,7 +601,8 @@ class BackupEngineImpl { Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options, const std::string& file_path, Temperature file_temp, RateLimiter* rate_limiter, - std::string* db_id, std::string* db_session_id); + std::string* db_id, + std::string* db_session_id) const; struct WorkItemResult { WorkItemResult() @@ -637,6 +647,7 @@ class BackupEngineImpl { enum WorkItemType : uint64_t { CopyOrCreate = 1U, + ComputeChecksum = 2U, }; // Exactly one of src_path and contents must be non-empty. If src_path is @@ -784,6 +795,30 @@ class BackupEngineImpl { using BackupWorkItemPair = std::pair; + struct ComputeChecksumWorkItem { + std::future result; + std::string file_path; + uint64_t file_number; + + ComputeChecksumWorkItem(std::future&& _result, + const std::string& _file_path, + uint64_t _file_number) + : result(std::move(_result)), + file_path(_file_path), + file_number(_file_number) {} + + ComputeChecksumWorkItem(ComputeChecksumWorkItem&& o) noexcept { + *this = std::move(o); + } + + ComputeChecksumWorkItem& operator=(ComputeChecksumWorkItem&& o) noexcept { + result = std::move(o.result); + file_path = std::move(o.file_path); + file_number = o.file_number; + return *this; + } + }; + struct RestoreAfterCopyOrCreateWorkItem { std::future result; std::string from_file; @@ -1266,8 +1301,8 @@ IOStatus BackupEngineImpl::Initialize() { ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u", latest_valid_backup_id_); - // set up threads perform copies from work_items_ in the - // background + // set up threads to perform file creation / copy or checksum computations + // from work_items_ in the background. 
threads_cpu_priority_ = CpuPriority::kNormal; threads_.reserve(options_.max_background_operations); for (int t = 0; t < options_.max_background_operations; t++) { @@ -1296,8 +1331,8 @@ IOStatus BackupEngineImpl::Initialize() { uint64_t prev_bytes_written = IOSTATS(bytes_written); WorkItemResult result; - Temperature temp = work_item.src_temperature; if (work_item.type == WorkItemType::CopyOrCreate) { + Temperature temp = work_item.src_temperature; result.io_status = CopyOrCreateFile( work_item.src_path, work_item.dst_path, work_item.contents, work_item.size_limit, work_item.src_env, work_item.dst_env, @@ -1343,11 +1378,19 @@ IOStatus BackupEngineImpl::Initialize() { work_item.dst_path.c_str(), checksum_function_info.c_str()); } } + } else if (work_item.type == ComputeChecksum) { + result.io_status = ReadFileAndComputeChecksum( + work_item.src_path, work_item.src_env->GetFileSystem(), + work_item.src_env_options, work_item.size_limit, + &result.checksum_hex, work_item.src_temperature); + result.db_id = work_item.db_id; + result.db_session_id = work_item.db_session_id; work_item.result.set_value(std::move(result)); } else { result.io_status = IOStatus::InvalidArgument( "Unknown work item type: " + std::to_string(work_item.type)); } + work_item.result.set_value(std::move(result)); } }); } @@ -1914,9 +1957,12 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr) .PermitUncheckedError(); + std::unordered_set files_to_keep; + InferDBFilesToRetainInRestore(backup, db_dir, options.mode, files_to_keep); + if (options.keep_log_files) { - // delete files in db_dir, but keep all the log files - DeleteChildren(db_dir, 1 << kWalFile); + // delete non-matching files in db_dir, but keep all the log files + DeleteChildren(db_dir, files_to_keep, 1 << kWalFile); // move all the files from archive dir to wal_dir std::string archive_dir = ArchivalDirectory(wal_dir); std::vector archive_files; @@ -1940,9 +1986,9 @@ IOStatus 
BackupEngineImpl::RestoreDBFromBackup( } } } else { - DeleteChildren(wal_dir); - DeleteChildren(ArchivalDirectory(wal_dir)); - DeleteChildren(db_dir); + DeleteChildren(wal_dir, files_to_keep); + DeleteChildren(ArchivalDirectory(wal_dir), files_to_keep); + DeleteChildren(db_dir, files_to_keep); } // Files to restore, and from where (taking into account excluded files) @@ -2001,6 +2047,12 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( return IOStatus::Corruption("Backup corrupted: Fail to parse filename " + dst); } + + if (files_to_keep.find(number) != files_to_keep.end()) { + // This file is already in the destination directory. + continue; + } + // 3. Construct the final path // kWalFile lives in wal_dir and all the rest live in db_dir if (type == kWalFile) { @@ -2032,6 +2084,14 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); + + // When a file is being copied over, it means that it was either non-existent, + // purged or its original on-disk representation didn't meet incremental + // restore tiering criteria. As such, we need to unconditionally recompute + // the checksum on the newly restored files - even if the checksum was already + // computed on its seed backup file in early assessment phase. Protection + // is put in place to ensure that there are no bugs in the actual restore / + // file copy logic and we're not producing garbage db files. 
WorkItem copy_or_create_work_item( absolute_file, dst, Temperature::kUnknown /* src_temp */, file_info->temp, "" /* contents */, src_env, db_env_, @@ -2613,10 +2673,13 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum( return io_s; } -Status BackupEngineImpl::GetFileDbIdentities( - Env* src_env, const EnvOptions& src_env_options, - const std::string& file_path, Temperature file_temp, - RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) { +Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, + const EnvOptions& src_env_options, + const std::string& file_path, + Temperature file_temp, + RateLimiter* rate_limiter, + std::string* db_id, + std::string* db_session_id) const { assert(db_id != nullptr || db_session_id != nullptr); Options options; @@ -2691,8 +2754,304 @@ void BackupEngineImpl::LoopRateLimitRequestHelper( } } -void BackupEngineImpl::DeleteChildren(const std::string& dir, - uint32_t file_type_filter) const { +void BackupEngineImpl::InferDBFilesToRetainInRestore( + const std::unique_ptr& backup, const std::string& dir, + RestoreMode mode, std::unordered_set& files_to_keep) const { + if (mode == RestoreMode::kPurgeAllFiles) { + return; + } + + ROCKS_LOG_INFO(options_.info_log, + "Starting incremental restore evaluation in %" PRIu32 " mode", + mode); + + ROCKS_LOG_INFO(options_.info_log, "Constructing backup files map..."); + std::unordered_map, FileType>> + file_id_to_backup_file_info_and_type; + for (const auto& file_info_shared : backup->GetFiles()) { + uint64_t number; + FileType type; + + std::string filename = file_info_shared->GetDbFileName(); + + bool ok = ParseFileName(filename, &number, &type); + + // We only care to optimize restore for large files - like SSTs / blobs. 
+ if (ok && (type == kTableFile || type == kBlobFile)) { + file_id_to_backup_file_info_and_type.insert( + {number, std::make_pair(file_info_shared, type)}); + } + } + + ROCKS_LOG_INFO(options_.info_log, + "Evaluating existing files restore retention eligibility..."); + + std::vector children; + db_fs_->GetChildren(dir, io_options_, &children, nullptr) + .PermitUncheckedError(); // ignore errors + RateLimiter* rate_limiter = options_.restore_rate_limiter.get(); + std::vector backup_files_compute_checksum_work_items; + std::vector db_files_compute_checksum_work_items; + std::unordered_map backup_file_number_to_checksum; + for (const auto& f : children) { + uint64_t number; + FileType type; + bool ok = ParseFileName(f, &number, &type); + if (!ok) { + Log(options_.info_log, "Couldn't parse existing file name: '%s'", + f.c_str()); + continue; + } + + if (type != kTableFile && type != kBlobFile) { + // We only care to optimize restore for large files - like SSTs / blobs. + Log(options_.info_log, + "Existing file '%s' is neither a table nor a blob file. Type: %d.", + f.c_str(), type); + continue; + } + + auto it = file_id_to_backup_file_info_and_type.find(number); + if (it == file_id_to_backup_file_info_and_type.end()) { + Log(options_.info_log, "Existing file '%s' is not present in the backup.", + f.c_str()); + continue; + } + + auto backup_file_info = (it->second).first; + FileType backup_file_type = (it->second).second; + if (type != backup_file_type) { + Log(options_.info_log, + "File type mismatch between backup and dest file system!" 
+ "Backup file '%s' type: %d, existing file '%s' type: %d", + backup_file_info->filename.c_str(), backup_file_type, f.c_str(), + type); + continue; + } + + if (backup_file_info->size == 0) { + Log(options_.info_log, "Backup file '%s' size is 0!", + backup_file_info->filename.c_str()); + continue; + } + + uint64_t size_bytes = 0; + std::string db_file_path = dir + "/" + f; + IOStatus io_st = db_fs_->GetFileSize(db_file_path, io_options_, &size_bytes, + nullptr /* dbg */); + if (!io_st.ok()) { + Log(options_.info_log, + "Failed to get the file size for existing file: '%s'. IO status: %s", + f.c_str(), io_st.ToString().c_str()); + continue; + } + + if (backup_file_info->size != size_bytes) { + Log(options_.info_log, + "File size mismatch between backup and file system!" + "Backup file '%s' size: %" PRIu64 + " bytes" + "Existing file '%s' size: %" PRIu64 " bytes", + backup_file_info->filename.c_str(), backup_file_info->size, f.c_str(), + size_bytes); + continue; + } + + std::string backup_file_path = GetAbsolutePath(backup_file_info->filename); + if (mode == RestoreMode::kKeepLatestDbSessionIdFiles) { + // Backup file does not need to be restored if it's corresponding + // matching ID file has the exact same session ID and size. + + // Unlike checksum_hex and size, backup db_session_id and db_id values + // are usually not derived at the time of backup engine initialization / + // loading metadata from file. Read directly from the footer. 
+ std::string backup_db_id; + std::string backup_db_session_id; + Status s = GetFileDbIdentities( + backup_env_, EnvOptions() /* src_env_options */, backup_file_path, + backup_file_info->temp /* src_temp */, rate_limiter, &backup_db_id, + &backup_db_session_id); + if (!s.ok()) { + Log(options_.info_log, + "Encountered IO error while obtaining db session id metadata for " + "backup file '%s'.", + backup_file_info->filename.c_str()); + continue; + } + + if (backup_db_session_id.empty()) { + Log(options_.info_log, "Empty db_session_id for backup file '%s'", + backup_file_info->filename.c_str()); + continue; + } + + std::string db_id; + std::string db_session_id; + s = GetFileDbIdentities(db_env_, EnvOptions() /* src_env_options */, + db_file_path /* file_path */, + Temperature::kUnknown /* src_temp */, + rate_limiter, &db_id, &db_session_id); + if (!s.ok()) { + Log(options_.info_log, + "Encountered IO error while obtaining db session id metadata for " + "existing file '%s'.", + db_file_path.c_str()); + continue; + } + + if (backup_db_session_id != db_session_id) { + Log(options_.info_log, + "File db session id mismatch between backup and file system!" + "Backup file '%s' db session id: '%s'" + "Existing file '%s' db session id: '%s'", + backup_file_info->filename.c_str(), backup_db_session_id.c_str(), + f.c_str(), db_session_id.c_str()); + continue; + } + + files_to_keep.insert(number); + + ROCKS_LOG_INFO(options_.info_log, + "Existing file '%s' is retained for restore.", f.c_str()); + } else if (mode == RestoreMode::kVerifyChecksum) { + DBOptions db_options; + std::string backup_file_checksum = backup_file_info->checksum_hex; + if (!backup_file_checksum.empty()) { + backup_file_number_to_checksum.insert( + std::make_pair(number, backup_file_checksum)); + } else { + // Backup file checksum is missing in the backup metadata. + // Given explicit requirement, compute it asynchronously. 
+ EnvOptions backup_env_options; + if (type == kBlobFile) { + backup_env_->OptimizeForBlobFileRead(backup_env_options, + ImmutableDBOptions(db_options)); + } else if (type == kTableFile) { + backup_env_->OptimizeForCompactionTableRead( + backup_env_options, ImmutableDBOptions(db_options)); + } + + WorkItem backup_file_work_item( + backup_file_path, "" /* dst_path */, backup_file_info->temp, + Temperature::kUnknown /* dst_temperature */, "" /* contents */, + backup_env_, nullptr /* dst_env */, backup_env_options, + false /* sync */, options_.restore_rate_limiter.get(), + 0 /* size_limit */, nullptr /* stats */, {} /* progress_callback */, + kUnknownFileChecksumFuncName /* src_checksum_func_name */, + "" /* src_checksum_hex */, "" /* db_id */, "" /* db_session_id*/, + WorkItemType::ComputeChecksum); + + ComputeChecksumWorkItem backup_file_checksum_work_item( + backup_file_work_item.result.get_future(), backup_file_path, + number); + + work_items_.write(std::move(backup_file_work_item)); + backup_files_compute_checksum_work_items.push_back( + std::move(backup_file_checksum_work_item)); + + Log(options_.info_log, + "Checksum is missing in '%s' backup file metadata." + "Schedule async computation", + backup_file_info->filename.c_str()); + } + + // Unconditionally compute checksum for existing file. 
+ EnvOptions db_env_options; + if (type == kBlobFile) { + db_env_->OptimizeForBlobFileRead(db_env_options, + ImmutableDBOptions(db_options)); + } else if (type == kTableFile) { + db_env_->OptimizeForCompactionTableRead(db_env_options, + ImmutableDBOptions(db_options)); + } + + WorkItem db_file_work_item( + db_file_path, "" /* dst_path */, backup_file_info->temp, + Temperature::kUnknown /* dst_temperature */, "" /* contents */, + db_env_, nullptr /* dst_env */, db_env_options, false /* sync */, + options_.restore_rate_limiter.get(), 0 /* size_limit */, + nullptr /* stats */, {} /* progress_callback */, + kUnknownFileChecksumFuncName /* src_checksum_func_name */, + "" /* src_checksum_hex */, "" /* db_id */, "" /* db_session_id*/, + WorkItemType::ComputeChecksum); + + ComputeChecksumWorkItem db_file_checksum_work_item( + db_file_work_item.result.get_future(), db_file_path, number); + work_items_.write(std::move(db_file_work_item)); + db_files_compute_checksum_work_items.push_back( + std::move(db_file_checksum_work_item)); + + Log(options_.info_log, + "Schedule async checksum computation for file '%s'", f.c_str()); + } + } + + if (mode == RestoreMode::kVerifyChecksum) { + // First loop through checksum computation results for backup files. + for (auto& item : backup_files_compute_checksum_work_items) { + item.result.wait(); + auto result = item.result.get(); + IOStatus item_io_status = result.io_status; + if (!item_io_status.ok()) { + // Failed computation for backup file will result in purging + // the existing file and restoring the backup file. + Log(options_.info_log, + "Encountered IO error while computing checksum for " + "backup file '%s'.", + item.file_path.c_str()); + continue; + } + + backup_file_number_to_checksum.insert( + std::make_pair(item.file_number, result.checksum_hex)); + } + + // Loop through db files checksum computation results. 
+ for (auto& item : db_files_compute_checksum_work_items) { + item.result.wait(); + auto result = item.result.get(); + IOStatus item_io_status = result.io_status; + if (!item_io_status.ok()) { + // Failed computation for existing file will result in purging + // and restoring it from the corresponding backup file. + Log(options_.info_log, + "Encountered IO error while computing checksum for " + "existing file '%s'.", + item.file_path.c_str()); + continue; + } + + auto it = backup_file_number_to_checksum.find(item.file_number); + if (it == backup_file_number_to_checksum.end()) { + Log(options_.info_log, + "Failed to find backup file checksum for existing file '%s'.", + item.file_path.c_str()); + continue; + } + + if (it->second != result.checksum_hex) { + Log(options_.info_log, + "Checksum mismatch between backup file and existing file '%s'.", + item.file_path.c_str()); + continue; + } + + files_to_keep.insert(item.file_number); + + Log(options_.info_log, "Existing file '%s' is retained for restore.", + item.file_path.c_str()); + } + } + + ROCKS_LOG_INFO(options_.info_log, + "Done with incremental restore evaluation. " + "Retained %" PRIu64 " files.", + files_to_keep.size()); +} + +void BackupEngineImpl::DeleteChildren( + const std::string& dir, const std::unordered_set& files_to_keep, + uint32_t file_type_filter) const { std::vector children; db_fs_->GetChildren(dir, io_options_, &children, nullptr) .PermitUncheckedError(); // ignore errors @@ -2701,8 +3060,12 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir, uint64_t number; FileType type; bool ok = ParseFileName(f, &number, &type); + if (ok && (files_to_keep.find(number) != files_to_keep.end())) { + // don't delete file with this number. + continue; + } if (ok && (file_type_filter & (1 << type))) { - // don't delete this file + // don't delete this file type. continue; } db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr)