Skip to content
This repository has been archived by the owner on Jan 11, 2020. It is now read-only.

Commit

Permalink
Add mqttGuard and Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Dirk007 committed Mar 10, 2018
1 parent 0979ef1 commit 036a791
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 0 deletions.
35 changes: 35 additions & 0 deletions log.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef BASECAMP_LOG_HPP
#define BASECAMP_LOG_HPP

#include <functional>
#include <string>
#include <map>

// Header-Only definition of log functionality for reuse.
namespace basecampLog
{
/// Severity of log messages.
enum class Severity
{
trace,
debug,
info,
warning,
error,
fatal,
};

const std::map<Severity, std::string> SeverityText {
{Severity::trace, "TRACE"},
{Severity::debug, "DEBUG"},
{Severity::info, "INFO"},
{Severity::warning, "WARNING"},
{Severity::error, "ERROR"},
{Severity::fatal, "FATAL"},
};

/// Definition of a generic log callback.
using LogCallback = std::function<void(basecampLog::Severity severity, const std::string& message)>;
}

#endif // BASECAMP_LOG_HPP
85 changes: 85 additions & 0 deletions mqttGuard.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "mqttGuard.hpp"

#include <algorithm>
#include <sstream>

MqttGuard::MqttGuard(basecampLog::LogCallback logCallback)
: logCallback_(std::move(logCallback))
{
}

void MqttGuard::registerPacket(IdType packetId)
{
if (!isValidPacketId(packetId)) {
tryLog(basecampLog::Severity::info, "Not registering invlaid MQTT-packet.", packetId);
return;
}

packets_.emplace_back(packetId);
}

void MqttGuard::unregisterPacket(IdType packetId)
{
auto found = std::find(packets_.begin(), packets_.end(), packetId);

if (found == packets_.end()) {
tryLog(basecampLog::Severity::info, "Not unregistering unknown MQTT-packet.", packetId);
return;
}

packets_.erase(found);
}

size_t MqttGuard::remainingPacketCount() const
{
return packets_.size();
}

bool MqttGuard::allSent() const
{
return (remainingPacketCount() == 0);
}

bool MqttGuard::isValidPacketId(IdType packetId) const
{
// Generally invalid packet id
if (packetId == 0)
{
return false;
}

// Do not allow duplicates
if (std::find(packets_.begin(), packets_.end(), packetId) != packets_.end())
{
return false;
}

return true;
}

void MqttGuard::reset()
{
tryLog(basecampLog::Severity::warning, "MqttGuard has been manually reset.");

packets_.clear();
}

void MqttGuard::tryLog(basecampLog::Severity severity, const std::string &message)
{
if (!logCallback_) {
return;
}

logCallback_(severity, message);
}

void MqttGuard::tryLog(basecampLog::Severity severity, const std::string &message, IdType packetId)
{
if (!logCallback_) {
return;
}

std::ostringstream text;
text << message << " - Packet-ID: " << packetId;
logCallback_(severity, text.str());
}
70 changes: 70 additions & 0 deletions mqttGuard.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef BASECAMP_MQTT_GUARD_HPP
#define BASECAMP_MQTT_GUARD_HPP

#include "log.hpp"

#include <functional>
#include <vector>

/**
Helper to guard outgoing mqtt packets.
On packet sending, register the packets within this class and
pull the empty() function to see if everything has been sent completely.
*/
class MqttGuard
{
public:
/// Typesafety forward of AsyncMqttClients packet_id type
using IdType = uint16_t;

/**
Construct a new guard.
@param LogCallback Optional log callback.
*/
explicit MqttGuard(basecampLog::LogCallback logCallback = {});
~MqttGuard() = default;

/**
Register a new packet that is going to sent by mqtt.
@param packetId Packet-ID returned by AsyncMqttClient::publish()
*/
void registerPacket(IdType packetId);

/**
Unregister a packet after it has been sent successfully.
@param packetId Packet-ID returned by AsyncMqttClient::onPublish()
*/
void unregisterPacket(IdType packetId);

/// Returns the amount of packets waiting to be sent.
size_t remainingPacketCount() const;

/// Returns true if all packets have been sent.
bool allSent() const;

/**
Reset to empty state (force).
Maybe necessary if getting disconnected.
*/
void reset();
private:
/**
Check a packetId for validity.
@param packetId Packet-ID to be checked.
@return True if the packetId can be safely added.
*/
bool isValidPacketId(IdType packetId) const;

/// Try to log message if logCallback_ has been set in ctor.
void tryLog(basecampLog::Severity severity, const std::string& message);

/// Try to log message with packetId if logCallback_ has been set in ctor.
void tryLog(basecampLog::Severity severity, const std::string& message, IdType packetId);

/// Optional callback for log-messages
basecampLog::LogCallback logCallback_;
/// List of all remaining packets
std::vector<IdType> packets_;
};

