Skip to content

Commit

Permalink
More changes related to review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
martun committed Jan 30, 2024
1 parent 63fae20 commit 8828e2d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ namespace nil {

// Here we can parallelize over both loops ('k' and 'm'), because for each value of k and m
// the ranges of array 'a' used do not intersect. Think of these 2 loops as 1.
wait_for_all(ThreadPool::get_instance(ThreadPool::PoolID::LOW_LEVEL_POOL_ID).block_execution<void>(
wait_for_all(ThreadPool::get_instance(ThreadPool::PoolLevel::LOW).block_execution<void>(
m * count_k,
[&a, m, count_k, &w_m](std::size_t begin, std::size_t end) {
size_t current_index = begin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace nil {
template<class InputIt1, class InputIt2, class OutputIt, class BinaryOperation>
void parallel_transform(InputIt1 first1, InputIt1 last1, InputIt2 first2,
OutputIt d_first, BinaryOperation binary_op,
ThreadPool::PoolID pool_id = ThreadPool::PoolID::LOW_LEVEL_POOL_ID) {
ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

wait_for_all(ThreadPool::get_instance(pool_id).block_execution<void>(
std::distance(first1, last1),
Expand All @@ -65,7 +65,7 @@ namespace nil {
template< class InputIt, class OutputIt, class UnaryOperation >
void parallel_transform(InputIt first1, InputIt last1,
OutputIt d_first, UnaryOperation unary_op,
ThreadPool::PoolID pool_id = ThreadPool::PoolID::LOW_LEVEL_POOL_ID) {
ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

wait_for_all(ThreadPool::get_instance(pool_id).block_execution<void>(
std::distance(first1, last1),
Expand All @@ -86,7 +86,7 @@ namespace nil {
template<class InputIt1, class InputIt2, class BinaryOperation>
void in_place_parallel_transform(InputIt1 first1, InputIt1 last1, InputIt2 first2,
BinaryOperation binary_op,
ThreadPool::PoolID pool_id = ThreadPool::PoolID::LOW_LEVEL_POOL_ID) {
ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

wait_for_all(ThreadPool::get_instance(pool_id).block_execution<void>(
std::distance(first1, last1),
Expand All @@ -106,7 +106,7 @@ namespace nil {
// UnaryOperation is supposed to modify the object in-place.
template<class InputIt, class UnaryOperation>
void parallel_foreach(InputIt first1, InputIt last1, UnaryOperation unary_op,
ThreadPool::PoolID pool_id = ThreadPool::PoolID::LOW_LEVEL_POOL_ID) {
ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

wait_for_all(ThreadPool::get_instance(pool_id).block_execution<void>(
std::distance(first1, last1),
Expand All @@ -122,7 +122,7 @@ namespace nil {

// Calls function func for each value between [start, end).
void parallel_for(std::size_t start, std::size_t end, std::function<void(std::size_t index)> func,
ThreadPool::PoolID pool_id = ThreadPool::PoolID::LOW_LEVEL_POOL_ID) {
ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

wait_for_all(ThreadPool::get_instance(pool_id).block_execution<void>(
end - start,
Expand Down
41 changes: 20 additions & 21 deletions include/nil/crypto3/math/multithreading/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ namespace nil {
class ThreadPool {
public:

// Selects which of the two singleton pools to use.
// LOW runs leaf-level work (polynomial operations, FFT); HIGH runs code
// that itself submits tasks to the LOW pool. Keeping the levels in
// separate pools prevents a higher-level task from occupying the threads
// its own sub-tasks need (see the deadlock note on get_instance()).
enum class PoolLevel {
    LOW,
    HIGH
};

/** Returns the singleton thread pool for the requested level.
 *
 * The LOW pool is normally used for low-level operations, like polynomial
 * operations and FFT. Any code that uses these operations and needs to be
 * parallel submits its tasks to the HIGH pool. Submission of higher-level
 * tasks to the low-level pool will immediately result in a deadlock.
 *
 * NOTE(review): the pools are function-local statics, so 'pool_size' only
 * takes effect on the very first call for each level; later calls with a
 * different 'pool_size' silently return the already-created pool.
 *
 * @param pool_id   Which pool level to return.
 * @param pool_size Number of worker threads, used only on first creation.
 * @throws std::invalid_argument if pool_id is not a known PoolLevel value.
 */
static ThreadPool& get_instance(PoolLevel pool_id, std::size_t pool_size = std::thread::hardware_concurrency()) {
    static ThreadPool instance_for_low_level(PoolLevel::LOW, pool_size);
    static ThreadPool instance_for_higher_level(PoolLevel::HIGH, pool_size);

    if (pool_id == PoolLevel::LOW)
        return instance_for_low_level;
    if (pool_id == PoolLevel::HIGH)
        return instance_for_higher_level;
    throw std::invalid_argument("Invalid instance of thread pool requested.");
}
Expand Down Expand Up @@ -86,41 +86,40 @@ namespace nil {

std::vector<std::future<ReturnType>> fut;
std::size_t cpu_usage = std::max((size_t)1, std::min(elements_count, pool_size));
std::size_t element_per_cpu = elements_count / cpu_usage;

// Pool #0 will take care of the lowest level of operations, like polynomial operations.
// We want the minimal size of element_per_cpu to be 65536, otherwise the cores are not loaded.
if (pool_id == PoolID::LOW_LEVEL_POOL_ID && element_per_cpu < POOL_0_MIN_CHUNK_SIZE) {
// We want the minimal size of elements_per_cpu to be 'POOL_0_MIN_CHUNK_SIZE', otherwise the cores are not loaded.
if (pool_id == PoolLevel::LOW && elements_count / cpu_usage < POOL_0_MIN_CHUNK_SIZE) {
cpu_usage = elements_count / POOL_0_MIN_CHUNK_SIZE + elements_count % POOL_0_MIN_CHUNK_SIZE ? 1 : 0;
element_per_cpu = elements_count / cpu_usage;
}
const std::size_t elements_per_cpu = elements_count / cpu_usage;

std::size_t begin = 0;
for (int i = 0; i < cpu_usage; i++) {
auto begin = element_per_cpu * i;
auto end = (i == cpu_usage - 1) ? elements_count : element_per_cpu * (i + 1);
for (std::size_t i = 0; i < cpu_usage; i++) {
auto end = begin + (elements_count - begin) / (cpu_usage - i);
fut.emplace_back(post<ReturnType>([begin, end, func]() {
return func(begin, end);
}));
begin = end;
}
return fut;
}

private:
inline ThreadPool(PoolID pool_id, std::size_t pool_size)
inline ThreadPool(PoolLevel pool_id, std::size_t pool_size)
: pool(pool_size)
, pool_size(pool_size)
, pool_id(pool_id) {
}

boost::asio::thread_pool pool;
std::size_t pool_size;
const std::size_t pool_size;

PoolID pool_id;
const PoolLevel pool_id;

// For pool #0 we have experimentally found that operations over chunks of <65536 elements
// do not load the cores. In case we have smaller chunks, it's better to load less cores.
const std::size_t POOL_0_MIN_CHUNK_SIZE = 65536;
static constexpr std::size_t POOL_0_MIN_CHUNK_SIZE = 65536;
};

} // namespace crypto3
Expand Down
4 changes: 2 additions & 2 deletions test/polynomial_dfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ BOOST_AUTO_TEST_CASE(polynomial_dfs_2_levels_test) {
// Inside this multiplication lower level pool is used.
poly4[i] *= poly;
},
nil::crypto3::ThreadPool::PoolID::HIGH_LEVEL_POOL_ID);
nil::crypto3::ThreadPool::PoolLevel::HIGH);

for (int i = 1; i < poly4.size(); ++i) {
BOOST_CHECK(poly4[i] == poly4[0]);
Expand Down Expand Up @@ -1340,7 +1340,7 @@ BOOST_AUTO_TEST_CASE(polynomial_dfs_multiplication_perf_test, *boost::unit_test:

auto start = std::chrono::high_resolution_clock::now();
nil::crypto3::wait_for_all(
nil::crypto3::ThreadPool::get_instance(nil::crypto3::ThreadPool::PoolID::HIGH_LEVEL_POOL_ID).block_execution<void>(
nil::crypto3::ThreadPool::get_instance(nil::crypto3::ThreadPool::PoolLevel::HIGH).block_execution<void>(
poly4.size(),
[&poly4, &poly](std::size_t begin, std::size_t end) {
for (std::size_t i = begin; i < end; i++) {
Expand Down

0 comments on commit 8828e2d

Please sign in to comment.