diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp index ff3a14db..9d99be36 100644 --- a/cpp/examples/ExampleFifoProducer.cpp +++ b/cpp/examples/ExampleFifoProducer.cpp @@ -106,8 +106,8 @@ int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); - logger.setConsoleLevel(Level::Info); - logger.setLevel(Level::Info); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); logger.init(); // Access Key/Secret pair may be acquired from management console @@ -173,6 +173,7 @@ int main(int argc, char* argv[]) { semaphore->acquire(); producer.send(std::move(message), callback); + std::cout << "Cached No." << i << " message" << std::endl; } } catch (...) { std::cerr << "Ah...No!!!" << std::endl; diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp index 37526f76..94e1c722 100644 --- a/cpp/source/rocketmq/FifoProducerPartition.cpp +++ b/cpp/source/rocketmq/FifoProducerPartition.cpp @@ -19,6 +19,7 @@ void FifoProducerPartition::add(FifoContext&& context) { { absl::MutexLock lk(&messages_mtx_); messages_.emplace_back(std::move(context)); + SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size()); } trySend(); @@ -29,6 +30,11 @@ void FifoProducerPartition::trySend() { if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { absl::MutexLock lk(&messages_mtx_); + if (messages_.empty()) { + SPDLOG_DEBUG("There is no more messages to send"); + return; + } + FifoContext& ctx = messages_.front(); MessageConstPtr message = std::move(ctx.message); SendCallback send_callback = ctx.callback; @@ -37,12 +43,22 @@ void FifoProducerPartition::trySend() { auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable { partition->onComplete(ec, receipt, send_callback); }; + SPDLOG_DEBUG("Sending FIFO message from {}", name_); producer_->send(std::move(message), fifo_callback); messages_.pop_front(); + SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_); + } else { + SPDLOG_DEBUG("There is an inflight message"); } } void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) { + if (ec) { + SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message()); + } else { + SPDLOG_DEBUG("{} completed OK", name_); + } + if (!ec) { callback(ec, receipt); // update inflight status @@ -50,7 +66,7 @@ void FifoProducerPartition::onComplete(const std::error_code& ec, const SendRece if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { trySend(); } else { - SPDLOG_ERROR("Unexpected inflight status"); + SPDLOG_ERROR("{}: Unexpected inflight status", name_); } return; } diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h index cc11dcf6..180c3f93 100644 --- a/cpp/source/rocketmq/include/FifoProducerImpl.h +++ b/cpp/source/rocketmq/include/FifoProducerImpl.h @@ -6,6 +6,7 @@ #include "FifoProducerPartition.h" #include "ProducerImpl.h" +#include "fmt/format.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" @@ -16,7 +17,7 @@ class FifoProducerImpl : std::enable_shared_from_this { FifoProducerImpl(std::shared_ptr producer, std::size_t concurrency) : producer_(producer), concurrency_(concurrency), partitions_(concurrency) { for (auto i = 0; i < concurrency; i++) { - partitions_[i] = std::make_shared(producer_); + partitions_[i] = std::make_shared(producer_, fmt::format("slot-{}", i)); } }; diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h index 406b8fa6..96bb96f6 100644 --- a/cpp/source/rocketmq/include/FifoProducerPartition.h +++ b/cpp/source/rocketmq/include/FifoProducerPartition.h @@ -18,7 +18,8 @@ ROCKETMQ_NAMESPACE_BEGIN class FifoProducerPartition : public std::enable_shared_from_this { public: - FifoProducerPartition(std::shared_ptr producer) : producer_(producer) { + FifoProducerPartition(std::shared_ptr producer, std::string&& name) + : producer_(producer), name_(std::move(name)) { } void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_); @@ -32,6 +33,7 @@ class FifoProducerPartition : public std::enable_shared_from_this messages_ GUARDED_BY(messages_mtx_); absl::Mutex messages_mtx_; std::atomic_bool inflight_{false}; + std::string name_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh old mode 100644 new mode 100755