#endif // #define BASECAMP_MQTT_GUARD_HPP
52 changes: 52 additions & 0 deletions mqttGuardInterface.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "MqttGuardInterface.hpp"

MqttGuardInterface::MqttGuardInterface(AsyncMqttClient& mqttClient)
: mqttClient_(mqttClient)
, mqttGuard_(std::make_shared<MqttGuard>())
{
// Default callbacks in case the user does not set one so internally everything is kept fine.
mqttOnDisconnect([](AsyncMqttClientDisconnectReason /*reason*/){});
mqttOnPublish([](uint16_t /*packetId*/) {});
}

uint16_t MqttGuardInterface::mqttPublish(const char* topic, uint8_t qos, bool retain,
const char* payload, size_t length, bool dup, uint16_t message_id)
{
mqttGuard_->registerPacket(mqttClient_.publish(topic, qos, retain, payload, length, dup, message_id));
}

AsyncMqttClient& MqttGuardInterface::mqttOnPublish(AsyncMqttClientInternals::OnPublishUserCallback callback)
{
// Intercept the onPublish and make sure that mqttGuard is held alive until the callback is fired.
auto mqttGuard = mqttGuard_;
return mqttClient_.onPublish([mqttGuard, callback](uint16_t packetId){
mqttGuard->unregisterPacket(packetId);
if (callback) {
callback(packetId);
}
});
}

AsyncMqttClient& MqttGuardInterface::mqttOnDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback)
{
// Intercept the onDisconnect and make sure that mqttGuard is held alive until the callback is fired.
auto mqttGuard = mqttGuard_;
return mqttClient_.onDisconnect([mqttGuard, callback]
(AsyncMqttClientDisconnectReason reason){
// Make sure that disconnet respects that there could be no more packets be sent.
mqttGuard->reset();
if (callback) {
callback(reason);
}
});
}

bool MqttGuardInterface::mqttAllSent() const
{
return mqttGuard_->allSent();
}

size_t MqttGuardInterface::mqttRemainingPackets() const
{
return mqttGuard_->remainingPacketCount();
}
43 changes: 43 additions & 0 deletions mqttGuardInterface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef BASECAMP_MQTT_GUARD_INTERFACE_HPP
#define BASECAMP_MQTT_GUARD_INTERFACE_HPP

#include "mqttGuard.hpp"

#include <memory>

#include <AsyncMqttClient.h>

/**
Class to interface between AsyncMqttClient and MqttGuard.
Intercepts outgoing messages and incoming publish acknowledgements to keep track of packets to be sent.
Use mqttOnPublish instead of mqtt.OnPublish()
Use consequently mqttOnDisconnect instead of mqtt.OnDisconnect()
Use mqttPublish instead of mqtt.publish()
Use then mqttAllSent() to check if there are remaining packets to be sent.
Its up to the owner of this class to keep mqttClient valid in the lifetime of an instance of this class.
*/
class MqttGuardInterface
{
public:
explicit MqttGuardInterface(AsyncMqttClient& mqttClient);

AsyncMqttClient& mqttOnPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);
AsyncMqttClient& mqttOnDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback);

uint16_t mqttPublish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);

/// Returns true if all mqtt-packets have been sent.
bool mqttAllSent() const;

/// Returns the remaining to-be-transmitted packet count.
size_t mqttRemainingPackets() const;

private:
AsyncMqttClient& mqttClient_;
std::shared_ptr<MqttGuard> mqttGuard_;
};

#endif // BASECAMP_MQTT_GUARD_INTERFACE_HPP

0 comments on commit 036a791

Please sign in to comment.