Skip to content

Commit

Permalink
Make callbacks to pass std::chrono::high_resolution_clock::duration i…
Browse files Browse the repository at this point in the history
…nstead

Summary: As said

Differential Revision: D56430332
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Apr 22, 2024
1 parent 018f36c commit 08095ee
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 39 deletions.
14 changes: 8 additions & 6 deletions velox/dwio/common/OnDemandUnitLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "velox/common/base/Exceptions.h"
#include "velox/dwio/common/UnitLoaderTools.h"

using facebook::velox::dwio::common::unit_loader_tools::measureBlockedOnIo;
using facebook::velox::dwio::common::unit_loader_tools::measureWithCallback;

namespace facebook::velox::dwio::common {

Expand All @@ -29,9 +29,10 @@ class OnDemandUnitLoader : public UnitLoader {
public:
OnDemandUnitLoader(
std::vector<std::unique_ptr<LoadUnit>> loadUnits,
std::function<void(uint64_t)> blockedOnIoMsCallback)
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback)
: loadUnits_{std::move(loadUnits)},
blockedOnIoMsCallback_{std::move(blockedOnIoMsCallback)} {}
blockedOnIoCallback_{std::move(blockedOnIoCallback)} {}

~OnDemandUnitLoader() override = default;

Expand All @@ -48,7 +49,7 @@ class OnDemandUnitLoader : public UnitLoader {
}

{
auto measure = measureBlockedOnIo(blockedOnIoMsCallback_);
auto measure = measureWithCallback(blockedOnIoCallback_);
loadUnits_[unit]->load();
}
loadedUnit_ = unit;
Expand All @@ -63,7 +64,8 @@ class OnDemandUnitLoader : public UnitLoader {

private:
std::vector<std::unique_ptr<LoadUnit>> loadUnits_;
std::function<void(uint64_t)> blockedOnIoMsCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback_;
std::optional<uint32_t> loadedUnit_;
};

Expand All @@ -72,7 +74,7 @@ class OnDemandUnitLoader : public UnitLoader {
std::unique_ptr<UnitLoader> OnDemandUnitLoaderFactory::create(
std::vector<std::unique_ptr<LoadUnit>> loadUnits) {
return std::make_unique<OnDemandUnitLoader>(
std::move(loadUnits), blockedOnIoMsCallback_);
std::move(loadUnits), blockedOnIoCallback_);
}

} // namespace facebook::velox::dwio::common
9 changes: 6 additions & 3 deletions velox/dwio/common/OnDemandUnitLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <chrono>
#include <functional>

#include "velox/dwio/common/UnitLoader.h"
Expand All @@ -26,16 +27,18 @@ class OnDemandUnitLoaderFactory
: public velox::dwio::common::UnitLoaderFactory {
public:
explicit OnDemandUnitLoaderFactory(
std::function<void(uint64_t)> blockedOnIoMsCallback)
: blockedOnIoMsCallback_{std::move(blockedOnIoMsCallback)} {}
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback)
: blockedOnIoCallback_{std::move(blockedOnIoCallback)} {}
~OnDemandUnitLoaderFactory() override = default;

std::unique_ptr<velox::dwio::common::UnitLoader> create(
std::vector<std::unique_ptr<velox::dwio::common::LoadUnit>> loadUnits)
override;

