Skip to content

Commit

Permalink
branch-3.0: [fix](local exchange) Use tokens to ensure that try_deque…
Browse files Browse the repository at this point in the history
…ue maintains strict order. #45741 (#45844)

Cherry-picked from #45741

Co-authored-by: Mryange <[email protected]>
github-actions[bot] and Mryange authored Jan 26, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 6856b74 commit ce8d389
Showing 2 changed files with 112 additions and 2 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
@@ -104,12 +104,13 @@ template <typename BlockType>
struct BlockQueue {
std::atomic<bool> eos = false;
moodycamel::ConcurrentQueue<BlockType> data_queue;
moodycamel::ProducerToken ptok {data_queue};
BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
BlockQueue(BlockQueue<BlockType>&& other)
: eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
inline bool enqueue(BlockType const& item) {
if (!eos) {
if (!data_queue.enqueue(item)) [[unlikely]] {
if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
@@ -121,7 +122,7 @@ struct BlockQueue {

inline bool enqueue(BlockType&& item) {
if (!eos) {
if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
109 changes: 109 additions & 0 deletions be/test/vec/exec/concurrent_queue_order.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <concurrentqueue.h>
#include <gtest/gtest.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

namespace doris::vectorized {

// Fixture for the moodycamel::ConcurrentQueue ordering tests in this file.
// It holds no state, so the compiler-generated constructor and (virtual,
// inherited from testing::Test) destructor are all that is needed.
class ConcurrentQueueOrder : public testing::Test {};
// The previously used moodycamel::ConcurrentQueue does not guarantee that the enqueue order matches the dequeue order,
// even when there is only a single producer and a single consumer.
// Refer to this issue: https://github.com/cameron314/concurrentqueue/issues/316
// We can use tokens to ensure the correct order.
// Demonstrates that a token-less enqueue does NOT preserve global FIFO order,
// even though every enqueue is serialized by a mutex.
//
// Three producer threads take strict turns (0, 1, 2, 0, 1, 2, ...) appending
// consecutive integers, so the enqueue order is exactly 0, 1, 2, 3, ...
// According to the concurrentqueue documentation, each thread that enqueues
// without a token gets its own internal sub-queue, and order is only kept
// within a sub-queue; draining from a single consumer is therefore expected
// to interleave the three streams in a different order than they were written.
//
// The hand-off is turn based rather than time based so that the test does not
// depend on scheduler fairness or on how much work fits into a sleep window.
TEST_F(ConcurrentQueueOrder, test_not_guarantee_order) {
    constexpr int kProducers = 3;
    constexpr int kItemsPerProducer = 100;

    moodycamel::ConcurrentQueue<int> data_queue;
    std::mutex m;
    std::condition_variable turn_changed;
    int next_value = 0; // guarded by m
    int turn = 0;       // guarded by m; id of the producer allowed to enqueue next

    auto task = [&](int thread_id) {
        for (int i = 0; i < kItemsPerProducer; ++i) {
            std::unique_lock lock {m};
            turn_changed.wait(lock, [&] { return turn == thread_id; });
            data_queue.enqueue(next_value++);
            turn = (turn + 1) % kProducers;
            // Every waiter re-checks its predicate; only the next producer proceeds.
            turn_changed.notify_all();
        }
    };

    std::vector<std::thread> producers;
    producers.reserve(kProducers);
    for (int id = 0; id < kProducers; ++id) {
        producers.emplace_back(task, id);
    }
    for (auto& producer : producers) {
        producer.join();
    }

    std::cout << "queue size " << data_queue.size_approx() << "\n";

    std::vector<int> outputs;
    outputs.reserve(static_cast<size_t>(kProducers * kItemsPerProducer));
    int output = 0;
    while (data_queue.try_dequeue(output)) {
        outputs.push_back(output);
    }

    const bool sorted = std::is_sorted(outputs.begin(), outputs.end());
    std::cout << "output is sorted : " << sorted << "\n";

    // Nothing may be lost; only the order is allowed to differ.
    ASSERT_EQ(outputs.size(), static_cast<size_t>(kProducers * kItemsPerProducer));
    EXPECT_FALSE(sorted);
}

// Demonstrates that routing every enqueue through one shared ProducerToken
// makes the dequeue order match the enqueue order.
//
// Three producer threads take strict turns (0, 1, 2, 0, 1, 2, ...) appending
// consecutive integers through the same token, so the enqueue order is exactly
// 0, 1, 2, 3, ... The token is only ever used while holding the mutex, i.e. by
// one thread at a time. A single consumer must then read the values back in
// ascending order with none missing.
//
// The hand-off is turn based rather than time based so that the test does not
// depend on scheduler fairness and always checks a known number of items.
TEST_F(ConcurrentQueueOrder, test_guarantee_order) {
    constexpr int kProducers = 3;
    constexpr int kItemsPerProducer = 100;

    moodycamel::ConcurrentQueue<int> data_queue;
    moodycamel::ProducerToken ptok {data_queue};
    std::mutex m;
    std::condition_variable turn_changed;
    int next_value = 0; // guarded by m
    int turn = 0;       // guarded by m; id of the producer allowed to enqueue next

    auto task = [&](int thread_id) {
        for (int i = 0; i < kItemsPerProducer; ++i) {
            std::unique_lock lock {m};
            turn_changed.wait(lock, [&] { return turn == thread_id; });
            data_queue.enqueue(ptok, next_value++);
            turn = (turn + 1) % kProducers;
            // Every waiter re-checks its predicate; only the next producer proceeds.
            turn_changed.notify_all();
        }
    };

    std::vector<std::thread> producers;
    producers.reserve(kProducers);
    for (int id = 0; id < kProducers; ++id) {
        producers.emplace_back(task, id);
    }
    for (auto& producer : producers) {
        producer.join();
    }

    std::cout << "queue size " << data_queue.size_approx() << "\n";

    std::vector<int> outputs;
    outputs.reserve(static_cast<size_t>(kProducers * kItemsPerProducer));
    int output = 0;
    while (data_queue.try_dequeue(output)) {
        outputs.push_back(output);
    }

    const bool sorted = std::is_sorted(outputs.begin(), outputs.end());
    std::cout << "output is sorted : " << sorted << "\n";

    // Guard against a vacuous pass: every enqueued value must come back out.
    ASSERT_EQ(outputs.size(), static_cast<size_t>(kProducers * kItemsPerProducer));
    EXPECT_TRUE(sorted);
}
} // namespace doris::vectorized

0 comments on commit ce8d389

Please sign in to comment.