Skip to content

Commit

Permalink
Ignore MQTT messages with the retain flag set
Browse files Browse the repository at this point in the history
  • Loading branch information
chriadam committed Jun 7, 2023
1 parent 51e8ff0 commit d9d618c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
4 changes: 4 additions & 0 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#include <veutil/qt/ve_qitem.hpp>

#include <QMqttClient>
#include <QMqttSubscription>

#include <QPointer>
#include <QHostAddress>
#include <QTimer>
#include <QSet>
Expand Down Expand Up @@ -97,6 +99,7 @@ private Q_SLOTS:
void onErrorChanged(QMqttClient::ClientError error);
void onStateChanged(QMqttClient::ClientState state);
void onMessageReceived(const QByteArray &message, const QMqttTopicName &topic);
void onSubscriptionMessageReceived(const QMqttMessage &message);
void doKeepAlive();

private:
Expand All @@ -108,6 +111,7 @@ private Q_SLOTS:
QTimer *mReadyStateTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
QPointer<QMqttSubscription> mMqttSubscription;
#ifdef MQTT_WEBSOCKETS_ENABLED
WebSocketDevice *mWebSocket = nullptr;
#endif // MQTT_WEBSOCKETS_ENABLED
Expand Down
73 changes: 47 additions & 26 deletions src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ void VeQItemMqttProducer::onConnected()
if (mPortalId.isEmpty()) {
mMqttConnection->subscribe(QStringLiteral("N/+/system/0/Serial"));
} else {
mMqttConnection->subscribe(QStringLiteral("N/%1/#").arg(mPortalId));
QObject::disconnect(mMqttConnection, &QMqttClient::messageReceived,
this, &VeQItemMqttProducer::onMessageReceived);
mMqttSubscription = mMqttConnection->subscribe(QStringLiteral("N/%1/#").arg(mPortalId));
QObject::connect(mMqttSubscription.data(), &QMqttSubscription::messageReceived,
this, &VeQItemMqttProducer::onSubscriptionMessageReceived, Qt::UniqueConnection);
mMqttConnection->publish(QMqttTopicName(QStringLiteral("R/%1/system/0/Serial").arg(mPortalId)), QByteArray());
doKeepAlive();
}
Expand All @@ -254,6 +258,10 @@ void VeQItemMqttProducer::onDisconnected()
mKeepAliveTimer->stop();
mReadyStateTimer->stop();
mReceivedMessage = false;
if (mMqttSubscription.data()) {
QObject::disconnect(mMqttSubscription.data(), &QMqttSubscription::messageReceived,
this, &VeQItemMqttProducer::onSubscriptionMessageReceived);
}

if (mAutoReconnectAttemptCounter < mAutoReconnectMaxAttempts) {
// Attempt to reconnect. We use a staggered exponential backoff interval.
Expand Down Expand Up @@ -333,40 +341,53 @@ void VeQItemMqttProducer::onMessageReceived(const QByteArray &message, const QMq
if (parts.length() == 5 && parts[1] == payload.value(QStringLiteral("value")).toString()) {
setPortalId(parts[1]);
mMqttConnection->unsubscribe(QStringLiteral("N/+/system/0/Serial"));
mMqttConnection->subscribe(QStringLiteral("N/%1/#").arg(mPortalId));
QObject::disconnect(mMqttConnection, &QMqttClient::messageReceived,
this, &VeQItemMqttProducer::onMessageReceived);
mMqttSubscription = mMqttConnection->subscribe(QStringLiteral("N/%1/#").arg(mPortalId));
QObject::connect(mMqttSubscription.data(), &QMqttSubscription::messageReceived,
this, &VeQItemMqttProducer::onSubscriptionMessageReceived, Qt::UniqueConnection);
doKeepAlive();
} else {
qWarning() << "VeQItemMqttProducer::onMessageReceived(): portal id mismatch: "
<< topicName << " -> " << QString::fromUtf8(message);
}
}
} else {
const QString notificationPrefix = QStringLiteral("N/%1").arg(mPortalId);
if (topicName.startsWith(notificationPrefix)) {
const QString keepaliveTopic = notificationPrefix + QStringLiteral("/keepalive");
if (topicName.compare(keepaliveTopic, Qt::CaseInsensitive) == 0) {
// ignore keepalive topic.
} else {
// we have a topic message which we need to expose via VeQItem.
const QString path = topicName.mid(notificationPrefix.size() + 1);
parseMessage(path, message);
}
}
}

void VeQItemMqttProducer::onSubscriptionMessageReceived(const QMqttMessage &message)
{
const QString topicName(message.topic().name());

const QString notificationPrefix = QStringLiteral("N/%1").arg(mPortalId);
if (topicName.startsWith(notificationPrefix)) {
const QString keepaliveTopic = notificationPrefix + QStringLiteral("/keepalive");
if (topicName.compare(keepaliveTopic, Qt::CaseInsensitive) == 0) {
// ignore keepalive topic.
} else if (message.retain()) {
// ignore retained messages, as for internet brokers (VRM)
// nothing will "unpublish" the topic for a device which goes offline.
// see issue #313 in gui-v2.
} else {
// we have a topic message which we need to expose via VeQItem.
const QString path = topicName.mid(notificationPrefix.size() + 1);
parseMessage(path, message.payload());
}
}

// Once we have received a message, perform KeepAlive.
if (!mReceivedMessage) {
mReceivedMessage = true;
doKeepAlive();
} else if (connectionState() == VeQItemMqttProducer::Initializing) {
// We will receive a flurry of messages upon initial connection.
// Once they subside we should transition to Ready state.
if (!mReadyStateTimer->isActive()) {
// transition to Ready state after 5 seconds
// even if we are still receiving initial messages.
mReadyStateFallbackTimer->start();
}
mReadyStateTimer->start(); // restart the timer.
// Once we have received a message, perform KeepAlive.
if (!mReceivedMessage) {
mReceivedMessage = true;
doKeepAlive();
} else if (connectionState() == VeQItemMqttProducer::Initializing) {
// We will receive a flurry of messages upon initial connection.
// Once they subside we should transition to Ready state.
if (!mReadyStateTimer->isActive()) {
// transition to Ready state after 5 seconds
// even if we are still receiving initial messages.
mReadyStateFallbackTimer->start();
}
mReadyStateTimer->start(); // restart the timer.
}
}

Expand Down

0 comments on commit d9d618c

Please sign in to comment.