private:
std::function<void(uint64_t)> blockedOnIoMsCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback_;
};

} // namespace facebook::velox::dwio::common
23 changes: 15 additions & 8 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ class RowReaderOptions {
// Function to track how much time we spend waiting on IO before reading rows
// (in dwrf row reader). todo: encapsulate this and keySelectionCallBack_ in a
// struct
std::function<void(uint64_t)> blockedOnIoCallback_;
std::function<void(uint64_t)> decodingTimeUsCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
decodingTimeCallback_;
std::function<void(uint16_t)> stripeCountCallback_;
bool eagerFirstStripeLoad = true;
uint64_t skipRows_ = 0;
Expand Down Expand Up @@ -354,20 +356,25 @@ class RowReaderOptions {
}

void setBlockedOnIoCallback(
std::function<void(int64_t)> blockedOnIoCallback) {
std::function<void(const std::chrono::high_resolution_clock::duration&)>
blockedOnIoCallback) {
blockedOnIoCallback_ = std::move(blockedOnIoCallback);
}

const std::function<void(int64_t)> getBlockedOnIoCallback() const {
const std::function<void(const std::chrono::high_resolution_clock::duration&)>
getBlockedOnIoCallback() const {
return blockedOnIoCallback_;
}

void setDecodingTimeUsCallback(std::function<void(int64_t)> decodingTimeUs) {
decodingTimeUsCallback_ = std::move(decodingTimeUs);
void setDecodingTimeCallback(
std::function<void(const std::chrono::high_resolution_clock::duration&)>
decodingTime) {
decodingTimeCallback_ = std::move(decodingTime);
}

std::function<void(int64_t)> getDecodingTimeUsCallback() const {
return decodingTimeUsCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
getDecodingTimeCallback() const {
return decodingTimeCallback_;
}

void setStripeCountCallback(
Expand Down
23 changes: 12 additions & 11 deletions velox/dwio/common/UnitLoaderTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ namespace facebook::velox::dwio::common::unit_loader_tools {

class Measure {
public:
explicit Measure(const std::function<void(uint64_t)>& blockedOnIoMsCallback)
: blockedOnIoMsCallback_{blockedOnIoMsCallback},
explicit Measure(
const std::function<
void(const std::chrono::high_resolution_clock::duration&)>& callback)
: callback_{callback},
startTime_{std::chrono::high_resolution_clock::now()} {}

Measure(const Measure&) = delete;
Expand All @@ -40,21 +42,20 @@ class Measure {
Measure& operator=(Measure&& other) = delete;

~Measure() {
auto timeBlockedOnIo =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - startTime_);
blockedOnIoMsCallback_(timeBlockedOnIo.count());
callback_(std::chrono::high_resolution_clock::now() - startTime_);
}

private:
const std::function<void(uint64_t)>& blockedOnIoMsCallback_;
const std::function<void(
const std::chrono::high_resolution_clock::duration&)>& callback_;
const std::chrono::time_point<std::chrono::high_resolution_clock> startTime_;
};

inline std::optional<Measure> measureBlockedOnIo(
const std::function<void(uint64_t)>& blockedOnIoMsCallback) {
if (blockedOnIoMsCallback) {
return std::make_optional<Measure>(blockedOnIoMsCallback);
inline std::optional<Measure> measureWithCallback(
const std::function<
void(const std::chrono::high_resolution_clock::duration&)>& callback) {
if (callback) {
return std::make_optional<Measure>(callback);
}
return std::nullopt;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ using facebook::velox::dwio::common::test::ReaderMock;

TEST(OnDemandUnitLoaderTests, LoadsCorrectlyWithReader) {
size_t blockedOnIoCount = 0;
OnDemandUnitLoaderFactory factory([&](uint64_t) { ++blockedOnIoCount; });
OnDemandUnitLoaderFactory factory([&](auto) { ++blockedOnIoCount; });
ReaderMock readerMock{{10, 20, 30}, {0, 0, 0}, factory};
EXPECT_EQ(readerMock.unitsLoaded(), std::vector<bool>({false, false, false}));
EXPECT_EQ(blockedOnIoCount, 0);
Expand Down
10 changes: 4 additions & 6 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ DwrfRowReader::DwrfRowReader(
: StripeReaderBase(reader),
strideIndex_{0},
options_(opts),
decodingTimeUsCallback_{options_.getDecodingTimeUsCallback()},
decodingTimeCallback_{options_.getDecodingTimeCallback()},
columnSelector_{std::make_shared<ColumnSelector>(
ColumnSelector::apply(opts.getSelector(), reader->getSchema()))},
currentUnit_{nullptr} {
Expand Down Expand Up @@ -480,7 +480,7 @@ void DwrfRowReader::readNext(
VectorPtr& result) {
if (!getSelectiveColumnReader()) {
std::optional<std::chrono::steady_clock::time_point> startTime;
if (decodingTimeUsCallback_) {
if (decodingTimeCallback_) {
// We'll use wall time since we have parallel decoding.
// If we move to sequential decoding only, we can use CPU time.
startTime.emplace(std::chrono::steady_clock::now());
Expand All @@ -492,10 +492,8 @@ void DwrfRowReader::readNext(
"Mutation pushdown is only supported in selective reader");
getColumnReader()->next(rowsToRead, result);
if (startTime.has_value()) {
decodingTimeUsCallback_(
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - startTime.value())
.count());
decodingTimeCallback_(
std::chrono::steady_clock::now() - startTime.value());
}
return;
}
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ class DwrfRowReader : public StrideIndexProvider,
uint64_t rowsInCurrentStripe_;
uint64_t strideIndex_;
dwio::common::RowReaderOptions options_;
std::function<void(uint64_t)> decodingTimeUsCallback_;
std::function<void(const std::chrono::high_resolution_clock::duration&)>
decodingTimeCallback_;

// column selector
std::shared_ptr<dwio::common::ColumnSelector> columnSelector_;
Expand Down
18 changes: 15 additions & 3 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,11 @@ TEST_F(TestReader, testBlockedIoCallbackFiredBlocking) {
std::optional<uint64_t> metricToIncrement;

rowReaderOpts.setBlockedOnIoCallback(
[&metricToIncrement](uint64_t blockedTimeMs) {
[&metricToIncrement](
std::chrono::high_resolution_clock::duration blockedTime) {
const auto blockedTimeMs =
std::chrono::duration_cast<std::chrono::milliseconds>(blockedTime)
.count();
if (metricToIncrement) {
*metricToIncrement += blockedTimeMs;
} else {
Expand Down Expand Up @@ -722,7 +726,11 @@ TEST_F(TestReader, DISABLED_testBlockedIoCallbackFiredNonBlocking) {
std::optional<uint64_t> metricToIncrement;

rowReaderOpts.setBlockedOnIoCallback(
[&metricToIncrement](uint64_t blockedTimeMs) {
[&metricToIncrement](
std::chrono::high_resolution_clock::duration blockedTime) {
const auto blockedTimeMs =
std::chrono::duration_cast<std::chrono::milliseconds>(blockedTime)
.count();
if (metricToIncrement) {
*metricToIncrement += blockedTimeMs;
} else {
Expand Down Expand Up @@ -766,7 +774,11 @@ TEST_F(TestReader, DISABLED_testBlockedIoCallbackFiredWithFirstStripeLoad) {
std::optional<uint64_t> metricToIncrement;

rowReaderOpts.setBlockedOnIoCallback(
[&metricToIncrement](uint64_t blockedTimeMs) {
[&metricToIncrement](
std::chrono::high_resolution_clock::duration blockedTime) {
const auto blockedTimeMs =
std::chrono::duration_cast<std::chrono::milliseconds>(blockedTime)
.count();
if (metricToIncrement) {
*metricToIncrement += blockedTimeMs;
} else {
Expand Down

0 comments on commit 08095ee

Please sign in to comment.