Skip to content

Commit

Permalink
MQTT: transition to Ready state after receiving "finished" message
Browse files Browse the repository at this point in the history
FlashMQ broker now sends a "full_publish_completed" message once
it has sent all of the initial-state messages.

So, there is no longer any need to use a timing heuristic to
determine when all initial-state messages have been received.
  • Loading branch information
chriadam authored and blammit committed Jan 8, 2024
1 parent f5a3dd7 commit e731a43
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 33 deletions.
1 change: 0 additions & 1 deletion inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ private Q_SLOTS:

QTimer *mKeepAliveTimer;
QTimer *mHeartBeatTimer;
QTimer *mReadyStateTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
QPointer<QMqttSubscription> mMqttSubscription;
Expand Down
51 changes: 19 additions & 32 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ VeQItemMqttProducer::VeQItemMqttProducer(
: VeQItemProducer(root, id, parent),
mKeepAliveTimer(new QTimer(this)),
mHeartBeatTimer(new QTimer(this)),
mReadyStateTimer(new QTimer(this)),
mReadyStateFallbackTimer(new QTimer(this)),
mMqttConnection(nullptr),
mPort(0),
Expand Down Expand Up @@ -123,21 +122,10 @@ VeQItemMqttProducer::VeQItemMqttProducer(
}
});

mReadyStateTimer->setSingleShot(true);
mReadyStateTimer->setInterval(1000);
connect(mReadyStateTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Initializing) {
setConnectionState(Ready);
}
});

mReadyStateFallbackTimer->setSingleShot(true);
mReadyStateFallbackTimer->setInterval(4000);
mReadyStateFallbackTimer->setInterval(5000);
connect(mReadyStateFallbackTimer, &QTimer::timeout,
this, [this] {
mReadyStateTimer->stop();
if (connectionState() == Initializing) {
setConnectionState(Ready);
}
Expand Down Expand Up @@ -312,7 +300,7 @@ void VeQItemMqttProducer::onDisconnected()
mMissedHeartbeats = 0;
mKeepAliveTimer->stop();
mHeartBeatTimer->stop();
mReadyStateTimer->stop();
mReadyStateFallbackTimer->stop();
mReceivedMessage = false;
if (mMqttSubscription.data()) {
mMqttSubscription->unsubscribe();
Expand Down Expand Up @@ -414,14 +402,30 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq

void VeQItemMqttProducer::onSubscriptionMessageReceived(const QMqttMessage &message)
{
const QString topicName(message.topic().name());
// Once we have received a message, transition to Initializing state.
// Remain in this state while we wait for the flurry of initial messages to end.
// The broker should tell us (with "full_publish_completed" topic message) when
// we can transition to "Ready" state.
if (!mReceivedMessage) {
mReceivedMessage = true;
setConnectionState(VeQItemMqttProducer::Initializing);
// we will transition to Ready state after some time
// even if we are still receiving initial messages,
// just in case the broker forgets to send us the Ready message.
mReadyStateFallbackTimer->start();
}

const QString topicName(message.topic().name());
const QString notificationPrefix = QStringLiteral("N/%1").arg(mPortalId);
if (topicName.startsWith(notificationPrefix)) {
const QString keepaliveTopic = notificationPrefix + QStringLiteral("/keepalive");
const QString heartbeatTopic = notificationPrefix + QStringLiteral("/heartbeat");
const QString readyTopic = notificationPrefix + QStringLiteral("/full_publish_completed");
if (topicName.compare(keepaliveTopic, Qt::CaseInsensitive) == 0) {
// ignore keepalive topic.
} else if (topicName.compare(readyTopic, Qt::CaseInsensitive) == 0
&& connectionState() == VeQItemMqttProducer::Initializing) {
setConnectionState(VeQItemMqttProducer::Ready);
} else if (topicName.compare(heartbeatTopic, Qt::CaseInsensitive) == 0) {
// (re)start our heartbeat timer.
mHeartBeatTimer->start();
Expand All @@ -433,23 +437,6 @@ void VeQItemMqttProducer::onSubscriptionMessageReceived(const QMqttMessage &mess
parseMessage(path, message.payload());
}
}

// Once we have received a message, transition to Initializing state.
// Remain in this state while we wait for the flurry of initial messages to end.
if (!mReceivedMessage) {
mReceivedMessage = true;
setConnectionState(VeQItemMqttProducer::Initializing);
}
if (connectionState() == VeQItemMqttProducer::Initializing) {
// We will receive a flurry of messages upon initial connection.
// Once they subside we should transition to Ready state.
if (!mReadyStateTimer->isActive()) {
// transition to Ready state after 10 seconds
// even if we are still receiving initial messages.
mReadyStateFallbackTimer->start();
}
mReadyStateTimer->start(); // restart the timer.
}
}

void VeQItemMqttProducer::parseMessage(const QString &path, const QByteArray &message)
Expand Down

0 comments on commit e731a43

Please sign in to comment.