Skip to content

Commit

Permalink
Calculate BufferedInput::nextFetchSize (facebookincubator#9437)
Browse files Browse the repository at this point in the history
Summary:

This method wasn't implemented and I'll need it soon.

Reviewed By: helfman

Differential Revision: D55991833
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Apr 15, 2024
1 parent d796cfc commit af1468c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 7 deletions.
8 changes: 8 additions & 0 deletions velox/dwio/common/BufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <fmt/format.h>
#include <numeric>
#include <utility>

#include "folly/io/Cursor.h"
Expand All @@ -36,6 +37,13 @@ void copyIOBufToMemory(folly::IOBuf&& iobuf, folly::Range<char*> allocated) {
}
} // namespace

uint64_t BufferedInput::nextFetchSize() const {
return std::accumulate(
regions_.cbegin(), regions_.cend(), 0L, [](uint64_t a, const Region& b) {
return a + b.length;
});
}

void BufferedInput::load(const LogType logType) {
// no regions to load
if (regions_.size() == 0) {
Expand Down
4 changes: 1 addition & 3 deletions velox/dwio/common/BufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ class BufferedInput {
return nullptr;
}

virtual int64_t prefetchSize() const {
return 0;
}
virtual uint64_t nextFetchSize() const;

protected:
std::shared_ptr<ReadFileInputStream> input_;
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/common/CachedBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ void CachedBufferedInput::makeLoads(
for (auto i = 0; i < allCoalescedLoads_.size(); ++i) {
auto& load = allCoalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
prefetchSize_ += load->size();
executor_->add([pendingLoad = load]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr);
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/common/CachedBufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class CachedBufferedInput : public BufferedInput {
return executor_;
}

int64_t prefetchSize() const override {
return prefetchSize_;
uint64_t nextFetchSize() const override {
VELOX_NYI();
}

private:
Expand Down Expand Up @@ -195,7 +195,6 @@ class CachedBufferedInput : public BufferedInput {
std::vector<std::shared_ptr<cache::CoalescedLoad>> allCoalescedLoads_;

const uint64_t fileSize_;
int64_t prefetchSize_{0};
io::ReaderOptions options_;
};

Expand Down
14 changes: 14 additions & 0 deletions velox/dwio/common/tests/TestBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ TEST_F(TestBufferedInput, ZeroLengthStream) {
std::make_shared<facebook::velox::InMemoryReadFile>(std::string());
BufferedInput input(readFile, *pool_);
auto ret = input.enqueue({0, 0});
EXPECT_EQ(input.nextFetchSize(), 0);
EXPECT_NE(ret, nullptr);
const void* buf = nullptr;
int32_t size = 1;
Expand All @@ -144,6 +145,7 @@ TEST_F(TestBufferedInput, UseRead) {
auto ret = input.enqueue({0, 5});
ASSERT_NE(ret, nullptr);

EXPECT_EQ(input.nextFetchSize(), 5);
input.load(LogType::TEST);

auto next = getNext(*ret);
Expand All @@ -166,6 +168,7 @@ TEST_F(TestBufferedInput, UseVRead) {
auto ret = input.enqueue({0, 5});
ASSERT_NE(ret, nullptr);

EXPECT_EQ(input.nextFetchSize(), 5);
input.load(LogType::TEST);

auto next = getNext(*ret);
Expand Down Expand Up @@ -194,6 +197,7 @@ TEST_F(TestBufferedInput, WillMerge) {
ASSERT_NE(ret1, nullptr);
ASSERT_NE(ret2, nullptr);

EXPECT_EQ(input.nextFetchSize(), 10);
input.load(LogType::TEST);

auto next1 = getNext(*ret1);
Expand Down Expand Up @@ -226,6 +230,7 @@ TEST_F(TestBufferedInput, WontMerge) {
ASSERT_NE(ret1, nullptr);
ASSERT_NE(ret2, nullptr);

EXPECT_EQ(input.nextFetchSize(), 10);
input.load(LogType::TEST);

auto next1 = getNext(*ret1);
Expand Down Expand Up @@ -254,13 +259,16 @@ TEST_F(TestBufferedInput, ReadSorting) {
std::vector<std::pair<std::unique_ptr<SeekableInputStream>, std::string>>
result;
result.reserve(regions.size());
int64_t bytesToRead = 0;
for (auto& region : regions) {
bytesToRead += region.length;
auto ret = input.enqueue(region);
ASSERT_NE(ret, nullptr);
result.push_back(
{std::move(ret), content.substr(region.offset, region.length)});
}

EXPECT_EQ(input.nextFetchSize(), bytesToRead);
input.load(LogType::TEST);

for (auto& r : result) {
Expand Down Expand Up @@ -288,13 +296,16 @@ TEST_F(TestBufferedInput, VReadSorting) {
std::vector<std::pair<std::unique_ptr<SeekableInputStream>, std::string>>
result;
result.reserve(regions.size());
int64_t bytesToRead = 0;
for (auto& region : regions) {
bytesToRead += region.length;
auto ret = input.enqueue(region);
ASSERT_NE(ret, nullptr);
result.push_back(
{std::move(ret), content.substr(region.offset, region.length)});
}

EXPECT_EQ(input.nextFetchSize(), bytesToRead);
input.load(LogType::TEST);

for (auto& r : result) {
Expand Down Expand Up @@ -326,13 +337,16 @@ TEST_F(TestBufferedInput, VReadSortingWithLabels) {
std::vector<std::pair<std::unique_ptr<SeekableInputStream>, std::string>>
result;
result.reserve(regions.size());
int64_t bytesToRead = 0;
for (auto& region : regions) {
bytesToRead += region.length;
auto ret = input.enqueue(region);
ASSERT_NE(ret, nullptr);
result.push_back(
{std::move(ret), content.substr(region.offset, region.length)});
}

EXPECT_EQ(input.nextFetchSize(), bytesToRead);
input.load(LogType::TEST);

for (auto& r : result) {
Expand Down

0 comments on commit af1468c

Please sign in to comment.