Skip to content

Commit

Permalink
Create Budget utility class
Browse files Browse the repository at this point in the history
Summary:
Create a Budget class to allow threads to block until there's budget available.

This is though to be used as memory budget, but could be any kind of budget.

Differential Revision: D56379447
  • Loading branch information
munozdaniel authored and facebook-github-bot committed Apr 23, 2024
1 parent 490e25d commit a950c53
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 0 deletions.
99 changes: 99 additions & 0 deletions velox/dwio/common/Budget.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <condition_variable>
#include <functional>
#include <limits>
#include <mutex>
#include "velox/common/base/Exceptions.h"
#include "velox/dwio/common/MeasureTime.h"
#include "velox/dwio/common/UnitLoaderTools.h"

namespace facebook::velox::dwio::common {

// This class allows threads to block until enough memory budget for their
// request is available.
class Budget {
public:
explicit Budget(std::function<uint64_t()> budgetCallback)
: budget_{budgetCallback()},
availableBudget_{asSigned(budget_)},
budgetCallback_{std::move(budgetCallback)} {}

std::optional<std::chrono::high_resolution_clock::duration> waitForBudget(
uint64_t requiredBudget) {
const auto reqBudget = asSigned(requiredBudget);
auto lock = std::unique_lock(mutex_);
if (hasBudget(reqBudget)) {
useBudget(reqBudget);
return std::nullopt;
}
const std::chrono::time_point<std::chrono::high_resolution_clock>
startTime = std::chrono::high_resolution_clock::now();
// Wait until budget is available because another thread released it. Also
// check every second if the total budget didn't change (so there may be
// budget available).
while (!hasBudget(reqBudget)) {
cv_.wait_for(lock, std::chrono::seconds(1));
}
useBudget(reqBudget);
return std::chrono::high_resolution_clock::now() - startTime;
}

void releaseBudget(uint64_t releasedBudget) {
const auto relBudget = asSigned(releasedBudget);
auto lock = std::unique_lock(mutex_);
availableBudget_ += relBudget;
cv_.notify_all();
}

private:
int64_t asSigned(uint64_t value) const {
VELOX_CHECK(value <= std::numeric_limits<int64_t>::max());
return static_cast<int64_t>(value);
}

void useBudget(int64_t requiredBudget) {
availableBudget_ -= requiredBudget;
}

bool hasBudget(int64_t requiredBudget) {
return updateBudget() >= requiredBudget;
}

int64_t updateBudget() {
const auto newBudget = asSigned(budgetCallback_());
if (newBudget != budget_) {
availableBudget_ += newBudget - budget_;
budget_ = newBudget;
}
return availableBudget_;
}

// The budget can only be > 0, but when we set a new budget, availableBudget_
// can go negative, if the budget is already being used and there isn't enough
// left.
uint64_t budget_;
int64_t availableBudget_;
std::function<uint64_t()> budgetCallback_;
std::condition_variable cv_;
std::mutex mutex_;
std::chrono::high_resolution_clock::duration lastDuration_;
};

} // namespace facebook::velox::dwio::common
145 changes: 145 additions & 0 deletions velox/dwio/common/tests/BudgetTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <mutex>

#include "folly/Synchronized.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "velox/dwio/common/Budget.h"
#include "velox/dwio/common/ExecutorBarrier.h"

using namespace ::testing;
using namespace ::facebook::velox::dwio::common;

TEST(BudgetTests, CanUseZero) {
Budget budget([&]() { return 0; });
EXPECT_FALSE(budget.waitForBudget(0).has_value());
}

TEST(BudgetTests, CanUseHalf) {
Budget budget([&]() { return 10; });
EXPECT_FALSE(budget.waitForBudget(5).has_value());
}

TEST(BudgetTests, CanUseAll) {
Budget budget([&]() { return 10; });
EXPECT_FALSE(budget.waitForBudget(10).has_value());
}

