diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 7c8e20435e2..7afa211d75f 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -17,6 +17,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const std::string& dir, const ImmutableDBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seq, + WalManager* wal_manager, std::unique_ptr files, VersionSet const* const versions, const bool seq_per_batch, const std::shared_ptr& io_tracer) : dir_(dir), @@ -37,6 +38,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( assert(versions_ != nullptr); assert(!seq_per_batch_); current_status_.PermitUncheckedError(); // Clear on start + get_live_wal_handle_.wm = wal_manager; reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); SeekToStartSequence(); // Seek till starting sequence @@ -83,9 +85,10 @@ Status TransactionLogIteratorImpl::status() { return current_status_; } bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; } -bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) { +bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, bool& no_new_entry) { // Don't read if no more complete entries to read from logs if (current_last_seq_ >= versions_->LastSequence()) { + no_new_entry = true; return false; } return current_log_reader_->ReadRecord(record, &scratch_); @@ -119,7 +122,8 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index, reporter_.Info(current_status_.ToString().c_str()); return; } - while (RestrictedRead(&record)) { + bool no_new_entry; + while (RestrictedRead(&record, no_new_entry)) { if (record.size() < WriteBatchInternal::kHeader) { reporter_.Corruption(record.size(), Status::Corruption("very small log record")); @@ -185,22 +189,22 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { if (current_log_reader_->IsEOF()) { current_log_reader_->UnmarkEOF(); } - while (RestrictedRead(&record)) { + bool no_new_entry; + while (RestrictedRead(&record, no_new_entry)) { if (record.size() < WriteBatchInternal::kHeader) { reporter_.Corruption(record.size(), Status::Corruption("very small log record")); continue; - } else { - // started_ should be true if called by application - assert(internal || started_); - // started_ should be false if called internally - assert(!internal || !started_); - UpdateCurrentWriteBatch(record); - if (internal && !started_) { - started_ = true; - } - return; } + // started_ should be true if called by application + assert(internal || started_); + // started_ should be false if called internally + assert(!internal || !started_); + UpdateCurrentWriteBatch(record); + if (internal && !started_) { + started_ = true; + } + return; } // Open the next file @@ -212,16 +216,36 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { current_status_ = s; return; } - } else { - is_valid_ = false; - if (current_last_seq_ == versions_->LastSequence()) { - current_status_ = Status::OK(); - } else { - const char* msg = "Create a new iterator to fetch the new tail."; - current_status_ = Status::TryAgain(msg); - } + continue; + } + + is_valid_ = false; + if (no_new_entry) { + current_status_ = Status::OK(); return; } + + // in some case current_log_reader_->ReadRecord return kEof, + // but files_->at(current_file_index_) didn't reach the end + // which lead to severe delay of replication + if (current_log_reader_->IsEOF()) { + std::unique_ptr new_wal_files(new VectorWalPtr); + SequenceNumber start_seq = files_->at(current_file_index_).get()->StartSequence(); + // only get live wal is enough in this case, GetLiveWalFiles will cost no more than 100us + if (get_live_wal_handle_.GetLiveWalFiles(start_seq, *new_wal_files).ok()) { + if (new_wal_files->size() == 1 && + new_wal_files->begin()->get()->StartSequence() == start_seq) { + current_status_ = Status::OK(); + return; + } + + // if new_wal_files->size() >1, just set files as new_wal_files may be more efficiency + } + } + + const char* msg = "Create a new iterator to fetch the new tail."; + current_status_ = Status::TryAgain(msg); + return; } } diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index e8552a47d59..263c0825563 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -16,6 +16,7 @@ #include "rocksdb/options.h" #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" +#include "wal_manager.h" namespace ROCKSDB_NAMESPACE { @@ -60,6 +61,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dir, const ImmutableDBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seqNum, + WalManager* wal_manager, std::unique_ptr files, VersionSet const* const versions, const bool seq_per_batch, const std::shared_ptr& io_tracer); @@ -95,6 +97,13 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { Status OpenLogFile(const WalFile* log_file, std::unique_ptr* file); + struct GetLiveWalHandle{ + WalManager* wm; + Status GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files) const { + return wm->GetLiveWalFiles(seq_start, live_log_files); + } + } get_live_wal_handle_{}; + struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -109,7 +118,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { current_batch_seq_; // sequence number at start of current batch SequenceNumber current_last_seq_; // last sequence in the current batch // Reads from transaction log only if the writebatch record has been written - bool RestrictedRead(Slice* record); + bool RestrictedRead(Slice* record, bool& no_new_entry); // Seeks to starting_sequence_number_ reading from start_file_index in files_. // If strict is set, then must get a batch starting with // starting_sequence_number_. diff --git a/db/wal_manager.cc b/db/wal_manager.cc index a4cc44d75f1..4dfbe5bfc89 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -456,6 +456,14 @@ Status WalManager::GetLiveWalFile(uint64_t number, return Status::OK(); } +Status WalManager::GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files) { + if (Status s = GetSortedWalsOfType(wal_dir_, live_log_files, kAliveLogFile, true); + !s.ok()) { + return s; + } + return RetainProbableWalFiles(live_log_files, seq_start); +} + // the function returns status.ok() and sequence == 0 if the file exists, but is // empty Status WalManager::ReadFirstLine(const std::string& fname, diff --git a/db/wal_manager.h b/db/wal_manager.h index 8fa15bf3d32..f44a2d2556c 100644 --- a/db/wal_manager.h +++ b/db/wal_manager.h @@ -66,6 +66,8 @@ class WalManager { Status GetLiveWalFile(uint64_t number, std::unique_ptr* log_file); + Status GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files); + Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number, SequenceNumber* sequence) { return ReadFirstRecord(type, number, sequence);