From af1468cf82d1679f55a2bd983b908b76c22c8a6d Mon Sep 17 00:00:00 2001 From: Daniel Munoz Date: Mon, 15 Apr 2024 13:13:17 -0700 Subject: [PATCH] Calculate BufferedInput::nextFetchSize (#9437) Summary: This method wasn't implemented and I'll need it soon. Reviewed By: helfman Differential Revision: D55991833 --- velox/dwio/common/BufferedInput.cpp | 8 ++++++++ velox/dwio/common/BufferedInput.h | 4 +--- velox/dwio/common/CachedBufferedInput.cpp | 1 - velox/dwio/common/CachedBufferedInput.h | 5 ++--- velox/dwio/common/tests/TestBufferedInput.cpp | 14 ++++++++++++++ 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/velox/dwio/common/BufferedInput.cpp b/velox/dwio/common/BufferedInput.cpp index ed75fcb96d5e5..5f3edfb53671e 100644 --- a/velox/dwio/common/BufferedInput.cpp +++ b/velox/dwio/common/BufferedInput.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include "folly/io/Cursor.h" @@ -36,6 +37,13 @@ void copyIOBufToMemory(folly::IOBuf&& iobuf, folly::Range allocated) { } } // namespace +uint64_t BufferedInput::nextFetchSize() const { + return std::accumulate( + regions_.cbegin(), regions_.cend(), 0L, [](uint64_t a, const Region& b) { + return a + b.length; + }); +} + void BufferedInput::load(const LogType logType) { // no regions to load if (regions_.size() == 0) { diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index 2e21a199c286c..dc5200898cda4 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -141,9 +141,7 @@ class BufferedInput { return nullptr; } - virtual int64_t prefetchSize() const { - return 0; - } + virtual uint64_t nextFetchSize() const; protected: std::shared_ptr input_; diff --git a/velox/dwio/common/CachedBufferedInput.cpp b/velox/dwio/common/CachedBufferedInput.cpp index b3414793b2773..43593d07b71b8 100644 --- a/velox/dwio/common/CachedBufferedInput.cpp +++ b/velox/dwio/common/CachedBufferedInput.cpp @@ -271,7 +271,6 @@ void CachedBufferedInput::makeLoads( for (auto i = 0; i < allCoalescedLoads_.size(); ++i) { auto& load = allCoalescedLoads_[i]; if (load->state() == CoalescedLoad::State::kPlanned) { - prefetchSize_ += load->size(); executor_->add([pendingLoad = load]() { process::TraceContext trace("Read Ahead"); pendingLoad->loadOrFuture(nullptr); diff --git a/velox/dwio/common/CachedBufferedInput.h b/velox/dwio/common/CachedBufferedInput.h index ceaad5ca9c012..50be89b289e1a 100644 --- a/velox/dwio/common/CachedBufferedInput.h +++ b/velox/dwio/common/CachedBufferedInput.h @@ -159,8 +159,8 @@ class CachedBufferedInput : public BufferedInput { return executor_; } - int64_t prefetchSize() const override { - return prefetchSize_; + uint64_t nextFetchSize() const override { + VELOX_NYI(); } private: @@ -195,7 +195,6 @@ class CachedBufferedInput : public BufferedInput { std::vector> allCoalescedLoads_; const uint64_t fileSize_; - int64_t prefetchSize_{0}; io::ReaderOptions options_; }; diff --git a/velox/dwio/common/tests/TestBufferedInput.cpp b/velox/dwio/common/tests/TestBufferedInput.cpp index 38ce6efb38829..c78940bd356a1 100644 --- a/velox/dwio/common/tests/TestBufferedInput.cpp +++ b/velox/dwio/common/tests/TestBufferedInput.cpp @@ -122,6 +122,7 @@ TEST_F(TestBufferedInput, ZeroLengthStream) { std::make_shared(std::string()); BufferedInput input(readFile, *pool_); auto ret = input.enqueue({0, 0}); + EXPECT_EQ(input.nextFetchSize(), 0); EXPECT_NE(ret, nullptr); const void* buf = nullptr; int32_t size = 1; @@ -144,6 +145,7 @@ TEST_F(TestBufferedInput, UseRead) { auto ret = input.enqueue({0, 5}); ASSERT_NE(ret, nullptr); + EXPECT_EQ(input.nextFetchSize(), 5); input.load(LogType::TEST); auto next = getNext(*ret); @@ -166,6 +168,7 @@ TEST_F(TestBufferedInput, UseVRead) { auto ret = input.enqueue({0, 5}); ASSERT_NE(ret, nullptr); + EXPECT_EQ(input.nextFetchSize(), 5); input.load(LogType::TEST); auto next = getNext(*ret); @@ -194,6 +197,7 @@ TEST_F(TestBufferedInput, WillMerge) { ASSERT_NE(ret1, nullptr); ASSERT_NE(ret2, nullptr); + EXPECT_EQ(input.nextFetchSize(), 10); input.load(LogType::TEST); auto next1 = getNext(*ret1); @@ -226,6 +230,7 @@ TEST_F(TestBufferedInput, WontMerge) { ASSERT_NE(ret1, nullptr); ASSERT_NE(ret2, nullptr); + EXPECT_EQ(input.nextFetchSize(), 10); input.load(LogType::TEST); auto next1 = getNext(*ret1); @@ -254,13 +259,16 @@ TEST_F(TestBufferedInput, ReadSorting) { std::vector, std::string>> result; result.reserve(regions.size()); + int64_t bytesToRead = 0; for (auto& region : regions) { + bytesToRead += region.length; auto ret = input.enqueue(region); ASSERT_NE(ret, nullptr); result.push_back( {std::move(ret), content.substr(region.offset, region.length)}); } + EXPECT_EQ(input.nextFetchSize(), bytesToRead); input.load(LogType::TEST); for (auto& r : result) { @@ -288,13 +296,16 @@ TEST_F(TestBufferedInput, VReadSorting) { std::vector, std::string>> result; result.reserve(regions.size()); + int64_t bytesToRead = 0; for (auto& region : regions) { + bytesToRead += region.length; auto ret = input.enqueue(region); ASSERT_NE(ret, nullptr); result.push_back( {std::move(ret), content.substr(region.offset, region.length)}); } + EXPECT_EQ(input.nextFetchSize(), bytesToRead); input.load(LogType::TEST); for (auto& r : result) { @@ -326,13 +337,16 @@ TEST_F(TestBufferedInput, VReadSortingWithLabels) { std::vector, std::string>> result; result.reserve(regions.size()); + int64_t bytesToRead = 0; for (auto& region : regions) { + bytesToRead += region.length; auto ret = input.enqueue(region); ASSERT_NE(ret, nullptr); result.push_back( {std::move(ret), content.substr(region.offset, region.length)}); } + EXPECT_EQ(input.nextFetchSize(), bytesToRead); input.load(LogType::TEST); for (auto& r : result) {