diff --git a/velox/dwio/common/Budget.h b/velox/dwio/common/Budget.h new file mode 100644 index 0000000000000..b335e36e0f1c6 --- /dev/null +++ b/velox/dwio/common/Budget.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include "velox/common/base/Exceptions.h" +#include "velox/dwio/common/MeasureTime.h" +#include "velox/dwio/common/UnitLoaderTools.h" + +namespace facebook::velox::dwio::common { + +// This class allows threads to block until enough memory budget for their +// request is available. +class Budget { + public: + explicit Budget(uint64_t budget) + : budget_{budget}, availableBudget_{asSigned(budget)} {} + + void increaseBudget(int64_t delta) { + auto lock = std::scoped_lock(mutex_); + availableBudget_ += delta; + budget_ += delta; + cv_.notify_all(); + } + + void setBudget(uint64_t newValue) { + const auto value = asSigned(newValue); + auto lock = std::scoped_lock(mutex_); + availableBudget_ += value - budget_; + budget_ = newValue; + cv_.notify_all(); + } + + uint64_t getBudget() { + auto lock = std::scoped_lock(mutex_); + return budget_; + } + + int64_t getAvailableBudget() { + auto lock = std::scoped_lock(mutex_); + return availableBudget_; + } + + std::optional waitForBudget( + uint64_t requiredBudget) { + const auto reqBudget = asSigned(requiredBudget); + auto lock = std::unique_lock(mutex_); + if (availableBudget_ >= reqBudget) { + availableBudget_ -= reqBudget; + return std::nullopt; + } + const std::chrono::time_point + startTime = std::chrono::high_resolution_clock::now(); + cv_.wait(lock, [&]() { return availableBudget_ >= reqBudget; }); + availableBudget_ -= reqBudget; + return std::chrono::high_resolution_clock::now() - startTime; + } + + void releaseBudget(uint64_t releasedBudget) { + const auto relBudget = asSigned(releasedBudget); + auto lock = std::unique_lock(mutex_); + availableBudget_ += relBudget; + cv_.notify_all(); + } + + private: + int64_t asSigned(uint64_t value) const { + VELOX_CHECK(value <= std::numeric_limits::max()); + return static_cast(value); + } + + // The budget can only be > 0, but when we set a new budget, availableBudget_ + // can go negative, if the budget is already being used and there isn't enough + // left. + uint64_t budget_; + int64_t availableBudget_; + std::condition_variable cv_; + std::mutex mutex_; + std::chrono::high_resolution_clock::duration lastDuration_; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/tests/BudgetTests.cpp b/velox/dwio/common/tests/BudgetTests.cpp new file mode 100644 index 0000000000000..3cfa6fec3b6ba --- /dev/null +++ b/velox/dwio/common/tests/BudgetTests.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include "folly/Synchronized.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "velox/dwio/common/Budget.h" +#include "velox/dwio/common/ExecutorBarrier.h" + +using namespace ::testing; +using namespace ::facebook::velox::dwio::common; + +TEST(BudgetTests, SetBudget) { + Budget budget(10); + EXPECT_EQ(budget.getBudget(), 10); + EXPECT_EQ(budget.getAvailableBudget(), 10); + budget.waitForBudget(3); + EXPECT_EQ(budget.getBudget(), 10); + EXPECT_EQ(budget.getAvailableBudget(), 7); + budget.setBudget(5); + EXPECT_EQ(budget.getBudget(), 5); + EXPECT_EQ(budget.getAvailableBudget(), 2); + budget.releaseBudget(2); + EXPECT_EQ(budget.getBudget(), 5); + EXPECT_EQ(budget.getAvailableBudget(), 4); + budget.increaseBudget(4); + EXPECT_EQ(budget.getBudget(), 9); + EXPECT_EQ(budget.getAvailableBudget(), 8); +} + +TEST(BudgetTests, OutOfLimits) { + uint64_t budgetBig = std::numeric_limits::max() - 1; + EXPECT_THAT( + [&]() { Budget budget(budgetBig); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::failingExpression, + HasSubstr("value <= std::numeric_limits::max()")))); + + Budget budget(1); + EXPECT_THAT( + [&]() { budget.setBudget(budgetBig); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::failingExpression, + HasSubstr("value <= std::numeric_limits::max()")))); +} + +TEST(BudgetTests, ContentionForBudget) { + std::atomic_uint64_t simultaneous = 0; + std::atomic_uint64_t maxSimultaneous = 0; + std::atomic_uint64_t timesDidntWaitForBudget = 0; + folly::CPUThreadPoolExecutor executor(3); + ExecutorBarrier barrier(executor); + Budget budget(2); + folly::Synchronized + timeWaitingForBudget; + // I tested with i = [0 1] and it already achieved maxSimultaneous = 2. So + // setting 100 should be enough for this test not to be flaky, and to try to + // run 3 simultaneously if it were possible (if there's a bug in the budget). + for (int i = 0; i < 100; ++i) { + barrier.add([&]() { + auto timeWaiting = budget.waitForBudget(1); + ++simultaneous; + if (timeWaiting.has_value()) { + timeWaitingForBudget.withWLock([&](auto& timeWaitingForBudget) { + timeWaitingForBudget += timeWaiting.value(); + }); + } else { + timesDidntWaitForBudget += 1; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + maxSimultaneous = std::max(maxSimultaneous.load(), simultaneous.load()); + --simultaneous; + budget.releaseBudget(1); + }); + } + barrier.waitAll(); + EXPECT_EQ(maxSimultaneous.load(), 2); + EXPECT_GT( + std::chrono::duration_cast( + *timeWaitingForBudget.rlock()) + .count(), + 0); + EXPECT_GT(timesDidntWaitForBudget.load(), 0); +} diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index e550c10989d16..2c600c2fa9486 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable( velox_dwio_common_test BitConcatenationTest.cpp BitPackDecoderTest.cpp + BudgetTests.cpp ChainedBufferTests.cpp ColumnSelectorTests.cpp DataBufferTests.cpp