Skip to content

Commit

Permalink
Not delivering old data to new Apps
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Jan 27, 2025
1 parent c18ff5c commit 3543f04
Show file tree
Hide file tree
Showing 40 changed files with 1,259 additions and 423 deletions.
22 changes: 14 additions & 8 deletions src/groups/bmq/bmqc/bmqc_orderedhashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
#include <bsl_stdexcept.h>
#include <bsl_utility.h>
#include <bslalg_scalarprimitives.h>
#include <bslalg_typetraithasstliterators.h>
#include <bslma_allocator.h>
#include <bslma_default.h>
#include <bslma_usesbslmaallocator.h>
Expand Down Expand Up @@ -683,8 +684,7 @@ class OrderedHashMap {
/// is `true` if a new value was inserted, and `false` if the value was
/// already present. Note that this method requires that the (template
/// parameter) types `KEY` and `VALUE` both be "copy-constructible".
template <class SOURCE_TYPE>
bsl::pair<iterator, bool> insert(const SOURCE_TYPE& value);
bsl::pair<iterator, bool> insert(const VALUE_TYPE& value);

/// Insert the specified `value` into this container at the beginning of
/// the underlying sequential list if the key (the `first` element) of a
Expand All @@ -698,8 +698,7 @@ class OrderedHashMap {
/// inserted, and `false` if the value was already present. Note that
/// this method requires that the (template parameter) types `KEY` and
/// `VALUE` both be "copy-constructible".
template <class SOURCE_TYPE>
bsl::pair<iterator, bool> rinsert(const SOURCE_TYPE& value);
bsl::pair<iterator, bool> rinsert(const VALUE_TYPE& value);

// void reserve(int numElements);
// Increase the number of buckets of this set to a quantity such that
Expand Down Expand Up @@ -1475,11 +1474,10 @@ OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::find(const key_type& key)
}

template <class KEY, class VALUE, class HASH, class VALUE_TYPE>
template <class SOURCE_TYPE>
inline bsl::pair<
typename OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::iterator,
bool>
OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::insert(const SOURCE_TYPE& value)
OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::insert(const VALUE_TYPE& value)
{
Bucket* bucket = getBucketForKey(get_key(value));
Link* foundLink = 0;
Expand Down Expand Up @@ -1507,11 +1505,10 @@ OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::insert(const SOURCE_TYPE& value)
}

template <class KEY, class VALUE, class HASH, class VALUE_TYPE>
template <class SOURCE_TYPE>
inline bsl::pair<
typename OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::iterator,
bool>
OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::rinsert(const SOURCE_TYPE& value)
OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::rinsert(const VALUE_TYPE& value)
{
Bucket* bucket = getBucketForKey(get_key(value));
Link* foundLink = 0;
Expand Down Expand Up @@ -1629,6 +1626,15 @@ OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE>::get_allocator() const
}

} // close package namespace

namespace bslalg {

template <class KEY, class VALUE, class HASH, class VALUE_TYPE>
struct HasStlIterators<bmqc::OrderedHashMap<KEY, VALUE, HASH, VALUE_TYPE> >
: bsl::true_type {};

} // close namespace bslalg

} // close enterprise namespace

#endif
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4261,8 +4261,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
->domain());
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
for (AppInfosCIter cit = removedAppIds.begin();
cit != removedAppIds.end();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds,
}
bsl::vector<bsl::string>& authorizedAppIds = queueMode.fanout().appIDs();

