Skip to content

Commit

Permalink
Create Budget utility class (facebookincubator#9553)
Browse files Browse the repository at this point in the history
Summary:

Create a Budget class to allow threads to block until there's budget available.

This is though to be used as memory budget, but could be any kind of budget.

Differential Revision: D56379447
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Apr 22, 2024
1 parent 019c8ad commit 28de417
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 0 deletions.
98 changes: 98 additions & 0 deletions velox/dwio/common/Budget.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <condition_variable>
#include <limits>
#include <mutex>
#include "velox/common/base/Exceptions.h"
#include "velox/dwio/common/MeasureTime.h"
#include "velox/dwio/common/UnitLoaderTools.h"

namespace facebook::velox::dwio::common {

// This class allows threads to block until enough memory budget for their
// request is available.
class Budget {
public:
explicit Budget(uint64_t budget)
: budget_{budget}, availableBudget_{asSigned(budget)} {}

void increaseBudget(int64_t delta) {
auto lock = std::scoped_lock(mutex_);
availableBudget_ += delta;
budget_ += delta;
cv_.notify_all();
}

void setBudget(uint64_t newValue) {
const auto value = asSigned(newValue);
auto lock = std::scoped_lock(mutex_);
availableBudget_ += value - budget_;
budget_ = newValue;
cv_.notify_all();
}

uint64_t getBudget() {
auto lock = std::scoped_lock(mutex_);
return budget_;
}

int64_t getAvailableBudget() {
auto lock = std::scoped_lock(mutex_);
return availableBudget_;
}

std::optional<std::chrono::high_resolution_clock::duration> waitForBudget(
uint64_t requiredBudget) {
const auto reqBudget = asSigned(requiredBudget);
auto lock = std::unique_lock(mutex_);
if (availableBudget_ >= reqBudget) {
availableBudget_ -= reqBudget;
return std::nullopt;
}
const std::chrono::time_point<std::chrono::high_resolution_clock>
startTime = std::chrono::high_resolution_clock::now();
cv_.wait(lock, [&]() { return availableBudget_ >= reqBudget; });
availableBudget_ -= reqBudget;
return std::chrono::high_resolution_clock::now() - startTime;
}

void releaseBudget(uint64_t releasedBudget) {
const auto relBudget = asSigned(releasedBudget);
auto lock = std::unique_lock(mutex_);
availableBudget_ += relBudget;
cv_.notify_all();
}

private:
int64_t asSigned(uint64_t value) const {
VELOX_CHECK(value <= std::numeric_limits<int64_t>::max());
return static_cast<int64_t>(value);
}

// The budget can only be > 0, but when we set a new budget, availableBudget_
// can go negative, if the budget is already being used and there isn't enough
// left.
uint64_t budget_;
int64_t availableBudget_;
std::condition_variable cv_;
std::mutex mutex_;
std::chrono::high_resolution_clock::duration lastDuration_;
};

} // namespace facebook::velox::dwio::common
103 changes: 103 additions & 0 deletions velox/dwio/common/tests/BudgetTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <mutex>

#include "folly/Synchronized.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "velox/dwio/common/Budget.h"
#include "velox/dwio/common/ExecutorBarrier.h"

using namespace ::testing;
using namespace ::facebook::velox::dwio::common;

TEST(BudgetTests, SetBudget) {
Budget budget(10);
EXPECT_EQ(budget.getBudget(), 10);
EXPECT_EQ(budget.getAvailableBudget(), 10);
budget.waitForBudget(3);
EXPECT_EQ(budget.getBudget(), 10);
EXPECT_EQ(budget.getAvailableBudget(), 7);
budget.setBudget(5);
EXPECT_EQ(budget.getBudget(), 5);
EXPECT_EQ(budget.getAvailableBudget(), 2);
budget.releaseBudget(2);
EXPECT_EQ(budget.getBudget(), 5);
EXPECT_EQ(budget.getAvailableBudget(), 4);
budget.increaseBudget(4);
EXPECT_EQ(budget.getBudget(), 9);
EXPECT_EQ(budget.getAvailableBudget(), 8);
}

TEST(BudgetTests, OutOfLimits) {
uint64_t budgetBig = std::numeric_limits<uint64_t>::max() - 1;
EXPECT_THAT(
[&]() { Budget budget(budgetBig); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::failingExpression,
HasSubstr("value <= std::numeric_limits<int64_t>::max()"))));

Budget budget(1);
EXPECT_THAT(
[&]() { budget.setBudget(budgetBig); },
Throws<facebook::velox::VeloxRuntimeError>(Property(
&facebook::velox::VeloxRuntimeError::failingExpression,
HasSubstr("value <= std::numeric_limits<int64_t>::max()"))));
}

TEST(BudgetTests, ContentionForBudget) {
std::atomic_uint64_t simultaneous = 0;
std::atomic_uint64_t maxSimultaneous = 0;
std::atomic_uint64_t timesDidntWaitForBudget = 0;
folly::CPUThreadPoolExecutor executor(3);
ExecutorBarrier barrier(executor);
Budget budget(2);
folly::Synchronized<std::chrono::high_resolution_clock::duration>
timeWaitingForBudget;
// I tested with i = [0 1] and it already achieved maxSimultaneous = 2. So
// setting 100 should be enough for this test not to be flaky, and to try to
// run 3 simultaneously if it were possible (if there's a bug in the budget).
for (int i = 0; i < 100; ++i) {
barrier.add([&]() {
auto timeWaiting = budget.waitForBudget(1);
++simultaneous;
if (timeWaiting.has_value()) {
timeWaitingForBudget.withWLock([&](auto& timeWaitingForBudget) {
timeWaitingForBudget += timeWaiting.value();
});
} else {
timesDidntWaitForBudget += 1;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
maxSimultaneous = std::max(maxSimultaneous.load(), simultaneous.load());
--simultaneous;
budget.releaseBudget(1);
});
}
barrier.waitAll();
EXPECT_EQ(maxSimultaneous.load(), 2);
EXPECT_GT(
std::chrono::duration_cast<std::chrono::milliseconds>(
*timeWaitingForBudget.rlock())
.count(),
0);
EXPECT_GT(timesDidntWaitForBudget.load(), 0);
}
1 change: 1 addition & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ add_executable(
velox_dwio_common_test
BitConcatenationTest.cpp
BitPackDecoderTest.cpp
BudgetTests.cpp
ChainedBufferTests.cpp
ColumnSelectorTests.cpp
DataBufferTests.cpp
Expand Down

0 comments on commit 28de417

Please sign in to comment.