Skip to content

Commit

Permalink
Introduce Printer component
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Pryakhin <[email protected]>
  • Loading branch information
waldgange committed Feb 3, 2025
1 parent 0e8bbf2 commit ae4bdbe
Show file tree
Hide file tree
Showing 30 changed files with 3,245 additions and 1,627 deletions.
6 changes: 6 additions & 0 deletions src/applications/bmqstoragetool/bmqstoragetool.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ static bool parseArgs(CommandLineArguments& arguments,
balcl::TypeInfo(&arguments.d_cslFile,
CommandLineArguments::isValidFileName),
balcl::OccurrenceInfo::e_OPTIONAL},
{"print-mode",
"print mode",
"can be one of the following {human|json-prety|json-line}",
balcl::TypeInfo(&arguments.d_printMode,
CommandLineArguments::isValidPrintMode),
balcl::OccurrenceInfo("human")},
{"guid",
"guid",
"message guid",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,28 @@ CommandProcessorFactory::createCommandProcessor(

bslma::Allocator* alloc = bslma::Default::allocator(allocator);

// Create printer
bsl::shared_ptr<Printer> printer = createPrinter(params->d_printMode,
ostream,
allocator);

// Create payload dumper
bslma::ManagedPtr<PayloadDumper> payloadDumper;
if (params->d_dumpPayload) {
payloadDumper.load(new (*alloc)
PayloadDumper(ostream,
fileManager->dataFileIterator(),
params->d_dumpLimit,
alloc),
alloc);
}

// Create searchResult for given 'params'.
bsl::shared_ptr<SearchResult> searchResult =
SearchResultFactory::createSearchResult(params,
fileManager,
ostream,
printer,
payloadDumper,
alloc);
// Create commandProcessor.
bslma::ManagedPtr<CommandProcessor> commandProcessor(
Expand Down
8 changes: 5 additions & 3 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filters.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2014-2023 Bloomberg Finance L.P.
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -44,8 +44,10 @@ Filters::Filters(const bsl::vector<bsl::string>& queueKeys,
mqbu::StorageKey key;
bsl::vector<bsl::string>::const_iterator uriIt = queueUris.cbegin();
for (; uriIt != queueUris.cend(); ++uriIt) {
if (queueMap.findKeyByUri(&key, *uriIt)) {
d_queueKeys.insert(key);
bsl::optional<mqbu::StorageKey> key = queueMap.findKeyByUri(
*uriIt);
if (key.has_value()) {
d_queueKeys.insert(key.value());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace m_bmqstoragetool {
/// invalidated. Behavior is undefined unless last call to `nextRecord` or
/// 'advance' returned '1' and the iterator points to a valid record.
int moveToLowerBound(mqbs::JournalFileIterator* jit,
LessThanLowerBoundFn& lessThanLowerBoundFn)
MoreThanLowerBoundFn& moreThanLowerBoundFn)
{
// PRECONDITIONS
BSLS_ASSERT(jit);
Expand All @@ -60,7 +60,7 @@ int moveToLowerBound(mqbs::JournalFileIterator* jit,
bsls::Types::Uint64 left = 0;
bsls::Types::Uint64 right = recordsNumber;
while (right > left + 1) {
const bool goBackwards = lessThanLowerBoundFn(jit, true);
const bool goBackwards = moreThanLowerBoundFn(jit);
if (goBackwards != jit->isReverseMode()) {
jit->flipDirection();
}
Expand All @@ -76,6 +76,9 @@ int moveToLowerBound(mqbs::JournalFileIterator* jit,
}
left = jit->recordIndex();
}
static int i = 0;
bsl::cout << (++i) << " advance(" << bsl::max((right - left) / 2, 1ULL)
<< ")\n";
rc = jit->advance(bsl::max((right - left) / 2, 1ULL));
if (rc != 1) {
return rc; // RETURN
Expand All @@ -85,7 +88,7 @@ int moveToLowerBound(mqbs::JournalFileIterator* jit,
jit->flipDirection();
}
// Move to next record if value <= lower bound) {
if (lessThanLowerBoundFn(jit) || !lessThanLowerBoundFn(jit, true)) {
if (!moreThanLowerBoundFn(jit)) {
if (jit->recordIndex() < recordsNumber) {
rc = jit->nextRecord();
}
Expand All @@ -100,41 +103,37 @@ int moveToLowerBound(mqbs::JournalFileIterator* jit,
}

// ==========================
// class LessThanLowerBoundFn
// class MoreThanLowerBoundFn
// ==========================

LessThanLowerBoundFn::LessThanLowerBoundFn(const Parameters::Range& range)
MoreThanLowerBoundFn::MoreThanLowerBoundFn(const Parameters::Range& range)
: d_range(range)
{
// One of the `greater than` range values must be set
BSLS_ASSERT(d_range.d_timestampGt || d_range.d_offsetGt ||
d_range.d_seqNumGt);
}

bool LessThanLowerBoundFn::operator()(const mqbs::JournalFileIterator* jit,
bool inverseOrder) const
bool MoreThanLowerBoundFn::operator()(
const mqbs::JournalFileIterator* jit) const
{
// PRECONDITIONS
BSLS_ASSERT(jit);

if (d_range.d_timestampGt) {
const bsl::uint64_t timestamp = jit->recordHeader().timestamp();
return inverseOrder ? *d_range.d_timestampGt < timestamp
: timestamp < *d_range.d_timestampGt; // RETURN
return d_range.d_timestampGt.value() <
jit->recordHeader().timestamp(); // RETURN
}

if (d_range.d_offsetGt) {
const bsl::uint64_t offset = jit->recordOffset();
return inverseOrder ? *d_range.d_offsetGt < offset
: offset < *d_range.d_offsetGt; // RETURN
return d_range.d_offsetGt.value() < jit->recordOffset(); // RETURN
}

if (d_range.d_seqNumGt) {
const CompositeSequenceNumber seqNum(
jit->recordHeader().primaryLeaseId(),
jit->recordHeader().sequenceNumber());
return inverseOrder ? *d_range.d_seqNumGt < seqNum
: seqNum < *d_range.d_seqNumGt; // RETURN
return d_range.d_seqNumGt.value() < seqNum; // RETURN
}

return false;
Expand Down Expand Up @@ -188,8 +187,8 @@ void JournalFileProcessor::process()
}

if (needMoveToLowerBound) {
LessThanLowerBoundFn lessThanLowerBoundFn(d_parameters->d_range);
rc = moveToLowerBound(iter, lessThanLowerBoundFn);
MoreThanLowerBoundFn moreThanLowerBoundFn(d_parameters->d_range);
rc = moveToLowerBound(iter, moreThanLowerBoundFn);
if (rc == 0) {
stopSearch = true;
continue; // CONTINUE
Expand All @@ -207,7 +206,6 @@ void JournalFileProcessor::process()
// MessageRecord
if (iter->recordType() == mqbs::RecordType::e_MESSAGE) {
const mqbs::MessageRecord& record = iter->asMessageRecord();

// Apply filters
if (filters.apply(iter->recordHeader(),
iter->recordOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,31 @@ namespace BloombergLP {
namespace m_bmqstoragetool {

// ==========================
// class LessThanLowerBoundFn
// class MoreThanLowerBoundFn
// ==========================

// Functor to perform comparison with range lower bound.
class LessThanLowerBoundFn {
class MoreThanLowerBoundFn {
private:
// PRIVATE DATA
const Parameters::Range& d_range;

public:
// CREATORS

explicit LessThanLowerBoundFn(const Parameters::Range& range);
explicit MoreThanLowerBoundFn(const Parameters::Range& range);

// ACCESSORS

bool operator()(const mqbs::JournalFileIterator* jit,
bool inverseOrder = false) const;
bool operator()(const mqbs::JournalFileIterator* jit) const;
// Return true if value specified by `jit` is less than range lower bound
// when the specified `inverseOrder` is false, false otherwise.
// Return true if range lower bound is less than value specified by `jit`
// when the specified `inverseOrder` is true, false otherwise.
};

int moveToLowerBound(mqbs::JournalFileIterator* jit,
LessThanLowerBoundFn& lessThanLowerBoundFn);
MoreThanLowerBoundFn& lessThanLowerBoundFn);

// ==========================
// class JournalFileProcessor
Expand Down
Loading

0 comments on commit ae4bdbe

Please sign in to comment.