Skip to content

Commit

Permalink
Some small improvements around allow_unprepared_value and multi-CF it…
Browse files Browse the repository at this point in the history
…erators (facebook#13113)

Summary:
Pull Request resolved: facebook#13113

The patch makes some small improvements related to `allow_unprepared_value` and multi-CF iterators as groundwork for further changes:

1) Similarly to `BaseDeltaIterator`'s base iterator, `MultiCfIteratorImpl` gets passed its child iterators by the client. Even though they are currently guaranteed to have been created using the same read options as each other and the multi-CF iterator, it is safer to not assume this and call `PrepareValue` unconditionally before using any child iterator's `value()` or `columns()`.
2) Again similarly to `BaseDeltaIterator`, it makes sense to pass the entire `ReadOptions` structure to `MultiCfIteratorImpl` in case it turns out to require other read options in the future.
3) The constructors of the various multi-CF iterator classes now take an rvalue reference to a vector of column family handle + `unique_ptr` to child iterator pairs and use move semantics to take ownership of this vector (instead of taking two separate vectors of column family handles and raw iterator pointers).
4) Constructor arguments and the members of `MultiCfIteratorImpl` are reordered for consistency.

Reviewed By: jowlyzhang

Differential Revision: D65407521

fbshipit-source-id: 66c2c689ec8b036740bd98641b7b5c0ff7e777f2
  • Loading branch information
ltamasi authored and facebook-github-bot committed Nov 5, 2024
1 parent e7ffca9 commit 3becc94
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 72 deletions.
10 changes: 5 additions & 5 deletions db/attribute_group_iterator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ namespace ROCKSDB_NAMESPACE {
class AttributeGroupIteratorImpl : public AttributeGroupIterator {
public:
AttributeGroupIteratorImpl(
const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs)
: impl_(read_options, comparator, std::move(cfh_iter_pairs),
ResetFunc(this), PopulateFunc(this)) {}
~AttributeGroupIteratorImpl() override {}

// No copy allowed
Expand Down
11 changes: 6 additions & 5 deletions db/coalescing_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ namespace ROCKSDB_NAMESPACE {
// EXPERIMENTAL
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
CoalescingIterator(
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs)
: impl_(read_options, comparator, std::move(cfh_iter_pairs),
ResetFunc(this), PopulateFunc(this)) {}
~CoalescingIterator() override {}

// No copy allowed
Expand Down
18 changes: 15 additions & 3 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3998,14 +3998,26 @@ std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
"Different comparators are being used across CFs"));
}
}

std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (!s.ok()) {
return error_iterator_func(s);
}
return std::make_unique<ImplType>(
column_families[0]->GetComparator(), _read_options.allow_unprepared_value,
column_families, std::move(child_iterators));

assert(column_families.size() == child_iterators.size());

std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs;
cfh_iter_pairs.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs.emplace_back(column_families[i],
std::unique_ptr<Iterator>(child_iterators[i]));
}

return std::make_unique<ImplType>(_read_options,
column_families[0]->GetComparator(),
std::move(cfh_iter_pairs));
}

Status DBImpl::NewIterators(
Expand Down
106 changes: 47 additions & 59 deletions db/multi_cf_iterator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,18 @@ struct MultiCfIteratorInfo {
template <typename ResetFunc, typename PopulateFunc>
class MultiCfIteratorImpl {
public:
MultiCfIteratorImpl(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators,
ResetFunc reset_func, PopulateFunc populate_func)
: comparator_(comparator),
allow_unprepared_value_(allow_unprepared_value),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))),
MultiCfIteratorImpl(
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs,
ResetFunc reset_func, PopulateFunc populate_func)
: allow_unprepared_value_(read_options.allow_unprepared_value),
comparator_(comparator),
cfh_iter_pairs_(std::move(cfh_iter_pairs)),
reset_func_(std::move(reset_func)),
populate_func_(std::move(populate_func)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
populate_func_(std::move(populate_func)),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))) {}
~MultiCfIteratorImpl() { status_.PermitUncheckedError(); }

// No copy allowed
Expand Down Expand Up @@ -108,38 +102,21 @@ class MultiCfIteratorImpl {
return true;
}

auto prepare_value_func = [this](auto& heap, Iterator* iterator) {
assert(iterator);
assert(iterator->Valid());
assert(iterator->status().ok());

if (!iterator->PrepareValue()) {
assert(!iterator->Valid());
assert(!iterator->status().ok());

considerStatus(iterator->status());
assert(!status_.ok());

heap.clear();
return false;
}

return true;
};

if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
return PopulateIterator(std::get<MultiCfMaxHeap>(heap_),
prepare_value_func);
return PopulateIterator(std::get<MultiCfMaxHeap>(heap_));
}

return PopulateIterator(std::get<MultiCfMinHeap>(heap_),
prepare_value_func);
return PopulateIterator(std::get<MultiCfMinHeap>(heap_));
}

private:
Status status_;
bool allow_unprepared_value_;
const Comparator* comparator_;
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
Status status_;
ResetFunc reset_func_;
PopulateFunc populate_func_;

template <typename CompareOp>
class MultiCfHeapItemComparator {
Expand All @@ -161,9 +138,6 @@ class MultiCfIteratorImpl {
const Comparator* comparator_;
};

const Comparator* comparator_;
bool allow_unprepared_value_;

using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
Expand All @@ -174,9 +148,6 @@ class MultiCfIteratorImpl {

MultiCfIterHeap heap_;

ResetFunc reset_func_;
PopulateFunc populate_func_;

Iterator* current() const {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
Expand Down Expand Up @@ -230,10 +201,8 @@ class MultiCfIteratorImpl {
++i;
}
if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
[[maybe_unused]] const bool result = PopulateIterator(heap);
assert(result || (!Valid() && !status_.ok()));
}
}

Expand Down Expand Up @@ -300,15 +269,13 @@ class MultiCfIteratorImpl {
}

if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
[[maybe_unused]] const bool result = PopulateIterator(heap);
assert(result || (!Valid() && !status_.ok()));
}
}

template <typename BinaryHeap, typename PrepareValueFunc>
bool PopulateIterator(BinaryHeap& heap, PrepareValueFunc prepare_value_func) {
template <typename BinaryHeap>
bool PopulateIterator(BinaryHeap& heap) {
// 1. Keep the top iterator (by popping it from the heap) and add it to list
// to populate
// 2. For all non-top iterators having the same key as top iter popped
Expand All @@ -319,12 +286,33 @@ class MultiCfIteratorImpl {
// collected in step 1 and 2 and add all the iters back to the heap
assert(!heap.empty());

auto prepare_value = [this, &heap](Iterator* iterator) {
assert(iterator);
assert(iterator->Valid());
assert(iterator->status().ok());

if (!iterator->PrepareValue()) {
assert(!iterator->Valid());
assert(!iterator->status().ok());

considerStatus(iterator->status());
heap.clear();

assert(!Valid());
assert(!status_.ok());

return false;
}

return true;
};

auto top = heap.top();
assert(top.iterator);
assert(top.iterator->Valid());
assert(top.iterator->status().ok());

if (!prepare_value_func(heap, top.iterator)) {
if (!prepare_value(top.iterator)) {
return false;
}

Expand All @@ -344,7 +332,7 @@ class MultiCfIteratorImpl {
break;
}

if (!prepare_value_func(heap, current.iterator)) {
if (!prepare_value(current.iterator)) {
return false;
}

Expand Down

0 comments on commit 3becc94

Please sign in to comment.