Skip to content

Commit

Permalink
Support heartbeat status detection
Browse files Browse the repository at this point in the history
  • Loading branch information
chriadam committed Aug 18, 2023
1 parent e18d682 commit 9ab8023
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
17 changes: 17 additions & 0 deletions inc/veutil/qt/ve_qitems_mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class VeQItemMqtt : public VeQItem
class VeQItemMqttProducer : public VeQItemProducer
{
Q_OBJECT
Q_PROPERTY(HeartbeatState heartbeatState READ heartbeatState NOTIFY heartbeatStateChanged)
Q_PROPERTY(ConnectionState connectionState READ connectionState NOTIFY connectionStateChanged)
Q_PROPERTY(QMqttClient::ClientError error READ error NOTIFY errorChanged)
Q_PROPERTY(QString portalId READ portalId WRITE setPortalId NOTIFY portalIdChanged)
Expand All @@ -63,6 +64,16 @@ class VeQItemMqttProducer : public VeQItemProducer
};
Q_ENUM(ConnectionState)

// The heartbeat state tracks whether the CerboGX is actively alive
// and writing heartbeat timestamp updates to the MQTT broker.
// We might be actively connected to the VRM broker, but the actual
// device itself might have gone offline and so the data is "not live".
enum HeartbeatState {
HeartbeatActive,
HeartbeatInactive,
};
Q_ENUM(HeartbeatState)

VeQItemMqttProducer(VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent = nullptr);

VeQItem *createItem() override;
Expand All @@ -79,13 +90,15 @@ class VeQItemMqttProducer : public VeQItemProducer
void setCredentials(const QString &username, const QString &password);
bool publishValue(const QString &uid, const QVariant &value);
bool requestValue(const QString &uid);
HeartbeatState heartbeatState() const;
ConnectionState connectionState() const;
QMqttClient::ClientError error() const;

QString portalId() const;
void setPortalId(const QString &portalId);

Q_SIGNALS:
void heartbeatStateChanged();
void connectionStateChanged();
void errorChanged();
void portalIdChanged();
Expand All @@ -106,11 +119,13 @@ private Q_SLOTS:
void doKeepAlive(bool suppressRepublish = false);

private:
void setHeartbeatState(HeartbeatState heartbeatState);
void setConnectionState(ConnectionState connectionState);
void setError(QMqttClient::ClientError error);
void parseMessage(const QString &path, const QByteArray &message);

