Skip to content

Commit

Permalink
Create UnitLoader::onSeek
Browse files Browse the repository at this point in the history
Summary: So loaders can prefetch the stripe.

Differential Revision: D56681720
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Apr 29, 2024
1 parent a1706c3 commit db00d79
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 9 deletions.
16 changes: 12 additions & 4 deletions velox/dwio/common/OnDemandUnitLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,18 @@ class OnDemandUnitLoader : public UnitLoader {
return *loadUnits_[unit];
}

void onRead(
uint32_t /* unit */,
uint64_t /* rowOffsetInUnit */,
uint64_t /* rowCount */) override {}
void onRead(uint32_t unit, uint64_t rowOffsetInUnit, uint64_t /* rowCount */)
override {
VELOX_CHECK_LT(unit, loadUnits_.size(), "Unit out of range");
VELOX_CHECK_LT(
rowOffsetInUnit, loadUnits_[unit]->getNumRows(), "Row out of range");
}

void onSeek(uint32_t unit, uint64_t rowOffsetInUnit) override {
VELOX_CHECK_LT(unit, loadUnits_.size(), "Unit out of range");
VELOX_CHECK_LT(
rowOffsetInUnit, loadUnits_[unit]->getNumRows(), "Row out of range");
}

private:
std::vector<std::unique_ptr<LoadUnit>> loadUnits_;
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/UnitLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ class UnitLoader {
virtual LoadUnit& getLoadedUnit(uint32_t unit) = 0;

// Reader reports progress calling this method
// The call must be done **after** getLoadedUnit for unit
virtual void
onRead(uint32_t unit, uint64_t rowOffsetInUnit, uint64_t rowCount) = 0;

// Reader reports seek calling this method.
// The call must be done **before** getLoadedUnit for the new unit
virtual void onSeek(uint32_t unit, uint64_t rowOffsetInUnit) = 0;
};

class UnitLoaderFactory {
Expand Down
41 changes: 41 additions & 0 deletions velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,47 @@ TEST(OnDemandUnitLoaderTests, LoadsCorrectlyWithNoCallback) {
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, false, true}));
}

TEST(OnDemandUnitLoaderTests, CanSeek) {
size_t blockedOnIoCount = 0;
OnDemandUnitLoaderFactory factory([&](auto) { ++blockedOnIoCount; });
ReaderMock readerMock{{10, 20, 30}, {0, 0, 0}, factory};
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, false, false}));
EXPECT_EQ(blockedOnIoCount, 0);

EXPECT_NO_THROW(readerMock.seek(10););

EXPECT_TRUE(readerMock.read(3)); // Unit: 1, rows: 0-2, load(1)
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, true, false}));
EXPECT_EQ(blockedOnIoCount, 1);

EXPECT_NO_THROW(readerMock.seek(0););

EXPECT_TRUE(readerMock.read(3)); // Unit: 0, rows: 0-2, load(0), unload(1)
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({true, false, false}));
EXPECT_EQ(blockedOnIoCount, 2);

EXPECT_NO_THROW(readerMock.seek(30););

EXPECT_TRUE(readerMock.read(3)); // Unit: 2, rows: 0-2, load(2), unload(0)
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, false, true}));
EXPECT_EQ(blockedOnIoCount, 3);
}

TEST(OnDemandUnitLoaderTests, SeekOutOfRange) {
size_t blockedOnIoCount = 0;
OnDemandUnitLoaderFactory factory([&](auto) { ++blockedOnIoCount; });
ReaderMock readerMock{{10, 20, 30}, {0, 0, 0}, factory};
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, false, false}));
EXPECT_EQ(blockedOnIoCount, 0);

EXPECT_THAT(
[&]() { readerMock.seek(60); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::message,
HasSubstr(
"Can't seek to possition 60 in file. Must be less than: 60."))));
}

TEST(OnDemandUnitLoaderTests, UnitOutOfRange) {
OnDemandUnitLoaderFactory factory(nullptr);
std::vector<std::atomic_bool> unitsLoaded(getUnitsLoadedWithFalse(1));
Expand Down
28 changes: 23 additions & 5 deletions velox/dwio/common/tests/utils/UnitLoaderTestTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ bool ReaderMock::read(uint64_t maxRows) {
return true;
}

void ReaderMock::seek(uint64_t rowNumber) {
uint64_t totalRows = 0;
uint64_t rowsLeft = rowNumber;
for (size_t unit = 0; unit < rowsPerUnit_.size(); ++unit) {
const uint64_t rowCount = rowsPerUnit_[unit];
if (rowsLeft < rowCount) {
currentUnit_ = unit;
currentRowInUnit_ = rowsLeft;
loader_->onSeek(currentUnit_, currentRowInUnit_);
return;
}
rowsLeft -= rowCount;
totalRows += rowCount;
}
VELOX_FAIL(
"Can't seek to possition {} in file. Must be less than: {}.",
rowNumber,
totalRows);
}

bool ReaderMock::loadUnit() {
VELOX_CHECK(currentRowInUnit_ <= rowsPerUnit_[currentUnit_]);
if (currentRowInUnit_ == rowsPerUnit_[currentUnit_]) {
Expand All @@ -51,11 +71,9 @@ bool ReaderMock::loadUnit() {
return false;
}
}
if (currentRowInUnit_ == 0) {
auto& unit = loader_->getLoadedUnit(currentUnit_);
auto& unitMock = dynamic_cast<LoadUnitMock&>(unit);
VELOX_CHECK(unitMock.isLoaded());
}
auto& unit = loader_->getLoadedUnit(currentUnit_);
auto& unitMock = dynamic_cast<LoadUnitMock&>(unit);
VELOX_CHECK(unitMock.isLoaded());
return true;
}

Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/common/tests/utils/UnitLoaderTestTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class ReaderMock {

bool read(uint64_t maxRows);

void seek(uint64_t rowNumber);

std::vector<bool> unitsLoaded() const {
return {unitsLoaded_.begin(), unitsLoaded_.end()};
}
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) {
currentRowInStripe_ = rowNumber - firstRowOfStripe_[currentStripe_];
previousRow_ = rowNumber;

const auto loadUnitIdx = currentStripe_ - firstStripe_;
unitLoader_->onSeek(loadUnitIdx, currentRowInStripe_);

if (currentStripe_ != previousStripe) {
// Different stripe. Let's load the new stripe.
currentUnit_ = nullptr;
Expand Down

0 comments on commit db00d79

Please sign in to comment.