Skip to content

Commit

Permalink
fix: log sending sending stages
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Apr 14, 2024
1 parent ebf2eb9 commit 8a5704c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 5 deletions.
5 changes: 3 additions & 2 deletions cpp/examples/ExampleFifoProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

auto& logger = getLogger();
logger.setConsoleLevel(Level::Info);
logger.setLevel(Level::Info);
logger.setConsoleLevel(Level::Debug);
logger.setLevel(Level::Debug);
logger.init();

// Access Key/Secret pair may be acquired from management console
Expand Down Expand Up @@ -173,6 +173,7 @@ int main(int argc, char* argv[]) {

semaphore->acquire();
producer.send(std::move(message), callback);
std::cout << "Cached No." << i << " message" << std::endl;
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
Expand Down
18 changes: 17 additions & 1 deletion cpp/source/rocketmq/FifoProducerPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ void FifoProducerPartition::add(FifoContext&& context) {
{
absl::MutexLock lk(&messages_mtx_);
messages_.emplace_back(std::move(context));
SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size());
}

trySend();
Expand All @@ -29,6 +30,11 @@ void FifoProducerPartition::trySend() {
if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
absl::MutexLock lk(&messages_mtx_);

if (messages_.empty()) {
SPDLOG_DEBUG("There is no more messages to send");
return;
}

FifoContext& ctx = messages_.front();
MessageConstPtr message = std::move(ctx.message);
SendCallback send_callback = ctx.callback;
Expand All @@ -37,20 +43,30 @@ void FifoProducerPartition::trySend() {
auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable {
partition->onComplete(ec, receipt, send_callback);
};
SPDLOG_DEBUG("Sending FIFO message from {}", name_);
producer_->send(std::move(message), fifo_callback);
messages_.pop_front();
SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_);
} else {
SPDLOG_DEBUG("There is an inflight message");
}
}

void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
if (ec) {
SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
} else {
SPDLOG_DEBUG("{} completed OK", name_);
}

if (!ec) {
callback(ec, receipt);
// update inflight status
bool expected = true;
if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
trySend();
} else {
SPDLOG_ERROR("Unexpected inflight status");
SPDLOG_ERROR("{}: Unexpected inflight status", name_);
}
return;
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/source/rocketmq/include/FifoProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "FifoProducerPartition.h"
#include "ProducerImpl.h"
#include "fmt/format.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"

Expand All @@ -16,7 +17,7 @@ class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency)
: producer_(producer), concurrency_(concurrency), partitions_(concurrency) {
for (auto i = 0; i < concurrency; i++) {
partitions_[i] = std::make_shared<FifoProducerPartition>(producer_);
partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i));
}
};

Expand Down
4 changes: 3 additions & 1 deletion cpp/source/rocketmq/include/FifoProducerPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ ROCKETMQ_NAMESPACE_BEGIN

class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> {
public:
FifoProducerPartition(std::shared_ptr<ProducerImpl> producer) : producer_(producer) {
FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& name)
: producer_(producer), name_(std::move(name)) {
}

void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
Expand All @@ -32,6 +33,7 @@ class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPa
std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
absl::Mutex messages_mtx_;
std::atomic_bool inflight_{false};
std::string name_;
};

ROCKETMQ_NAMESPACE_END
Empty file modified cpp/tools/trouble_shooting.sh
100644 → 100755
Empty file.

0 comments on commit 8a5704c

Please sign in to comment.