diff --git a/inc/veutil/qt/ve_qitems_mqtt.hpp b/inc/veutil/qt/ve_qitems_mqtt.hpp index 6b62039..ee7edc0 100755 --- a/inc/veutil/qt/ve_qitems_mqtt.hpp +++ b/inc/veutil/qt/ve_qitems_mqtt.hpp @@ -45,6 +45,7 @@ class VeQItemMqtt : public VeQItem class VeQItemMqttProducer : public VeQItemProducer { Q_OBJECT + Q_PROPERTY(HeartbeatState heartbeatState READ heartbeatState NOTIFY heartbeatStateChanged) Q_PROPERTY(ConnectionState connectionState READ connectionState NOTIFY connectionStateChanged) Q_PROPERTY(QMqttClient::ClientError error READ error NOTIFY errorChanged) Q_PROPERTY(QString portalId READ portalId WRITE setPortalId NOTIFY portalIdChanged) @@ -63,6 +64,16 @@ class VeQItemMqttProducer : public VeQItemProducer }; Q_ENUM(ConnectionState) + // The heartbeat state tracks whether the CerboGX is actively alive + // and writing heartbeat timestamp updates to the MQTT broker. + // We might be actively connected to the VRM broker, but the actual + // device itself might have gone offline and so the data is "not live". + enum HeartbeatState { + HeartbeatActive, + HeartbeatInactive, + }; + Q_ENUM(HeartbeatState) + VeQItemMqttProducer(VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent = nullptr); VeQItem *createItem() override; @@ -79,6 +90,7 @@ class VeQItemMqttProducer : public VeQItemProducer void setCredentials(const QString &username, const QString &password); bool publishValue(const QString &uid, const QVariant &value); bool requestValue(const QString &uid); + HeartbeatState heartbeatState() const; ConnectionState connectionState() const; QMqttClient::ClientError error() const; @@ -86,6 +98,7 @@ class VeQItemMqttProducer : public VeQItemProducer void setPortalId(const QString &portalId); Q_SIGNALS: + void heartbeatStateChanged(); void connectionStateChanged(); void errorChanged(); void portalIdChanged(); @@ -106,11 +119,13 @@ private Q_SLOTS: void doKeepAlive(bool suppressRepublish = false); private: + void setHeartbeatState(HeartbeatState heartbeatState); void setConnectionState(ConnectionState connectionState); void setError(QMqttClient::ClientError error); void parseMessage(const QString &path, const QByteArray &message); QTimer *mKeepAliveTimer; + QTimer *mHeartBeatTimer; QTimer *mReadyStateTimer; QTimer *mReadyStateFallbackTimer; QMqttClient *mMqttConnection; @@ -123,12 +138,14 @@ private Q_SLOTS: QUrl mUrl; QString mHostName; int mPort; + HeartbeatState mHeartbeatState; ConnectionState mConnectionState; const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 }; quint16 mAutoReconnectAttemptCounter; const quint16 mAutoReconnectMaxAttempts; QMqttClient::ClientError mError; QMqttClient::ProtocolVersion mProtocolVersion; + int mMissedHeartbeats; bool mReceivedMessage; bool mIsVrmBroker; }; diff --git a/src/qt/ve_qitems_mqtt.cpp b/src/qt/ve_qitems_mqtt.cpp index 4eedbd4..5c5c2b1 100755 --- a/src/qt/ve_qitems_mqtt.cpp +++ b/src/qt/ve_qitems_mqtt.cpp @@ -68,15 +68,18 @@ VeQItemMqttProducer::VeQItemMqttProducer( VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent) : VeQItemProducer(root, id, parent), mKeepAliveTimer(new QTimer(this)), + mHeartBeatTimer(new QTimer(this)), mReadyStateTimer(new QTimer(this)), mReadyStateFallbackTimer(new QTimer(this)), mMqttConnection(nullptr), mPort(0), + mHeartbeatState(HeartbeatInactive), mConnectionState(Idle), mAutoReconnectAttemptCounter(0), mAutoReconnectMaxAttempts(sizeof(mReconnectAttemptIntervals)/sizeof(mReconnectAttemptIntervals[0])), mError(QMqttClient::NoError), mProtocolVersion(QMqttClient::MQTT_3_1_1), + mMissedHeartbeats(0), mReceivedMessage(false), mIsVrmBroker(false) { @@ -93,12 +96,28 @@ VeQItemMqttProducer::VeQItemMqttProducer( const quint64 uniqueId = QRandomGenerator::global()->generate64(); mClientId.append(QStringLiteral("%1").arg(uniqueId, 16, 16, QLatin1Char('0'))); + // start the timer once we have sent the first (empty) keepalive after subscribing. mKeepAliveTimer->setInterval(1000 * 30); connect(mKeepAliveTimer, &QTimer::timeout, this, [this] { doKeepAlive(/* suppressRepublish = */ true); }); - // start the timer once we have sent the first (empty) keepalive after subscribing. + + // start the timer once we have received the first heartbeat. + mHeartBeatTimer->setInterval(3500); // heartbeat interval is 3s, but allow extra. + connect(mHeartBeatTimer, &QTimer::timeout, + this, [this] { + ++mMissedHeartbeats; +#ifdef MQTT_WEBSOCKETS_ENABLED + if (mWebSocket && !mWebSocket->isOpen()) { + qWarning() << "Missed heartbeat due to websocket disconnection"; + onDisconnected(); + return; + } +#endif + qWarning() << "Missed heartbeat! Device data is not real-time."; + setHeartbeatState(HeartbeatInactive); + }); mReadyStateTimer->setSingleShot(true); mReadyStateTimer->setInterval(1000); @@ -264,13 +283,16 @@ void VeQItemMqttProducer::onConnected() void VeQItemMqttProducer::onDisconnected() { + setHeartbeatState(HeartbeatInactive); setConnectionState(Disconnected); if (error() == QMqttClient::NoError && mMqttConnection && mMqttConnection->error() != QMqttClient::NoError) { setError(mMqttConnection->error()); } + mMissedHeartbeats = 0; mKeepAliveTimer->stop(); + mHeartBeatTimer->stop(); mReadyStateTimer->stop(); mReceivedMessage = false; if (mMqttSubscription.data()) { @@ -377,8 +399,14 @@ void VeQItemMqttProducer::onSubscriptionMessageReceived(const QMqttMessage &mess const QString notificationPrefix = QStringLiteral("N/%1").arg(mPortalId); if (topicName.startsWith(notificationPrefix)) { const QString keepaliveTopic = notificationPrefix + QStringLiteral("/keepalive"); + const QString heartbeatTopic = notificationPrefix + QStringLiteral("/heartbeat"); if (topicName.compare(keepaliveTopic, Qt::CaseInsensitive) == 0) { // ignore keepalive topic. + } else if (topicName.compare(heartbeatTopic, Qt::CaseInsensitive) == 0) { + // (re)start our heartbeat timer. + mHeartBeatTimer->start(); + mMissedHeartbeats = 0; + setHeartbeatState(HeartbeatActive); } else { // we have a topic message which we need to expose via VeQItem. const QString path = topicName.mid(notificationPrefix.size() + 1); @@ -443,6 +471,19 @@ void VeQItemMqttProducer::doKeepAlive(bool suppressRepublish) } } +VeQItemMqttProducer::HeartbeatState VeQItemMqttProducer::heartbeatState() const +{ + return mHeartbeatState; +} + +void VeQItemMqttProducer::setHeartbeatState(VeQItemMqttProducer::HeartbeatState heartbeatState) +{ + if (mHeartbeatState != heartbeatState) { + mHeartbeatState = heartbeatState; + Q_EMIT heartbeatStateChanged(); + } +} + VeQItemMqttProducer::ConnectionState VeQItemMqttProducer::connectionState() const { return mConnectionState;