Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize replication latency by fix the misjudgment of the WAL iterator status(#13260) #13261

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions db/transaction_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
WalManager* wal_manager,
std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
: dir_(dir),
Expand All @@ -37,6 +38,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
assert(versions_ != nullptr);
assert(!seq_per_batch_);
current_status_.PermitUncheckedError(); // Clear on start
get_live_wal_handle_.wm = wal_manager;
reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
SeekToStartSequence(); // Seek till starting sequence
Expand Down Expand Up @@ -83,9 +85,10 @@ Status TransactionLogIteratorImpl::status() { return current_status_; }

bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }

bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, bool& no_new_entry) {
// Don't read if no more complete entries to read from logs
if (current_last_seq_ >= versions_->LastSequence()) {
no_new_entry = true;
return false;
}
return current_log_reader_->ReadRecord(record, &scratch_);
Expand Down Expand Up @@ -119,7 +122,8 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
reporter_.Info(current_status_.ToString().c_str());
return;
}
while (RestrictedRead(&record)) {
bool no_new_entry;
while (RestrictedRead(&record, no_new_entry)) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption(record.size(),
Status::Corruption("very small log record"));
Expand Down Expand Up @@ -185,22 +189,22 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
if (current_log_reader_->IsEOF()) {
current_log_reader_->UnmarkEOF();
}
while (RestrictedRead(&record)) {
bool no_new_entry;
while (RestrictedRead(&record, no_new_entry)) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption(record.size(),
Status::Corruption("very small log record"));
continue;
} else {
// started_ should be true if called by application
assert(internal || started_);
// started_ should be false if called internally
assert(!internal || !started_);
UpdateCurrentWriteBatch(record);
if (internal && !started_) {
started_ = true;
}
return;
}
// started_ should be true if called by application
assert(internal || started_);
// started_ should be false if called internally
assert(!internal || !started_);
UpdateCurrentWriteBatch(record);
if (internal && !started_) {
started_ = true;
}
return;
}

// Open the next file
Expand All @@ -212,16 +216,36 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
current_status_ = s;
return;
}
} else {
is_valid_ = false;
if (current_last_seq_ == versions_->LastSequence()) {
current_status_ = Status::OK();
} else {
const char* msg = "Create a new iterator to fetch the new tail.";
current_status_ = Status::TryAgain(msg);
}
continue;
}

is_valid_ = false;
if (no_new_entry) {
current_status_ = Status::OK();
return;
}

// in some case current_log_reader_->ReadRecord return kEof,
// but files_->at(current_file_index_) didn't reach the end
// which lead to severe delay of replication
if (current_log_reader_->IsEOF()) {
std::unique_ptr<VectorWalPtr> new_wal_files(new VectorWalPtr);
SequenceNumber start_seq = files_->at(current_file_index_).get()->StartSequence();
// only get live wal is enough in this case, GetLiveWalFiles will cost no more than 100us
if (get_live_wal_handle_.GetLiveWalFiles(start_seq, *new_wal_files).ok()) {
if (new_wal_files->size() == 1 &&
new_wal_files->begin()->get()->StartSequence() == start_seq) {
current_status_ = Status::OK();
return;
}

// if new_wal_files->size() >1, just set files as new_wal_files may be more efficiency
}
}

const char* msg = "Create a new iterator to fetch the new tail.";
current_status_ = Status::TryAgain(msg);
return;
}
}

Expand Down
11 changes: 10 additions & 1 deletion db/transaction_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "wal_manager.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -60,6 +61,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
WalManager* wal_manager,
std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);

Expand Down Expand Up @@ -95,6 +97,13 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
Status OpenLogFile(const WalFile* log_file,
std::unique_ptr<SequentialFileReader>* file);

struct GetLiveWalHandle{
WalManager* wm;
Status GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files) const {
return wm->GetLiveWalFiles(seq_start, live_log_files);
}
} get_live_wal_handle_{};

struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
Expand All @@ -109,7 +118,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
current_batch_seq_; // sequence number at start of current batch
SequenceNumber current_last_seq_; // last sequence in the current batch
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record);
bool RestrictedRead(Slice* record, bool& no_new_entry);
// Seeks to starting_sequence_number_ reading from start_file_index in files_.
// If strict is set, then must get a batch starting with
// starting_sequence_number_.
Expand Down
8 changes: 8 additions & 0 deletions db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,14 @@ Status WalManager::GetLiveWalFile(uint64_t number,
return Status::OK();
}

Status WalManager::GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files) {
if (Status s = GetSortedWalsOfType(wal_dir_, live_log_files, kAliveLogFile, true);
!s.ok()) {
return s;
}
return RetainProbableWalFiles(live_log_files, seq_start);
}

// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status WalManager::ReadFirstLine(const std::string& fname,
Expand Down
2 changes: 2 additions & 0 deletions db/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class WalManager {

Status GetLiveWalFile(uint64_t number, std::unique_ptr<WalFile>* log_file);

Status GetLiveWalFiles(SequenceNumber seq_start, VectorWalPtr& live_log_files);

Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence) {
return ReadFirstRecord(type, number, sequence);
Expand Down