From 036a7912980f435afe7a940ec7fac5a2b6900c80 Mon Sep 17 00:00:00 2001 From: Dirk Faust Date: Sat, 10 Mar 2018 13:10:09 +0100 Subject: [PATCH] Add mqttGuard and Interface --- log.hpp | 35 +++++++++++++++++ mqttGuard.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++ mqttGuard.hpp | 70 ++++++++++++++++++++++++++++++++++ mqttGuardInterface.cpp | 52 ++++++++++++++++++++++++++ mqttGuardInterface.hpp | 43 +++++++++++++++++++++ 5 files changed, 285 insertions(+) create mode 100644 log.hpp create mode 100644 mqttGuard.cpp create mode 100644 mqttGuard.hpp create mode 100644 mqttGuardInterface.cpp create mode 100644 mqttGuardInterface.hpp diff --git a/log.hpp b/log.hpp new file mode 100644 index 0000000..48eb039 --- /dev/null +++ b/log.hpp @@ -0,0 +1,35 @@ +#ifndef BASECAMP_LOG_HPP +#define BASECAMP_LOG_HPP + +#include +#include +#include + +// Header-Only definition of log functionality for reuse. +namespace basecampLog +{ + /// Severity of log messages. + enum class Severity + { + trace, + debug, + info, + warning, + error, + fatal, + }; + + const std::map SeverityText { + {Severity::trace, "TRACE"}, + {Severity::debug, "DEBUG"}, + {Severity::info, "INFO"}, + {Severity::warning, "WARNING"}, + {Severity::error, "ERROR"}, + {Severity::fatal, "FATAL"}, + }; + + /// Definition of a generic log callback. + using LogCallback = std::function; +} + +#endif // BASECAMP_LOG_HPP diff --git a/mqttGuard.cpp b/mqttGuard.cpp new file mode 100644 index 0000000..bea4b38 --- /dev/null +++ b/mqttGuard.cpp @@ -0,0 +1,85 @@ +#include "mqttGuard.hpp" + +#include +#include + +MqttGuard::MqttGuard(basecampLog::LogCallback logCallback) + : logCallback_(std::move(logCallback)) +{ +} + +void MqttGuard::registerPacket(IdType packetId) +{ + if (!isValidPacketId(packetId)) { + tryLog(basecampLog::Severity::info, "Not registering invlaid MQTT-packet.", packetId); + return; + } + + packets_.emplace_back(packetId); +} + +void MqttGuard::unregisterPacket(IdType packetId) +{ + auto found = std::find(packets_.begin(), packets_.end(), packetId); + + if (found == packets_.end()) { + tryLog(basecampLog::Severity::info, "Not unregistering unknown MQTT-packet.", packetId); + return; + } + + packets_.erase(found); +} + +size_t MqttGuard::remainingPacketCount() const +{ + return packets_.size(); +} + +bool MqttGuard::allSent() const +{ + return (remainingPacketCount() == 0); +} + +bool MqttGuard::isValidPacketId(IdType packetId) const +{ + // Generally invalid packet id + if (packetId == 0) + { + return false; + } + + // Do not allow duplicates + if (std::find(packets_.begin(), packets_.end(), packetId) != packets_.end()) + { + return false; + } + + return true; +} + +void MqttGuard::reset() +{ + tryLog(basecampLog::Severity::warning, "MqttGuard has been manually reset."); + + packets_.clear(); +} + +void MqttGuard::tryLog(basecampLog::Severity severity, const std::string &message) +{ + if (!logCallback_) { + return; + } + + logCallback_(severity, message); +} + +void MqttGuard::tryLog(basecampLog::Severity severity, const std::string &message, IdType packetId) +{ + if (!logCallback_) { + return; + } + + std::ostringstream text; + text << message << " - Packet-ID: " << packetId; + logCallback_(severity, text.str()); +} diff --git a/mqttGuard.hpp b/mqttGuard.hpp new file mode 100644 index 0000000..a8e4120 --- /dev/null +++ b/mqttGuard.hpp @@ -0,0 +1,70 @@ +#ifndef BASECAMP_MQTT_GUARD_HPP +#define BASECAMP_MQTT_GUARD_HPP + +#include "log.hpp" + +#include +#include + +/** + Helper to guard outgoing mqtt packets. + On packet sending, register the packets within this class and + pull the empty() function to see if everything has been sent completely. +*/ +class MqttGuard +{ +public: + /// Typesafety forward of AsyncMqttClients packet_id type + using IdType = uint16_t; + + /** + Construct a new guard. + @param LogCallback Optional log callback. + */ + explicit MqttGuard(basecampLog::LogCallback logCallback = {}); + ~MqttGuard() = default; + + /** + Register a new packet that is going to sent by mqtt. + @param packetId Packet-ID returned by AsyncMqttClient::publish() + */ + void registerPacket(IdType packetId); + + /** + Unregister a packet after it has been sent successfully. + @param packetId Packet-ID returned by AsyncMqttClient::onPublish() + */ + void unregisterPacket(IdType packetId); + + /// Returns the amount of packets waiting to be sent. + size_t remainingPacketCount() const; + + /// Returns true if all packets have been sent. + bool allSent() const; + + /** + Reset to empty state (force). + Maybe necessary if getting disconnected. + */ + void reset(); +private: + /** + Check a packetId for validity. + @param packetId Packet-ID to be checked. + @return True if the packetId can be safely added. + */ + bool isValidPacketId(IdType packetId) const; + + /// Try to log message if logCallback_ has been set in ctor. + void tryLog(basecampLog::Severity severity, const std::string& message); + + /// Try to log message with packetId if logCallback_ has been set in ctor. + void tryLog(basecampLog::Severity severity, const std::string& message, IdType packetId); + + /// Optional callback for log-messages + basecampLog::LogCallback logCallback_; + /// List of all remaining packets + std::vector packets_; +}; + +#endif // #define BASECAMP_MQTT_GUARD_HPP diff --git a/mqttGuardInterface.cpp b/mqttGuardInterface.cpp new file mode 100644 index 0000000..aaafe7a --- /dev/null +++ b/mqttGuardInterface.cpp @@ -0,0 +1,52 @@ +#include "MqttGuardInterface.hpp" + +MqttGuardInterface::MqttGuardInterface(AsyncMqttClient& mqttClient) + : mqttClient_(mqttClient) + , mqttGuard_(std::make_shared()) +{ + // Default callbacks in case the user does not set one so internally everything is kept fine. + mqttOnDisconnect([](AsyncMqttClientDisconnectReason /*reason*/){}); + mqttOnPublish([](uint16_t /*packetId*/) {}); +} + +uint16_t MqttGuardInterface::mqttPublish(const char* topic, uint8_t qos, bool retain, + const char* payload, size_t length, bool dup, uint16_t message_id) +{ + mqttGuard_->registerPacket(mqttClient_.publish(topic, qos, retain, payload, length, dup, message_id)); +} + +AsyncMqttClient& MqttGuardInterface::mqttOnPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) +{ + // Intercept the onPublish and make sure that mqttGuard is held alive until the callback is fired. + auto mqttGuard = mqttGuard_; + return mqttClient_.onPublish([mqttGuard, callback](uint16_t packetId){ + mqttGuard->unregisterPacket(packetId); + if (callback) { + callback(packetId); + } + }); +} + +AsyncMqttClient& MqttGuardInterface::mqttOnDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) +{ + // Intercept the onDisconnect and make sure that mqttGuard is held alive until the callback is fired. + auto mqttGuard = mqttGuard_; + return mqttClient_.onDisconnect([mqttGuard, callback] + (AsyncMqttClientDisconnectReason reason){ + // Make sure that disconnet respects that there could be no more packets be sent. + mqttGuard->reset(); + if (callback) { + callback(reason); + } + }); +} + +bool MqttGuardInterface::mqttAllSent() const +{ + return mqttGuard_->allSent(); +} + +size_t MqttGuardInterface::mqttRemainingPackets() const +{ + return mqttGuard_->remainingPacketCount(); +} diff --git a/mqttGuardInterface.hpp b/mqttGuardInterface.hpp new file mode 100644 index 0000000..3673ae9 --- /dev/null +++ b/mqttGuardInterface.hpp @@ -0,0 +1,43 @@ +#ifndef BASECAMP_MQTT_GUARD_INTERFACE_HPP +#define BASECAMP_MQTT_GUARD_INTERFACE_HPP + +#include "mqttGuard.hpp" + +#include + +#include + +/** + Class to interface between AsyncMqttClient and MqttGuard. + Intercepts outgoing messages and incoming publish acknowledgements to keep track of packets to be sent. + + Use mqttOnPublish instead of mqtt.OnPublish() + Use consequently mqttOnDisconnect instead of mqtt.OnDisconnect() + Use mqttPublish instead of mqtt.publish() + + Use then mqttAllSent() to check if there are remaining packets to be sent. + + Its up to the owner of this class to keep mqttClient valid in the lifetime of an instance of this class. + */ +class MqttGuardInterface +{ +public: + explicit MqttGuardInterface(AsyncMqttClient& mqttClient); + + AsyncMqttClient& mqttOnPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); + AsyncMqttClient& mqttOnDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); + + uint16_t mqttPublish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); + + /// Returns true if all mqtt-packets have been sent. + bool mqttAllSent() const; + + /// Returns the remaining to-be-transmitted packet count. + size_t mqttRemainingPackets() const; + +private: + AsyncMqttClient& mqttClient_; + std::shared_ptr mqttGuard_; +}; + +#endif // BASECAMP_MQTT_GUARD_INTERFACE_HPP