diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index 055c33d2ec..d7c6d5e52a 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -309,6 +309,7 @@ int Application::start(bsl::ostream& errorDescription) // Start dispatcher d_dispatcher_mp.load(new (*d_allocator_p) Dispatcher( mqbcfg::BrokerConfig::get().dispatcherConfig(), + d_statController_mp->dispatcherStatContext(), d_scheduler_p, d_allocators.get("Dispatcher")), d_allocator_p); diff --git a/src/groups/mqb/mqba/mqba_dispatcher.cpp b/src/groups/mqb/mqba/mqba_dispatcher.cpp index 3547509f61..11a3b3e1bb 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.cpp +++ b/src/groups/mqb/mqba/mqba_dispatcher.cpp @@ -256,6 +256,12 @@ int Dispatcher::startContext(bsl::ostream& errorDescription, DispatcherContext(config, d_allocator_p), d_allocator_p); + context->d_statContext_mp = + mqbstat::DispatcherStatsUtil::initializeClientStatContext( + d_statContext_p, + mqbi::DispatcherClientType::toAscii(type), + d_allocator_p); + // Create and start the threadPool context->d_threadPool_mp.load( new (*d_allocator_p) @@ -360,6 +366,8 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, case ProcessorPool::Event::MWCC_USER: { BALL_LOG_TRACE << "Dispatching Event to queue " << processorId << " of " << type << " dispatcher: " << event->object(); + DispatcherContext& dispatcherContext = *(d_contexts[type]); + if (event->object().type() == mqbi::DispatcherEventType::e_DISPATCHER) { const mqbi::DispatcherDispatcherEvent* realEvent = @@ -380,7 +388,6 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, } } else { - DispatcherContext& dispatcherContext = *(d_contexts[type]); event->object().destination()->onDispatcherEvent(event->object()); if (!event->object() .destination() @@ -394,6 +401,11 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, .setAddedToFlushList(true); } } + + dispatcherContext.d_statContext_mp->adjustValue( + mqbstat::DispatcherStats::Stat::e_DONE_START + + event->object().type(), + 1); } break; case ProcessorPool::Event::MWCC_QUEUE_EMPTY: { flushClients(type, processorId); @@ -449,11 +461,13 @@ void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type, } Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config, + mwcst::StatContext* statContext, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator) : d_allocator_p(allocator) , d_isStarted(false) , d_config(config) +, d_statContext_p(statContext) , d_scheduler_p(scheduler) , d_contexts(allocator) { @@ -582,6 +596,9 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client, .setClientType(type) .setProcessorHandle(processor); + context.d_statContext_mp->adjustValue( + mqbstat::DispatcherStats::Stat::e_CLIENT_COUNT, + 1); BALL_LOG_DEBUG << "Registered a new client to the dispatcher " << "[Client: " << client->description() << ", type: " << type << ", processor: " << processor @@ -628,6 +645,9 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client) case mqbi::DispatcherClientType::e_QUEUE: case mqbi::DispatcherClientType::e_CLUSTER: { d_contexts[type]->d_loadBalancer.removeClient(client); + d_contexts[type]->d_statContext_mp->adjustValue( + mqbstat::DispatcherStats::Stat::e_CLIENT_COUNT, + -1); } break; case mqbi::DispatcherClientType::e_UNDEFINED: case mqbi::DispatcherClientType::e_ALL: diff --git a/src/groups/mqb/mqba/mqba_dispatcher.h b/src/groups/mqb/mqba/mqba_dispatcher.h index 153c84ac2b..65bfa9a088 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.h +++ b/src/groups/mqb/mqba/mqba_dispatcher.h @@ -52,11 +52,13 @@ #include #include +#include #include // MWC #include #include +#include // BDE #include @@ -245,6 +247,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { // for distributing clients // across processors + /// Stat context for this client type + bslma::ManagedPtr d_statContext_mp; + bsl::vector d_flushList; // Vector of vector of // pointers to @@ -277,21 +282,23 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { private: // DATA + /// Allocator to use bslma::Allocator* d_allocator_p; - // Allocator to use + /// True if this component is started bool d_isStarted; - // True if this component is started + /// Configuration for the dispatcher mqbcfg::DispatcherConfig d_config; - // Configuration for the dispatcher + /// Top-level stat context for all dispatcher client types + mwcst::StatContext* d_statContext_p; + + /// Event scheduler to use bdlmt::EventScheduler* d_scheduler_p; - // Event scheduler to use + /// The various contexts, one for each ClientType bsl::vector d_contexts; - // The various context, one for each - // ClientType // FRIENDS friend class Dispatcher_ClientExecutor; @@ -348,6 +355,7 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { /// All memory allocation will be performed using the specified /// `allocator`. Dispatcher(const mqbcfg::DispatcherConfig& config, + mwcst::StatContext* statContext, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator); @@ -549,6 +557,9 @@ inline void Dispatcher::dispatchEvent(mqbi::DispatcherEvent* event, case mqbi::DispatcherClientType::e_QUEUE: case mqbi::DispatcherClientType::e_CLUSTER: { d_contexts[type]->d_processorPool_mp->enqueueEvent(event, handle); + d_contexts[type]->d_statContext_mp->adjustValue( + mqbstat::DispatcherStats::Stat::e_ENQ_START + event->type(), + 1); } break; case mqbi::DispatcherClientType::e_UNDEFINED: case mqbi::DispatcherClientType::e_ALL: diff --git a/src/groups/mqb/mqba/mqba_dispatcher.t.cpp b/src/groups/mqb/mqba/mqba_dispatcher.t.cpp index 79b827eb23..83100459c4 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.t.cpp +++ b/src/groups/mqb/mqba/mqba_dispatcher.t.cpp @@ -19,6 +19,7 @@ // MQB #include #include +#include // MWC #include @@ -118,8 +119,15 @@ static void test1_breathingTest() s_allocator_p); eventScheduler.start(); + bsl::shared_ptr statContext_sp( + mqbstat::DispatcherStatsUtil::initializeStatContext(10, + s_allocator_p)); + { - mqba::Dispatcher obj(dispatcherConfig, &eventScheduler, s_allocator_p); + mqba::Dispatcher obj(dispatcherConfig, + statContext_sp.get(), + &eventScheduler, + s_allocator_p); } eventScheduler.stop(); @@ -211,7 +219,12 @@ static void test3_executorsSupport() dispatcherConfig.clusters().processorConfig().queueSizeHighWatermark() = 100; + bsl::shared_ptr statContext_sp( + mqbstat::DispatcherStatsUtil::initializeStatContext(10, + s_allocator_p)); + mqba::Dispatcher dispatcher(dispatcherConfig, + statContext_sp.get(), &eventScheduler, s_allocator_p); diff --git a/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.cpp b/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.cpp new file mode 100644 index 0000000000..b393e24fa1 --- /dev/null +++ b/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.cpp @@ -0,0 +1,101 @@ +// Copyright 2018-2023 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbstat_dispatcherstats.cpp -*-C++-*- +#include + +#include +// BMQ +#include + +// MQB +#include +#include + +// MWC +#include +#include +#include + +// BDE +#include +#include +#include +#include + +namespace BloombergLP { +namespace mqbstat { + +// --------------------- +// class DomainStatsUtil +// --------------------- + +bsl::shared_ptr +DispatcherStatsUtil::initializeStatContext(int historySize, + bslma::Allocator* allocator) +{ + bdlma::LocalSequentialAllocator<2048> localAllocator(allocator); + + mwcst::StatContextConfiguration config("dispatcher", &localAllocator); + config.defaultHistorySize(historySize) + .statValueAllocator(allocator) + .storeExpiredSubcontextValues(true); + + return bsl::shared_ptr( + new (*allocator) mwcst::StatContext(config, allocator), + allocator); +} + +bslma::ManagedPtr +DispatcherStatsUtil::initializeClientStatContext(mwcst::StatContext* parent, + const bslstl::StringRef& name, + bslma::Allocator* allocator) +{ + bdlma::LocalSequentialAllocator<2048> localAllocator(allocator); + + mwcst::StatContextConfiguration statConfig(name, &localAllocator); + statConfig.isTable(true) + .value("enq_undefined") + .value("enq_dispatcher") + .value("enq_callback") + .value("enq_control_msg") + .value("enq_confirm") + .value("enq_reject") + .value("enq_push") + .value("enq_put") + .value("enq_ack") + .value("enq_cluster_state") + .value("enq_storage") + .value("enq_recovery") + .value("enq_replication_receipt") + .value("done_undefined") + .value("done_dispatcher") + .value("done_callback") + .value("done_control_msg") + .value("done_confirm") + .value("done_reject") + .value("done_push") + .value("done_put") + .value("done_ack") + .value("done_cluster_state") + .value("done_storage") + .value("done_recovery") + .value("done_replication_receipt") + .value("nb_client"); + return parent->addSubcontext(statConfig); +} + +} // close package namespace +} // close enterprise namespace diff --git a/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.h b/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.h new file mode 100644 index 0000000000..2f5c9f2126 --- /dev/null +++ b/src/groups/mqb/mqbstat/mqbstat_dispatcherstats.h @@ -0,0 +1,142 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbstat_dispatcherstats.h -*-C++-*- +#ifndef INCLUDED_MQBSTAT_DISPATCHERSTATS +#define INCLUDED_MQBSTAT_DISPATCHERSTATS + +//@PURPOSE: Provide mechanism to keep track of Dispatcher statistics. +// +//@CLASSES: +// mqbstat::DispatcherStats: Mechanism to maintain stats of a dispatcher +// mqbstat::DomainStatsUtil: Utilities to initialize statistics +// +//@DESCRIPTION: 'mqbstat::DomainStats' provides a mechanism to keep track of +// domain level statistics. 'mqbstat::DomainStatsUtil' is a utility namespace +// exposing methods to initialize the stat contexts. + +// MQB + +// BDE +#include +#include +#include +#include +#include +#include + +namespace BloombergLP { + +// FORWARD DECLARATION +namespace mqbi { +class Domain; +} +namespace mwcst { +class StatContext; +} + +namespace mqbstat { + +// ===================== +// class DispatcherStats +// ===================== + +/// Mechanism to keep track of individual overall statistics of a dispatcher +class DispatcherStats { + public: + // TYPES + + /// Enum representing the various type of stats that can be obtained + /// from this object. + struct Stat { + // TYPES + enum Enum { + e_ENQ_START = 0, + + e_ENQ_UNDEFINED = e_ENQ_START, + e_ENQ_DISPATCHER, + e_ENQ_CALLBACK, + e_ENQ_CONTROL_MSG, + e_ENQ_CONFIRM, + e_ENQ_REJECT, + e_ENQ_PUSH, + e_ENQ_PUT, + e_ENQ_ACK, + e_ENQ_CLUSTER_STATE, + e_ENQ_STORAGE, + e_ENQ_RECOVERY, + e_ENQ_REPLICATION_RECEIPT, + + e_ENQ_END = e_ENQ_REPLICATION_RECEIPT, + + e_DONE_START = e_ENQ_END + 1, + + e_DONE_UNDEFINED = e_DONE_START, + e_DONE_DISPATCHER, + e_DONE_CALLBACK, + e_DONE_CONTROL_MSG, + e_DONE_CONFIRM, + e_DONE_REJECT, + e_DONE_PUSH, + e_DONE_PUT, + e_DONE_ACK, + e_DONE_CLUSTER_STATE, + e_DONE_STORAGE, + e_DONE_RECOVERY, + e_DONE_REPLICATION_RECEIPT, + + e_DONE_END = e_DONE_REPLICATION_RECEIPT, + + e_CLIENT_COUNT + }; + }; + + private: + // NOT IMPLEMENTED + DispatcherStats(const DispatcherStats&) BSLS_CPP11_DELETED; + + /// Copy constructor and assignment operator are not implemented. + DispatcherStats& operator=(const DispatcherStats&) BSLS_CPP11_DELETED; +}; + +// ========================== +// struct DispatcherStatsUtil +// ========================== + +/// Utility namespace of methods to initialize dispatcher stats. +struct DispatcherStatsUtil { + // CLASS METHODS + + /// Initialize the statistics for the domain stat context, keeping the + /// specified `historySize` of history. Return the created top level + /// stat context to use for all domain level statistics. Use the + /// specified `allocator` for all stat context and stat values. + static bsl::shared_ptr + initializeStatContext(int historySize, bslma::Allocator* allocator); + + /// Initialize the statistics for the domain stat context, keeping the + /// specified `historySize` of history. Return the created top level + /// stat context to use for all domain level statistics. Use the + /// specified `allocator` for all stat context and stat values. + static bslma::ManagedPtr + initializeClientStatContext(mwcst::StatContext* parent, + const bslstl::StringRef& name, + bslma::Allocator* allocator); +}; + +} // close package namespace +} // close enterprise namespace + +#endif diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp index aa499e8e99..7401157dbb 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -188,6 +189,16 @@ void StatController::initializeStats() brokerAllocator), false))); + // ---------- + // Dispatcher + StatContextSp dispatcher( + mqbstat::DispatcherStatsUtil::initializeStatContext( + historySize, + d_allocators.get("DispatcherStats"))); + d_statContextsMap.insert( + bsl::make_pair(bsl::string("dispatcher"), + StatContextDetails(dispatcher, false))); + // ------- // Domains bslma::Allocator* domainsAllocator = d_allocators.get("DomainsStats"); diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h index 14a7fb60ae..832b35fa10 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h @@ -321,6 +321,9 @@ class StatController { const mqbcmd::StatCommand& command, const mqbcmd::EncodingFormat::Value& encoding); + /// Retrieve the dispatcher top-level stat context. + mwcst::StatContext* dispatcherStatContext(); + /// Retrieve the domains top-level stat context. mwcst::StatContext* domainsStatContext(); @@ -349,6 +352,11 @@ class StatController { // class StatController // -------------------- +inline mwcst::StatContext* StatController::dispatcherStatContext() +{ + return d_statContextsMap["dispatcher"].d_statContext_sp.get(); +} + inline mwcst::StatContext* StatController::domainsStatContext() { return d_statContextsMap["domains"].d_statContext_sp.get(); diff --git a/src/groups/mqb/mqbstat/package/mqbstat.mem b/src/groups/mqb/mqbstat/package/mqbstat.mem index 0dd4cde8e7..afc260aa2d 100644 --- a/src/groups/mqb/mqbstat/package/mqbstat.mem +++ b/src/groups/mqb/mqbstat/package/mqbstat.mem @@ -1,5 +1,6 @@ mqbstat_brokerstats mqbstat_clusterstats +mqbstat_dispatcherstats mqbstat_domainstats mqbstat_jsonprinter mqbstat_printer