for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
for (AppInfosCIter cit = addedAppIds.begin(); cit != addedAppIds.end();
++cit) {
if (bsl::find(authorizedAppIds.begin(),
authorizedAppIds.end(),
Expand All @@ -260,8 +260,7 @@ void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds,
authorizedAppIds.push_back(cit->first);
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
for (AppInfosCIter cit = removedAppIds.begin(); cit != removedAppIds.end();
++cit) {
const bsl::vector<bsl::string>::const_iterator it = bsl::find(
authorizedAppIds.begin(),
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
typedef QueueMap::iterator QueueMapIter;
typedef QueueMap::const_iterator QueueMapCIter;

typedef mqbi::Storage::AppInfos AppInfos;
typedef mqbc::ClusterState::AppInfos AppInfos;
typedef AppInfos::const_iterator AppInfosCIter;

enum DomainState {
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ BMQTST_TEST_F(Test, putAliveIdleSendAliveTwoSubstreams)
{
bmqtst::ScopedLogObserver logObserver(ball::Severity::INFO,
bmqtst::TestHelperUtil::allocator());
size_t expectedLogRecords = 0U;

const bsls::Types::Int64 k_MAX_IDLE_TIME = 10;

Expand All @@ -440,6 +439,8 @@ BMQTST_TEST_F(Test, putAliveIdleSendAliveTwoSubstreams)
d_storage.addVirtualStorage(errorDescription, id1, key1);
d_storage.addVirtualStorage(errorDescription, id2, key2);

size_t expectedLogRecords = logObserver.records().size();

d_monitor.registerSubStream(id1);
d_monitor.registerSubStream(id2);

Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const

size_t i = 0;
for (mqbi::Storage::AppInfos::const_iterator cit =
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
appIdKeyPairs.begin();
cit != appIdKeyPairs.end();
++cit, ++i) {
virtualStorages[i].appId() = cit->first;
os.reset();
Expand Down
29 changes: 15 additions & 14 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,11 @@ mqbi::QueueHandle* RelayQueueEngine::getHandle(
BSLS_ASSERT_SAFE(app);

if (!app->isAuthorized()) {
app->authorize();
if (app->authorize()) {
BALL_LOG_INFO << "Queue '" << d_queueState_p->uri()
<< "' authorized App '" << downstreamInfo.appId()
<< "' with ordinal " << app->ordinal() << ".";
}
}

queueHandle->registerSubStream(
Expand Down Expand Up @@ -1613,8 +1617,8 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const
subStreams.reserve(appIdKeyPairs.size());

for (mqbi::Storage::AppInfos::const_iterator cit =
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
appIdKeyPairs.begin();
cit != appIdKeyPairs.end();
++cit) {
subStreams.resize(subStreams.size() + 1);
mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back();
Expand Down Expand Up @@ -1888,10 +1892,10 @@ bool RelayQueueEngine::checkForDuplicate(const App_State* app,
if (d_queueState_p->domain()->cluster()->isRemote()) {
d_realStorageIter_mp->reset(msgGUID);
if (!d_realStorageIter_mp->atEnd()) {
mqbi::AppMessage& appState = d_realStorageIter_mp->appMessageState(
app->ordinal());
const mqbi::AppMessage& appView =
d_realStorageIter_mp->appMessageView(app->ordinal());

if (appState.isPushing()) {
if (appView.isPushing()) {
BMQ_LOGTHROTTLE_INFO()
<< "Remote queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id() << ", App '"
Expand Down Expand Up @@ -1931,16 +1935,13 @@ void RelayQueueEngine::storePush(
result != mqbi::StorageResult::e_SUCCESS)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (result != mqbi::StorageResult::e_GUID_NOT_UNIQUE ||
isOutOfOrder) {
BMQ_LOGTHROTTLE_INFO()
<< d_queueState_p->uri() << " failed to store GUID ["
<< msgGUID << "], result = " << result;
}
// A redelivery PUSH for one App in the presence of another App
// can result in 'e_GUID_NOT_UNIQUE'.
BMQ_LOGTHROTTLE_INFO()
<< d_queueState_p->uri() << " failed to store GUID ["
<< msgGUID << "], result = " << result;
}
else {
BSLS_ASSERT_SAFE(dataStreamMessage);

// Reusing previously cached ordinals.
for (bmqp::Protocol::SubQueueInfosArray::const_iterator cit =
subscriptions.begin();
Expand Down
5 changes: 3 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ void RemoteQueue::onHandleReleased(
d_state_p->storage()->hasVirtualStorage(appId,
&appKey);
BSLS_ASSERT_SAFE(hasVirtualStorage);
d_state_p->storage()->removeVirtualStorage(appKey);
d_state_p->storage()->removeVirtualStorage(appKey, false);

(void)
hasVirtualStorage; // Compiler happiness in opt build
Expand All @@ -749,7 +749,8 @@ void RemoteQueue::onHandleReleased(
d_state_p->handleParameters().flags())) {
// Lost last reader in non-fanout mode
d_state_p->storage()->removeVirtualStorage(
mqbi::QueueEngine::k_DEFAULT_APP_KEY);
mqbi::QueueEngine::k_DEFAULT_APP_KEY,
false);
d_state_p->storage()->removeAll(mqbu::StorageKey::k_NULL_KEY);
}
}
Expand Down
54 changes: 28 additions & 26 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1865,8 +1865,8 @@ void RootQueueEngine::afterAppIdRegistered(
return; // RETURN;
}

for (mqbi::Storage::AppInfos::const_iterator cit = addedAppIds.cbegin();
cit != addedAppIds.cend();
for (mqbi::Storage::AppInfos::const_iterator cit = addedAppIds.begin();
cit != addedAppIds.end();
++cit) {
// We need to handle 2 scenarios here: a consumer with the specified
// 'appId' may have already opened the queue, or otherwise.
Expand Down Expand Up @@ -1915,11 +1915,10 @@ void RootQueueEngine::afterAppIdUnregistered(
return; // RETURN
}

for (mqbi::Storage::AppInfos::const_iterator cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
for (mqbi::Storage::AppInfos::const_iterator cit = removedAppIds.begin();
cit != removedAppIds.end();
++cit) {
const bsl::string& appId = cit->first;
const mqbu::StorageKey& appKey = cit->second;
const bsl::string& appId = cit->first;

Apps::iterator iter = d_apps.find(appId);
BSLS_ASSERT_SAFE(iter != d_apps.end());
Expand All @@ -1929,19 +1928,8 @@ void RootQueueEngine::afterAppIdUnregistered(
// we still keep the app but invalidate the authorization
iter->second->unauthorize();

// Do a best effort to confirm the messages and remove the storage. If
// either fails, just log the condition.

const mqbi::StorageResult::Enum rc =
d_queueState_p->storage()->removeAll(appKey);
if (rc != mqbi::StorageResult::e_SUCCESS) {
BALL_LOG_WARN << "#QUEUE_APPID_UNREGISTER_FAILURE "
<< "Failed to unregister appId '" << appId
<< "', appKey '" << appKey << "' of queue '"
<< d_queueState_p->queue()->description()
<< "' [reason: " << mqbi::StorageResult::toAscii(rc)
<< "]";
}
// There is no need to purge the storage. 'removeVirtualStorage' will
// do that.

d_consumptionMonitor.unregisterSubStream(appId);
}
Expand All @@ -1966,17 +1954,31 @@ void RootQueueEngine::registerStorage(const bsl::string& appId,
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

BALL_LOG_INFO << "Local queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id()
<< ") now has storage: [App Id: " << appId
<< ", key: " << appKey << ", ordinal: " << appOrdinal << "]";

Apps::iterator iter = d_apps.find(appId);
BSLS_ASSERT_SAFE(iter != d_apps.end());

iter->second->authorize(appKey, appOrdinal);
AppState& app = *iter->second;

if (app.isAuthorized()) {
BSLS_ASSERT_SAFE(appKey == app.appKey());

BALL_LOG_INFO << "Local queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id()
<< ") changing [App Id: " << appId << ", key: " << appKey
<< ", ordinal: " << app.ordinal()
<< "] to [ordinal: " << appOrdinal << "]";
}
else {
BALL_LOG_INFO << "Local queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id()
<< ") now has storage: [App Id: " << appId
<< ", key: " << appKey << ", ordinal: " << appOrdinal
<< "]";

d_consumptionMonitor.registerSubStream(appId);
d_consumptionMonitor.registerSubStream(appId);
}

iter->second->authorize(appKey, appOrdinal);
}

void RootQueueEngine::unregisterStorage(
Expand Down
25 changes: 22 additions & 3 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ namespace mqbc {
// class ClusterStateQueueInfo
// ---------------------------

bool operator==(const ClusterStateQueueInfo::AppInfos& lhs,
const ClusterStateQueueInfo::AppInfos& rhs)
{
// This ignores the order

if (lhs.size() != rhs.size()) {
return false;
}
typedef mqbi::Storage::AppInfos::const_iterator Iter;

for (Iter iter = lhs.begin(); iter != lhs.end(); ++iter) {
if (rhs.count(iter->first) != 1) {
return false;
}
}

return true;
}

bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream,
int level,
int spacesPerLevel) const
Expand Down Expand Up @@ -493,8 +512,8 @@ int ClusterState::updateQueue(const bmqt::Uri& uri,
}

AppInfos& appIdInfos = iter->second->appInfos();
for (AppInfosCIter citer = addedAppIds.cbegin();
citer != addedAppIds.cend();
for (AppInfosCIter citer = addedAppIds.begin();
citer != addedAppIds.end();
++citer) {
if (!appIdInfos.insert(*citer).second) {
return rc_APPID_ALREADY_EXISTS; // RETURN
Expand All @@ -505,7 +524,7 @@ int ClusterState::updateQueue(const bmqt::Uri& uri,
citer != removedAppIds.end();
++citer) {
const AppInfosCIter appIdInfoCIter = appIdInfos.find(citer->first);
if (appIdInfoCIter == appIdInfos.cend()) {
if (appIdInfoCIter == appIdInfos.end()) {
return rc_APPID_NOT_FOUND; // RETURN
}
appIdInfos.erase(appIdInfoCIter);
Expand Down
6 changes: 6 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ bool operator==(const ClusterStateQueueInfo& lhs,
bool operator!=(const ClusterStateQueueInfo& lhs,
const ClusterStateQueueInfo& rhs);

bool operator==(const ClusterStateQueueInfo::AppInfos& lhs,
const ClusterStateQueueInfo::AppInfos& rhs);

bool operator!=(const ClusterStateQueueInfo::AppInfos& lhs,
const ClusterStateQueueInfo::AppInfos& rhs);

// ==========================
// class ClusterStateObserver
// ==========================
Expand Down
Loading

0 comments on commit 3543f04

Please sign in to comment.