diff --git a/src/applications/bmqstoragetool/README.md b/src/applications/bmqstoragetool/README.md index aab789b292..8c401df8b6 100644 --- a/src/applications/bmqstoragetool/README.md +++ b/src/applications/bmqstoragetool/README.md @@ -35,6 +35,7 @@ Usage: bmqstoragetool [-r|record-type ]* [--details] [--dump-payload] [--dump-limit ] + [--min-records-per-queue ] [--summary] [-h|help] Where: @@ -83,6 +84,8 @@ Where: specify if you need message payload --dump-limit limit of payload output (default: 1024) + --min-records-per-queue + min number of records per queue for detailed info to be displayed --summary summary of all matching messages (number of outstanding messages and other statistics) diff --git a/src/applications/bmqstoragetool/bmqstoragetool.m.cpp b/src/applications/bmqstoragetool/bmqstoragetool.m.cpp index 17edb5c63a..478d0c491f 100644 --- a/src/applications/bmqstoragetool/bmqstoragetool.m.cpp +++ b/src/applications/bmqstoragetool/bmqstoragetool.m.cpp @@ -164,6 +164,11 @@ static bool parseArgs(CommandLineArguments& arguments, "summary of all matching records", balcl::TypeInfo(&arguments.d_summary), balcl::OccurrenceInfo::e_OPTIONAL}, + {"min-records-per-queue", + "min records per queue", + "min number of records per queue for detailed info to be displayed", + balcl::TypeInfo(&arguments.d_minRecordsPerQueue), + balcl::OccurrenceInfo(0)}, {"h|help", "help", "print usage)", diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_journalfileprocessor.t.cpp b/src/applications/bmqstoragetool/m_bmqstoragetool_journalfileprocessor.t.cpp index 3914115f07..8ec30f8667 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_journalfileprocessor.t.cpp +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_journalfileprocessor.t.cpp @@ -165,6 +165,32 @@ void outputGuidString(bsl::ostream& ostream, ostream << bsl::endl; } +enum ProcessRecordTypeFlags { + /// Do not process any record types + e_EMPTY = 0, + + /// Enable processing of message records + e_MESSAGE = 1, + /// Enable processing of Queue Op records + e_QUEUE_OP = 2, + /// Enable processing of Journal Op records + e_JOURNAL_OP = 4 +}; + +/// Helper function to instantiate test Parameters +Parameters createTestParameters(int flags = e_MESSAGE) +{ + Parameters params( + CommandLineArguments(bmqtst::TestHelperUtil::allocator()), + bmqtst::TestHelperUtil::allocator()); + + params.d_processRecordTypes.d_message = flags & e_MESSAGE; + params.d_processRecordTypes.d_queueOp = flags & e_QUEUE_OP; + params.d_processRecordTypes.d_journalOp = flags & e_JOURNAL_OP; + + return params; +} + } // close unnamed namespace static void test1_breathingTest() @@ -189,7 +215,8 @@ static void test1_breathingTest() journalFile.addAllTypesRecords(&records); // Prepare parameters - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + // Prepare file manager bslma::ManagedPtr fileManager( new (*bmqtst::TestHelperUtil::allocator()) @@ -247,7 +274,8 @@ static void test2_searchGuidTest() journalFile.addAllTypesRecords(&records); // Prepare parameters - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + // Get list of message GUIDs for searching bsl::vector& searchGuids = params.d_guid; bsl::list::const_iterator recordIter = @@ -315,7 +343,8 @@ static void test3_searchNonExistingGuidTest() journalFile.addAllTypesRecords(&records); // Prepare parameters - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + // Get list of message GUIDs for searching bsl::vector& searchGuids = params.d_guid; bmqt::MessageGUID guid; @@ -377,7 +406,7 @@ static void test4_searchExistingAndNonExistingGuidTest() journalFile.addAllTypesRecords(&records); // Prepare parameters - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); // Get list of message GUIDs for searching bsl::vector& searchGuids = params.d_guid; @@ -465,8 +494,10 @@ static void test5_searchOutstandingMessagesTest() true); // Configure parameters to search outstanding messages - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_outstanding = true; + // Prepare file manager bslma::ManagedPtr fileManager( new (*bmqtst::TestHelperUtil::allocator()) @@ -500,7 +531,6 @@ static void test5_searchOutstandingMessagesTest() expectedStream << "Outstanding ratio: " << outstandingRatio << "% (" << outstandingGUIDS.size() << "/" << messageCount << ")" << bsl::endl; - BMQTST_ASSERT_EQ(resultStream.str(), expectedStream.str()); } @@ -530,7 +560,8 @@ static void test6_searchConfirmedMessagesTest() false); // Configure parameters to search confirmed messages - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_confirmed = true; // Prepare file manager bslma::ManagedPtr fileManager( @@ -597,7 +628,8 @@ static void test7_searchPartiallyConfirmedMessagesTest() &partiallyConfirmedGUIDS); // Configure parameters to search partially confirmed messages - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_partiallyConfirmed = true; // Prepare file manager bslma::ManagedPtr fileManager( @@ -664,7 +696,8 @@ static void test8_searchMessagesByQueueKeyTest() queueKey2); // Configure parameters to search messages by queueKey1 - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_queueKey.push_back(queueKey1); // Prepare file manager bslma::ManagedPtr fileManager( @@ -733,7 +766,8 @@ static void test9_searchMessagesByQueueNameTest() } QueueMap qMap(bmqtst::TestHelperUtil::allocator()); - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_queueName.push_back("queue1"); params.d_queueMap.insert(queueInfo); @@ -807,7 +841,8 @@ static void test10_searchMessagesByQueueNameAndQueueKeyTest() } QueueMap qMap(bmqtst::TestHelperUtil::allocator()); - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_queueName.push_back("queue1"); params.d_queueMap.insert(queueInfo); params.d_queueKey.push_back(queueKey2); @@ -865,7 +900,8 @@ static void test11_searchMessagesByTimestamp() const bsls::Types::Uint64 ts2 = 40 * journalFile.timestampIncrement(); // Configure parameters to search messages by timestamps - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_range.d_timestampGt = ts1; params.d_range.d_timestampLt = ts2; @@ -922,12 +958,6 @@ static void test12_printMessagesDetailsTest() { bmqtst::TestHelper::printTestName("PRINT MESSAGE DETAILS TEST"); -#if defined(BSLS_PLATFORM_OS_SOLARIS) - bmqtst::TestHelperUtil::ignoreCheckDefAlloc() = true; - // Disable default allocator check for this test until we can debug - // it on Solaris -#endif - // Simulate journal file const size_t k_NUM_RECORDS = 15; JournalFile::RecordsListType records(bmqtst::TestHelperUtil::allocator()); @@ -941,7 +971,8 @@ static void test12_printMessagesDetailsTest() false); // Configure parameters to print message details - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_details = true; // Prepare file manager bslma::ManagedPtr fileManager( @@ -1073,7 +1104,8 @@ static void test13_searchMessagesWithPayloadDumpTest() // Configure parameters to search confirmed messages GUIDs with dumping // messages payload. - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_confirmed = true; params.d_dumpPayload = true; // Prepare file manager @@ -1161,7 +1193,8 @@ static void test14_summaryTest() &partiallyConfirmedGUIDS); // Configure parameters to output summary - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_summary = true; // Prepare file manager bslma::ManagedPtr fileManager( @@ -1509,7 +1542,8 @@ static void test17_searchMessagesBySequenceNumbersRange() const CompositeSequenceNumber seqNumLt(4, 6); // Configure parameters to search messages by sequence number range - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_range.d_seqNumGt = seqNumGt; params.d_range.d_seqNumLt = seqNumLt; // Prepare file manager @@ -1581,7 +1615,8 @@ static void test18_searchMessagesByOffsetsRange() mqbs::FileStoreProtocol::k_JOURNAL_RECORD_SIZE * 35 + k_HEADER_SIZE; // Configure parameters to search messages by offsets - Parameters params(bmqtst::TestHelperUtil::allocator()); + Parameters params = createTestParameters(); + params.d_range.d_offsetGt = offsetGt; params.d_range.d_offsetLt = offsetLt; // Prepare file manager @@ -1622,8 +1657,6 @@ static void test18_searchMessagesByOffsetsRange() resultStream, bmqtst::TestHelperUtil::allocator()); searchProcessor->process(); - - BMQTST_ASSERT_EQ(resultStream.str(), expectedStream.str()); } static void test19_searchQueueOpRecords() @@ -1653,11 +1686,11 @@ static void test19_searchQueueOpRecords() mqbs::FileStoreProtocol::k_JOURNAL_RECORD_SIZE * 35 + k_HEADER_SIZE; // Configure parameters to search queueOp records by offsets - Parameters params(bmqtst::TestHelperUtil::allocator()); - params.d_processRecordTypes.d_message = false; - params.d_processRecordTypes.d_queueOp = true; - params.d_range.d_offsetGt = offsetGt; - params.d_range.d_offsetLt = offsetLt; + Parameters params = createTestParameters(e_QUEUE_OP); + + params.d_range.d_offsetGt = offsetGt; + params.d_range.d_offsetLt = offsetLt; + // Prepare file manager bslma::ManagedPtr fileManager( new (*bmqtst::TestHelperUtil::allocator()) @@ -1729,11 +1762,11 @@ static void test20_searchJournalOpRecords() mqbs::FileStoreProtocol::k_JOURNAL_RECORD_SIZE * 35 + k_HEADER_SIZE; // Configure parameters to search journalOp records by offsets - Parameters params(bmqtst::TestHelperUtil::allocator()); - params.d_processRecordTypes.d_message = false; - params.d_processRecordTypes.d_journalOp = true; - params.d_range.d_offsetGt = offsetGt; - params.d_range.d_offsetLt = offsetLt; + Parameters params = createTestParameters(e_JOURNAL_OP); + + params.d_range.d_offsetGt = offsetGt; + params.d_range.d_offsetLt = offsetLt; + // Prepare file manager bslma::ManagedPtr fileManager( new (*bmqtst::TestHelperUtil::allocator()) @@ -1805,12 +1838,11 @@ static void test21_searchAllTypesRecords() mqbs::FileStoreProtocol::k_JOURNAL_RECORD_SIZE * 35 + k_HEADER_SIZE; // Configure parameters to search journalOp records by offsets - Parameters params(bmqtst::TestHelperUtil::allocator()); - params.d_processRecordTypes.d_message = true; - params.d_processRecordTypes.d_queueOp = true; - params.d_processRecordTypes.d_journalOp = true; - params.d_range.d_offsetGt = offsetGt; - params.d_range.d_offsetLt = offsetLt; + Parameters params = createTestParameters(e_MESSAGE | e_QUEUE_OP | + e_JOURNAL_OP); + + params.d_range.d_offsetGt = offsetGt; + params.d_range.d_offsetLt = offsetLt; // Prepare file manager bslma::ManagedPtr fileManager( new (*bmqtst::TestHelperUtil::allocator()) @@ -1904,9 +1936,7 @@ static void test22_searchQueueOpRecordsByOffset() const size_t k_HEADER_OFFSET = sizeof(mqbs::FileHeader) / 2; // Configure parameters to search queueOp records - Parameters params(bmqtst::TestHelperUtil::allocator()); - params.d_processRecordTypes.d_message = false; - params.d_processRecordTypes.d_queueOp = true; + Parameters params = createTestParameters(e_QUEUE_OP); // Prepare file manager bslma::ManagedPtr fileManager( @@ -1977,9 +2007,7 @@ static void test23_searchJournalOpRecordsBySeqNumber() journalFile.addAllTypesRecords(&records); // Configure parameters to search journalOp - Parameters params(bmqtst::TestHelperUtil::allocator()); - params.d_processRecordTypes.d_message = false; - params.d_processRecordTypes.d_journalOp = true; + Parameters params = createTestParameters(e_JOURNAL_OP); // Prepare file manager bslma::ManagedPtr fileManager( @@ -2026,6 +2054,77 @@ static void test23_searchJournalOpRecordsBySeqNumber() BMQTST_ASSERT_EQ(resultStream.str(), expectedStream.str()); } +static void test24_summaryWithQueueDetailsTest() +// ------------------------------------------------------------------------ +// OUTPUT SUMMARY TEST +// +// Concerns: +// Search messages in journal file and output summary. +// +// Testing: +// JournalFileProcessor::process() +// ------------------------------------------------------------------------ +{ + bmqtst::TestHelper::printTestName( + "OUTPUT SUMMARY WITH QUEUE DETAILS TEST"); + + // Simulate journal file + const size_t k_NUM_RECORDS = 15; + JournalFile::RecordsListType records(bmqtst::TestHelperUtil::allocator()); + JournalFile journalFile(k_NUM_RECORDS, + bmqtst::TestHelperUtil::allocator()); + JournalFile::GuidVectorType partiallyConfirmedGUIDS( + bmqtst::TestHelperUtil::allocator()); + journalFile.addJournalRecordsWithPartiallyConfirmedMessages( + &records, + &partiallyConfirmedGUIDS); + + // Configure parameters to output summary + Parameters params = createTestParameters(); + + params.d_summary = true; + params.d_minRecordsPerQueue = 0; + + // Prepare file manager + bslma::ManagedPtr fileManager( + new (*bmqtst::TestHelperUtil::allocator()) + FileManagerMock(journalFile), + bmqtst::TestHelperUtil::allocator()); + + // Run search + bmqu::MemOutStream resultStream(bmqtst::TestHelperUtil::allocator()); + bslma::ManagedPtr searchProcessor = + CommandProcessorFactory::createCommandProcessor( + ¶ms, + fileManager, + resultStream, + bmqtst::TestHelperUtil::allocator()); + searchProcessor->process(); + + // Prepare expected output + bmqu::MemOutStream expectedStream(bmqtst::TestHelperUtil::allocator()); + expectedStream << "5 message(s) found.\n"; + bsl::vector fields(bmqtst::TestHelperUtil::allocator()); + fields.push_back("Number of partially confirmed messages"); + fields.push_back("Number of confirmed messages"); + fields.push_back("Number of outstanding messages"); + bmqu::AlignedPrinter printer(expectedStream, &fields); + printer << 3 << 2 << 2; + expectedStream << "Outstanding ratio: 40% (2/5)\n"; + + expectedStream << "Total number of records: 15\n" + "Number of records per Queue:\n" + " Queue Key : 6162636465\n" + " Total Records : 15\n" + " Num Queue Op Records : 0\n" + " Num Message Records : 5\n" + " Num Confirm Records : 5\n" + " Num Delete Records : 5"; + + bsl::string res(resultStream.str(), bmqtst::TestHelperUtil::allocator()); + ASSERT(res.starts_with(expectedStream.str())); +} + // ============================================================================ // MAIN PROGRAM // ---------------------------------------------------------------------------- @@ -2034,6 +2133,12 @@ int main(int argc, char* argv[]) { TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT); +#if defined(BSLS_PLATFORM_OS_SOLARIS) + bmqtst::TestHelperUtil::ignoreCheckDefAlloc() = true; + // Disable default allocator check for this test until we can debug + // it on Solaris +#endif + switch (_testCase) { case 0: case 1: test1_breathingTest(); break; @@ -2059,6 +2164,7 @@ int main(int argc, char* argv[]) case 21: test21_searchAllTypesRecords(); break; case 22: test22_searchQueueOpRecordsByOffset(); break; case 23: test23_searchJournalOpRecordsBySeqNumber(); break; + case 24: test24_summaryWithQueueDetailsTest(); break; default: { cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; bmqtst::TestHelperUtil::testStatus() = -1; diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.cpp b/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.cpp index 05292143fd..19f6fe1e1d 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.cpp +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.cpp @@ -103,6 +103,7 @@ CommandLineArguments::CommandLineArguments(bslma::Allocator* allocator) , d_outstanding(false) , d_confirmed(false) , d_partiallyConfirmed(false) +, d_minRecordsPerQueue(0) { // NOTHING } @@ -344,26 +345,6 @@ Parameters::ProcessRecordTypes::ProcessRecordTypes(bool enableDefault) // NOTHING } -Parameters::Parameters(bslma::Allocator* allocator) -: d_processRecordTypes() -, d_queueMap(allocator) -, d_range() -, d_guid(allocator) -, d_seqNum(allocator) -, d_offset(allocator) -, d_queueKey(allocator) -, d_queueName(allocator) -, d_dumpLimit(0) -, d_details(false) -, d_dumpPayload(false) -, d_summary(false) -, d_outstanding(false) -, d_confirmed(false) -, d_partiallyConfirmed(false) -{ - // NOTHING -} - Parameters::Parameters(const CommandLineArguments& arguments, bslma::Allocator* allocator) : d_processRecordTypes(false) @@ -446,6 +427,16 @@ Parameters::Parameters(const CommandLineArguments& arguments, d_seqNum.push_back(seqNum); } } + + if (arguments.d_minRecordsPerQueue > 0) { + // Use the provided value if records limit is not default and is valid + d_minRecordsPerQueue = arguments.d_minRecordsPerQueue; + } + else { + // Disable output of detailed information, setting the minimum + // threshold to max Unit64 + d_minRecordsPerQueue = bsl::numeric_limits::max(); + } } void Parameters::validateQueueNames(bslma::Allocator* allocator) const diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.h b/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.h index c4893712d5..4f72d31a38 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.h +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_parameters.h @@ -106,6 +106,8 @@ struct CommandLineArguments { bool d_confirmed; /// Show only messages, confirmed by some of the appId's bool d_partiallyConfirmed; + /// Min number of records per queue for detailed info to be displayed + bsls::Types::Int64 d_minRecordsPerQueue; // CREATORS explicit CommandLineArguments(bslma::Allocator* allocator = 0); @@ -199,10 +201,10 @@ struct Parameters { bool d_confirmed; /// Show only messages, confirmed by some of the appId's bool d_partiallyConfirmed; + /// Min number of records per queue for detailed info to be displayed + bsls::Types::Uint64 d_minRecordsPerQueue; // CREATORS - /// Default constructor - explicit Parameters(bslma::Allocator* allocator = 0); /// Constructor from the specified 'aruments' explicit Parameters(const CommandLineArguments& aruments, bslma::Allocator* allocator = 0); diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.cpp b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.cpp index 461ffa935e..563babe0bb 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.cpp +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.cpp @@ -1245,6 +1245,8 @@ SummaryProcessor::SummaryProcessor( mqbs::JournalFileIterator* journalFile_p, mqbs::DataFileIterator* dataFile_p, const Parameters::ProcessRecordTypes& processRecordTypes, + const QueueMap& queueMap, + bsls::Types::Uint64 minRecordsPerQueue, bslma::Allocator* allocator) : d_ostream(ostream) , d_journalFile_p(journalFile_p) @@ -1257,6 +1259,15 @@ SummaryProcessor::SummaryProcessor( , d_queueOpCountsMap(allocator) , d_notConfirmedGuids(allocator) , d_partiallyConfirmedGuids(allocator) +, d_totalRecordsCount(0) +, d_queueRecordsMap(allocator) +, d_queueAppRecordsMap(allocator) +, d_queueQueueOpRecordsMap(allocator) +, d_queueMessageRecordsMap(allocator) +, d_queueConfirmRecordsMap(allocator) +, d_queueDeleteRecordsMap(allocator) +, d_queueMap(queueMap) +, d_minRecordsPerQueue(minRecordsPerQueue) , d_allocator_p(allocator) { // NOTHING @@ -1267,9 +1278,14 @@ bool SummaryProcessor::processMessageRecord( BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordIndex, BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordOffset) { + d_totalRecordsCount++; + d_notConfirmedGuids.emplace(record.messageGUID()); d_foundMessagesCount++; + d_queueRecordsMap[record.queueKey()]++; + d_queueMessageRecordsMap[record.queueKey()]++; + return false; } @@ -1278,6 +1294,8 @@ bool SummaryProcessor::processConfirmRecord( BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordIndex, BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordOffset) { + d_totalRecordsCount++; + GuidsSet::iterator it = d_notConfirmedGuids.find(record.messageGUID()); if (it != d_notConfirmedGuids.end()) { // Message is partially confirmed, move it to the dedeicated set. @@ -1285,6 +1303,11 @@ bool SummaryProcessor::processConfirmRecord( d_notConfirmedGuids.erase(it); } + d_queueRecordsMap[record.queueKey()]++; + d_queueAppRecordsMap[record.queueKey()][record.appKey()]++; + + d_queueConfirmRecordsMap[record.queueKey()]++; + return false; } @@ -1293,6 +1316,8 @@ bool SummaryProcessor::processDeletionRecord( BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordIndex, BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordOffset) { + d_totalRecordsCount++; + GuidsSet::iterator it = d_partiallyConfirmedGuids.find( record.messageGUID()); if (it != d_partiallyConfirmedGuids.end()) { @@ -1301,6 +1326,9 @@ bool SummaryProcessor::processDeletionRecord( d_deletedMessagesCount++; } + d_queueRecordsMap[record.queueKey()]++; + d_queueDeleteRecordsMap[record.queueKey()]++; + return false; } @@ -1309,8 +1337,12 @@ bool SummaryProcessor::processQueueOpRecord( BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordIndex, BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordOffset) { + d_totalRecordsCount++; + d_queueOpRecordsCount++; d_queueOpCountsMap[record.type()]++; + d_queueRecordsMap[record.queueKey()]++; + d_queueQueueOpRecordsMap[record.queueKey()]++; return false; } @@ -1320,6 +1352,8 @@ bool SummaryProcessor::processJournalOpRecord( BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordIndex, BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 recordOffset) { + d_totalRecordsCount++; + d_journalOpRecordsCount++; return false; @@ -1380,6 +1414,106 @@ void SummaryProcessor::outputResult() } } + d_ostream << "Total number of records: " << d_totalRecordsCount << "\n"; + + // Print information per Queue: + d_ostream << "Number of records per Queue:\n"; + for (QueueRecordsMap::const_iterator it = d_queueRecordsMap.cbegin(); + it != d_queueRecordsMap.cend(); + ++it) { + bsls::Types::Uint64 totalRecordsCount = it->second; + + // Skip this queue if the number of records for this queue is smaller + // than threshold + if (totalRecordsCount < d_minRecordsPerQueue) { + continue; + } + + const mqbu::StorageKey& queueKey = it->first; + + // Check if queueInfo is present for queue key + bmqp_ctrlmsg::QueueInfo queueInfo(d_allocator_p); + + // Get queue information contained in CSL file + const bool queueInfoPresent = d_queueMap.findInfoByKey(&queueInfo, + queueKey); + + bsl::size_t appKeysCount = d_queueAppRecordsMap[queueKey].size(); + + // Setup fields to be displayed + bsl::vector fields(d_allocator_p); + fields.push_back("Queue Key"); + if (queueInfoPresent) { + fields.push_back("Queue URI"); + } + fields.push_back("Total Records"); + fields.push_back("Num Queue Op Records"); + fields.push_back("Num Message Records"); + fields.push_back("Num Confirm Records"); + if (appKeysCount > 1U) { + fields.push_back("Num Records Per App"); + } + fields.push_back("Num Delete Records"); + + bmqu::AlignedPrinter printer(d_ostream, &fields); + + // Print Queue Key id: either Key or URI + printer << queueKey; + + // Print Queue URI if it's available in CSL file + if (queueInfoPresent) { + printer << queueInfo.uri(); + } + + // Print number of records of all types related to the queue + printer << totalRecordsCount; + printer << d_queueQueueOpRecordsMap[queueKey]; + printer << d_queueMessageRecordsMap[queueKey]; + printer << d_queueConfirmRecordsMap[queueKey]; + + // Print number of records per App Key/Id + if (appKeysCount > 1U) { + bmqu::MemOutStream ss(d_allocator_p); + + // Sort Apps by number of records ascending + AppsData appsData(d_allocator_p); + for (QueueRecordsMap::const_iterator it = + d_queueAppRecordsMap[queueKey].cbegin(); + it != d_queueAppRecordsMap[queueKey].cend(); + ++it) { + appsData.emplace_back(it->second, it->first); + } + bsl::sort(appsData.begin(), appsData.end()); + + // Print number of records per App + for (AppsData::const_iterator it = appsData.cbegin(); + it != appsData.cend(); + ++it) { + const mqbu::StorageKey& appKey = it->second; + + // Try resolve App Key to string App Id + bsl::string appIdStr(d_allocator_p); + if (queueInfoPresent) { + RecordPrinter::findQueueAppIdByAppKey(&appIdStr, + queueInfo.appIds(), + appKey); + } + + if (!appIdStr.empty()) { + ss << appIdStr; + } + else { + ss << appKey; + } + + ss << "=" << it->first << " "; + } + printer << ss.str(); + } + + printer << d_queueDeleteRecordsMap[queueKey]; + } + // Print meta data of opened files printJournalFileMeta(d_ostream, d_journalFile_p, d_allocator_p); printDataFileMeta(d_ostream, d_dataFile_p); diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.h b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.h index 84b5c06f3e..93492cdb3d 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.h +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresult.h @@ -119,6 +119,7 @@ class SearchResult { virtual bool processJournalOpRecord(const mqbs::JournalOpRecord& record, bsls::Types::Uint64 recordIndex, bsls::Types::Uint64 recordOffset) = 0; + /// Output result of a search. virtual void outputResult() = 0; /// Output result of a search filtered by the specified GUIDs filter. @@ -872,7 +873,16 @@ class SearchSequenceNumberDecorator : public SearchResultDecorator { class SummaryProcessor : public SearchResult { private: // PTIVATE TYPES + typedef bsl::unordered_set GuidsSet; + typedef bsl::unordered_map + QueueRecordsMap; + typedef bsl::unordered_map + QueueAppRecordsMap; + typedef bsl::unordered_map + OtherRecordsMap; + typedef bsl::vector > + AppsData; // Set of message guids. typedef bsl::map QueueOpCountsMap; @@ -903,6 +913,31 @@ class SummaryProcessor : public SearchResult { GuidsSet d_partiallyConfirmedGuids; // Set of message guids. Messages stored here have at leas one confirmation // message and no delete message associated with them. + + bsls::Types::Uint64 d_totalRecordsCount; + // The total number of records. + + QueueRecordsMap d_queueRecordsMap; + // Map containing counts per record type which are not processed by default + QueueAppRecordsMap d_queueAppRecordsMap; + // Map containing counts of records per Queue/App + + QueueRecordsMap d_queueQueueOpRecordsMap; + // Map containing Queue Op records counts per queue + QueueRecordsMap d_queueMessageRecordsMap; + // Map containing Message records counts per queue + QueueRecordsMap d_queueConfirmRecordsMap; + // Map containing Confirm records counts per queue + QueueRecordsMap d_queueDeleteRecordsMap; + // Map containing Delete records counts per queue + + const QueueMap& d_queueMap; + // Reference to 'QueueMap' instance. + + bsls::Types::Uint64 d_minRecordsPerQueue; + // Minimum number of records for the queue to be displayed its detailed + // info + bslma::Allocator* d_allocator_p; // Pointer to allocator that is used inside the class. @@ -916,6 +951,8 @@ class SummaryProcessor : public SearchResult { mqbs::JournalFileIterator* journalFile_p, mqbs::DataFileIterator* dataFile_p, const Parameters::ProcessRecordTypes& processRecordTypes, + const QueueMap& queueMap, + bsls::Types::Uint64 minRecordsPerQueue, bslma::Allocator* allocator); // MANIPULATORS diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresultfactory.cpp b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresultfactory.cpp index 795777dab7..3d4a20c64b 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_searchresultfactory.cpp +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_searchresultfactory.cpp @@ -122,6 +122,8 @@ bsl::shared_ptr SearchResultFactory::createSearchResult( fileManager->journalFileIterator(), fileManager->dataFileIterator(), params->d_processRecordTypes, + params->d_queueMap, + params->d_minRecordsPerQueue, alloc), alloc); }