TEST(BudgetTests, TwoCanUse) {
Budget budget([&]() { return 10; });
EXPECT_FALSE(budget.waitForBudget(1).has_value());
EXPECT_FALSE(budget.waitForBudget(1).has_value());
}

TEST(BudgetTests, TwoCanUseAll) {
Budget budget([&]() { return 10; });
EXPECT_FALSE(budget.waitForBudget(5).has_value());
EXPECT_FALSE(budget.waitForBudget(5).has_value());
}

TEST(BudgetTests, WillRefresh) {
uint64_t budgetVar = 0;
Budget budget([&]() { return budgetVar; });
budgetVar = 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budgetVar += 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budgetVar += 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budgetVar += 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budgetVar += 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budgetVar += 1;
EXPECT_FALSE(budget.waitForBudget(1).has_value());
budget.releaseBudget(6);
EXPECT_FALSE(budget.waitForBudget(6).has_value());
}

TEST(BudgetTests, WillUnblock) {
folly::CPUThreadPoolExecutor executor(1);
ExecutorBarrier barrier(executor);
std::atomic_uint64_t budgetVar = 0;
Budget budget([&]() { return budgetVar.load(); });
barrier.add([&]() { EXPECT_TRUE(budget.waitForBudget(1).has_value()); });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
budgetVar = 1;
barrier.waitAll();
}

TEST(BudgetTests, OutOfLimits) {
uint64_t budgetVar = std::numeric_limits<uint64_t>::max() - 1;
auto budgetBig = [&]() { return budgetVar; };
EXPECT_THAT(
[&]() { Budget budget(budgetBig); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::failingExpression,
HasSubstr("value <= std::numeric_limits<int64_t>::max()"))));

budgetVar = 1;
Budget budget(budgetBig);
budgetVar = std::numeric_limits<uint64_t>::max() - 1;
EXPECT_THAT(
[&]() { budget.waitForBudget(1); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::failingExpression,
HasSubstr("value <= std::numeric_limits<int64_t>::max()"))));
}

TEST(BudgetTests, ContentionForBudget) {
std::atomic_uint64_t simultaneous = 0;
std::atomic_uint64_t maxSimultaneous = 0;
std::atomic_uint64_t timesDidntWaitForBudget = 0;
folly::CPUThreadPoolExecutor executor(3);
ExecutorBarrier barrier(executor);
Budget budget([]() { return 2; });
folly::Synchronized<std::chrono::high_resolution_clock::duration>
timeWaitingForBudget;
// I tested with i = [0 1] and it already achieved maxSimultaneous = 2. So
// setting 100 should be enough for this test not to be flaky, and to try to
// run 3 simultaneously if it were possible (if there's a bug in the budget).
for (int i = 0; i < 100; ++i) {
barrier.add([&]() {
auto timeWaiting = budget.waitForBudget(1);
++simultaneous;
if (timeWaiting.has_value()) {
timeWaitingForBudget.withWLock([&](auto& timeWaitingForBudget) {
timeWaitingForBudget += timeWaiting.value();
});
} else {
timesDidntWaitForBudget += 1;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
maxSimultaneous = std::max(maxSimultaneous.load(), simultaneous.load());
--simultaneous;
budget.releaseBudget(1);
});
}
barrier.waitAll();
EXPECT_EQ(maxSimultaneous.load(), 2);
EXPECT_GT(
std::chrono::duration_cast<std::chrono::milliseconds>(
*timeWaitingForBudget.rlock())
.count(),
0);
EXPECT_GT(timesDidntWaitForBudget.load(), 0);
}
1 change: 1 addition & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ add_executable(
velox_dwio_common_test
BitConcatenationTest.cpp
BitPackDecoderTest.cpp
BudgetTests.cpp
ChainedBufferTests.cpp
ColumnSelectorTests.cpp
DataBufferTests.cpp
Expand Down

0 comments on commit a950c53

Please sign in to comment.