QTimer *mKeepAliveTimer;
QTimer *mHeartBeatTimer;
QTimer *mReadyStateTimer;
QTimer *mReadyStateFallbackTimer;
QMqttClient *mMqttConnection;
Expand All @@ -123,12 +138,14 @@ private Q_SLOTS:
QUrl mUrl;
QString mHostName;
int mPort;
HeartbeatState mHeartbeatState;
ConnectionState mConnectionState;
const int mReconnectAttemptIntervals[6] = { 250, 1000, 2000, 5000, 10000, 30000 };
quint16 mAutoReconnectAttemptCounter;
const quint16 mAutoReconnectMaxAttempts;
QMqttClient::ClientError mError;
QMqttClient::ProtocolVersion mProtocolVersion;
int mMissedHeartbeats;
bool mReceivedMessage;
bool mIsVrmBroker;
};
Expand Down
43 changes: 42 additions & 1 deletion src/qt/ve_qitems_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ VeQItemMqttProducer::VeQItemMqttProducer(
VeQItem *root, const QString &id, const QString &clientIdPrefix, QObject *parent)
: VeQItemProducer(root, id, parent),
mKeepAliveTimer(new QTimer(this)),
mHeartBeatTimer(new QTimer(this)),
mReadyStateTimer(new QTimer(this)),
mReadyStateFallbackTimer(new QTimer(this)),
mMqttConnection(nullptr),
mPort(0),
mHeartbeatState(HeartbeatInactive),
mConnectionState(Idle),
mAutoReconnectAttemptCounter(0),
mAutoReconnectMaxAttempts(sizeof(mReconnectAttemptIntervals)/sizeof(mReconnectAttemptIntervals[0])),
mError(QMqttClient::NoError),
mProtocolVersion(QMqttClient::MQTT_3_1_1),
mMissedHeartbeats(0),
mReceivedMessage(false),
mIsVrmBroker(false)
{
Expand All @@ -93,12 +96,28 @@ VeQItemMqttProducer::VeQItemMqttProducer(
const quint64 uniqueId = QRandomGenerator::global()->generate64();
mClientId.append(QStringLiteral("%1").arg(uniqueId, 16, 16, QLatin1Char('0')));

// start the timer once we have sent the first (empty) keepalive after subscribing.
mKeepAliveTimer->setInterval(1000 * 30);
connect(mKeepAliveTimer, &QTimer::timeout,
this, [this] {
doKeepAlive(/* suppressRepublish = */ true);
});
// start the timer once we have sent the first (empty) keepalive after subscribing.

// start the timer once we have received the first heartbeat.
mHeartBeatTimer->setInterval(3500); // heartbeat interval is 3s, but allow extra.
connect(mHeartBeatTimer, &QTimer::timeout,
this, [this] {
++mMissedHeartbeats;
#ifdef MQTT_WEBSOCKETS_ENABLED
if (mWebSocket && !mWebSocket->isOpen()) {
qWarning() << "Missed heartbeat due to websocket disconnection";
onDisconnected();
return;
}
#endif
qWarning() << "Missed heartbeat! Device data is not real-time.";
setHeartbeatState(HeartbeatInactive);
});

mReadyStateTimer->setSingleShot(true);
mReadyStateTimer->setInterval(1000);
Expand Down Expand Up @@ -264,13 +283,16 @@ void VeQItemMqttProducer::onConnected()

void VeQItemMqttProducer::onDisconnected()
{
setHeartbeatState(HeartbeatInactive);
setConnectionState(Disconnected);
if (error() == QMqttClient::NoError
&& mMqttConnection
&& mMqttConnection->error() != QMqttClient::NoError) {
setError(mMqttConnection->error());
}
mMissedHeartbeats = 0;
mKeepAliveTimer->stop();
mHeartBeatTimer->stop();
mReadyStateTimer->stop();
mReceivedMessage = false;
if (mMqttSubscription.data()) {
Expand Down Expand Up @@ -377,8 +399,14 @@ void VeQItemMqttProducer::onSubscriptionMessageReceived(const QMqttMessage &mess
const QString notificationPrefix = QStringLiteral("N/%1").arg(mPortalId);
if (topicName.startsWith(notificationPrefix)) {
const QString keepaliveTopic = notificationPrefix + QStringLiteral("/keepalive");
const QString heartbeatTopic = notificationPrefix + QStringLiteral("/heartbeat");
if (topicName.compare(keepaliveTopic, Qt::CaseInsensitive) == 0) {
// ignore keepalive topic.
} else if (topicName.compare(heartbeatTopic, Qt::CaseInsensitive) == 0) {
// (re)start our heartbeat timer.
mHeartBeatTimer->start();
mMissedHeartbeats = 0;
setHeartbeatState(HeartbeatActive);
} else {
// we have a topic message which we need to expose via VeQItem.
const QString path = topicName.mid(notificationPrefix.size() + 1);
Expand Down Expand Up @@ -443,6 +471,19 @@ void VeQItemMqttProducer::doKeepAlive(bool suppressRepublish)
}
}

VeQItemMqttProducer::HeartbeatState VeQItemMqttProducer::heartbeatState() const
{
return mHeartbeatState;
}

void VeQItemMqttProducer::setHeartbeatState(VeQItemMqttProducer::HeartbeatState heartbeatState)
{
if (mHeartbeatState != heartbeatState) {
mHeartbeatState = heartbeatState;
Q_EMIT heartbeatStateChanged();
}
}

VeQItemMqttProducer::ConnectionState VeQItemMqttProducer::connectionState() const
{
return mConnectionState;
Expand Down

0 comments on commit 9ab8023

Please sign in to comment.