Skip to content

Commit

Permalink
Merge branch 'publish_mode'
Browse files Browse the repository at this point in the history
  • Loading branch information
BlackZork committed Aug 23, 2024
2 parents f2d6f76 + ae42d4d commit 1ef0307
Show file tree
Hide file tree
Showing 21 changed files with 286 additions and 72 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ The mqtt section contains broker definition and modbus register mappings. Mappin
down to an object and register definitions. If this value is less than the target network can handle
then newly scheduled read commands will be merged with those already in modbus command queue.
* **publish_mode** (string, optional, default on_change)
A default mode for publishing mqtt values for all topics, that do not have their own `publish_mode` declared.
* **on_change**: publish new mqtt value only if it is different from the last published one.
* **every_poll**: publish new mqtt value after every modbus register read.
* **broker** (required)
This section contains configuration settings used to connect to MQTT broker.
Expand Down Expand Up @@ -379,6 +386,11 @@ A list of topics where modbus values are published to MQTT broker and subscribed
For examples see [Multi-device definitions](#multi-device-definitions) section.
* **publish_mode** (optional)
Overrides `mqtt.publish_mode` for this topic. See `mqtt.publish_mode` for available modes.
### A *commands* section.
A single command is defined using following settings.
Expand Down
9 changes: 9 additions & 0 deletions libmodmqttsrv/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@
#define __MUSL__
#endif
#endif

namespace modmqttd {

typedef enum {
ON_CHANGE=1,
EVERY_POLL=2
} PublishMode;

}
3 changes: 3 additions & 0 deletions libmodmqttsrv/modbus_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ ModbusExecutor::pollRegisters(RegisterPoll& reg, bool forceSend) {
BOOST_LOG_SEV(log, Log::trace) << "Register " << reg.mSlaveId << "." << reg.mRegister << " (0x" << std::hex << reg.mSlaveId << ".0x" << std::hex << reg.mRegister << ")"
<< " polled in " << std::dec << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms";

if (reg.mPublishMode == PublishMode::EVERY_POLL)
forceSend = true;

if ((reg.getValues() != newValues) || forceSend || (reg.mReadErrors != 0)) {
MsgRegisterValues val(reg.mSlaveId, reg.mRegisterType, reg.mRegister, newValues);
sendMessage(QueueItem::create(val));
Expand Down
5 changes: 4 additions & 1 deletion libmodmqttsrv/modbus_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
#include <chrono>
#include <vector>

#include "modbus_types.hpp"
#include "libmodmqttconv/modbusregisters.hpp"

#include "common.hpp"
#include "modbus_types.hpp"
#include "debugtools.hpp"

namespace modmqttd {
Expand Down Expand Up @@ -68,6 +70,7 @@ class MsgRegisterPoll : public ModbusSlaveAddressRange {
void merge(const MsgRegisterPoll& other);

std::chrono::milliseconds mRefreshMsec = INVALID_REFRESH;
PublishMode mPublishMode = PublishMode::ON_CHANGE;
};

class MsgRegisterPollSpecification {
Expand Down
3 changes: 2 additions & 1 deletion libmodmqttsrv/modbus_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) {
// do not poll a poll group declared in modbus config section
// that was not merged with any mqtt register declaration
if (it->mRefreshMsec != MsgRegisterPoll::INVALID_REFRESH) {
std::shared_ptr<RegisterPoll> reg(new RegisterPoll(it->mSlaveId, it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec));
std::shared_ptr<RegisterPoll> reg(new RegisterPoll(it->mSlaveId, it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec, it->mPublishMode));
std::map<int, ModbusSlaveConfig>::const_iterator slave_cfg = mSlaves.find(reg->mSlaveId);

setCommandDelays(*reg, mDelayBeforeCommand, mDelayBeforeFirstCommand);
Expand All @@ -85,6 +85,7 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) {
<< ", register " << (*it)->mRegister << ":" << (*it)->mRegisterType
<< ", count=" << (*it)->getCount()
<< ", poll every " << std::chrono::duration_cast<std::chrono::milliseconds>((*it)->mRefresh).count() << "ms"
<< ", queue " << ((*it)->mPublishMode == PublishMode::ON_CHANGE ? "on change" : "always")
<< ", min f_delay " << std::chrono::duration_cast<std::chrono::milliseconds>((*it)->getDelayBeforeFirstCommand()).count() << "ms"
<< ", min delay " << std::chrono::duration_cast<std::chrono::milliseconds>((*it)->getDelayBeforeCommand()).count() << "ms";
}
Expand Down
47 changes: 39 additions & 8 deletions libmodmqttsrv/modmqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ parseRegisterType(const YAML::Node& data) {
throw ConfigurationException(data.Mark(), std::string("Unknown register type ") + rtype);
}

PublishMode
parsePublishMode(const YAML::Node& data, PublishMode pDefault = PublishMode::ON_CHANGE) {
std::string pmode;
if (!ConfigTools::readOptionalValue<std::string>(pmode, data, "publish_mode"))
return pDefault;

if (pmode == "on_change") {
return PublishMode::ON_CHANGE;
} else if (pmode == "every_poll") {
return PublishMode::EVERY_POLL;
}

throw ConfigurationException(data.Mark(), std::string("Invalid publish mode '") + pmode + "', valid values are: on_change, every_poll");
}

MqttObjectCommand::PayloadType
parsePayloadType(const YAML::Node& data) {
//for future support for int and float mqtt command payload types
Expand Down Expand Up @@ -412,6 +427,7 @@ ModMqtt::parseObject(
int pDefaultSlaveId,
const std::string& pSlaveName,
std::chrono::milliseconds pDefaultRefresh,
PublishMode pDefaultPublishMode,
std::vector<MsgRegisterPollSpecification>& pSpecsOut)
{
std::string topic(ConfigTools::readRequiredString(pData, "topic"));
Expand Down Expand Up @@ -458,11 +474,13 @@ ModMqtt::parseObject(
MqttObject ret(topic);
BOOST_LOG_SEV(log, Log::debug) << "processing object " << ret.getTopic();

ret.setPublishMode(parsePublishMode(pData, pDefaultPublishMode));

const YAML::Node& yState = pData["state"];

if (yState.IsDefined()) {
if (yState.IsMap()) {
MqttObjectDataNode node(parseObjectDataNode(yState, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, pSpecsOut));
MqttObjectDataNode node(parseObjectDataNode(yState, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, ret.getPublishMode(), pSpecsOut));
// a map that contains register with optional count
// should output a list or a scalar value
// in this case we do not need parsed parent level
Expand All @@ -476,7 +494,7 @@ ModMqtt::parseObject(
bool isUnnamed = false;
for(size_t i = 0; i < yState.size(); i++) {
const YAML::Node& yData = yState[i];
MqttObjectDataNode node(parseObjectDataNode(yData, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, pSpecsOut));
MqttObjectDataNode node(parseObjectDataNode(yData, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, ret.getPublishMode(), pSpecsOut));
//the first element defines if we have named or unnamed list
if (i == 0)
isUnnamed = node.isUnnamed();
Expand All @@ -501,7 +519,7 @@ ModMqtt::parseObject(
ret.setAvailableValue(MqttValue::fromString(availValue));

if (yAvail.IsMap()) {
MqttObjectDataNode node(parseObjectDataNode(yAvail, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, pSpecsOut));
MqttObjectDataNode node(parseObjectDataNode(yAvail, pDefaultNetwork, pDefaultSlaveId, pDefaultRefresh, ret.getPublishMode(), pSpecsOut));
if (!node.isScalar() && !node.hasConverter())
throw ConfigurationException(yAvail.Mark(), "multiple registers availability must use a converter");
ret.addAvailabilityDataNode(node);
Expand All @@ -518,6 +536,7 @@ ModMqtt::parseObjectDataNode(
const std::string& pDefaultNetwork,
int pDefaultSlaveId,
std::chrono::milliseconds pRefresh,
PublishMode pMode,
std::vector<MsgRegisterPollSpecification>& pSpecsOut
)
{
Expand All @@ -543,7 +562,7 @@ ModMqtt::parseObjectDataNode(
throw ConfigurationException(yRegisters.Mark(), "'registers' must be a list");
for(size_t i = 0; i < yRegisters.size(); i++) {
const YAML::Node& yData = yRegisters[i];
MqttObjectDataNode childNode(parseObjectDataNode(yData, pDefaultNetwork, pDefaultSlaveId, pRefresh, pSpecsOut));
MqttObjectDataNode childNode(parseObjectDataNode(yData, pDefaultNetwork, pDefaultSlaveId, pRefresh, pMode, pSpecsOut));
//the first element defines if we have named or unnamed list
if (i == 0)
isUnnamed = node.isUnnamed();
Expand All @@ -557,7 +576,7 @@ ModMqtt::parseObjectDataNode(
int count = 1;
ConfigTools::readOptionalValue<int>(count, pNode, "count");

MqttObjectRegisterIdent first_ident = updateSpecification(pNode, count, pRefresh, pDefaultNetwork, pDefaultSlaveId, pSpecsOut);
MqttObjectRegisterIdent first_ident = updateSpecification(pNode, count, pRefresh, pDefaultNetwork, pDefaultSlaveId, pMode, pSpecsOut);
if (count == 1) {
node.setScalarNode(first_ident);
} else {
Expand Down Expand Up @@ -689,6 +708,8 @@ ModMqtt::initObjects(const YAML::Node& config, const ModMqtt::ModbusInitData& mo
auto defaultRefresh = std::chrono::milliseconds(5000);
ConfigTools::readOptionalValue<std::chrono::milliseconds>(defaultRefresh, mqtt, "refresh");

PublishMode defaultPublishMode = parsePublishMode(mqtt);

const YAML::Node& config_objects = mqtt["objects"];
if (!config_objects.IsDefined())
throw ConfigurationException(mqtt.Mark(), "objects section is missing");
Expand Down Expand Up @@ -733,7 +754,15 @@ ModMqtt::initObjects(const YAML::Node& config, const ModMqtt::ModbusInitData& mo
if (created.find(defaultSlaveId) != created.end())
throw ConfigurationException(objdata["slave"].Mark(), std::string("Slave with id=") + std::to_string(defaultSlaveId) + " is duplicated in the list");

MqttObject object(parseObject(objdata, defaultNetwork, defaultSlaveId, modbusData.getSlaveName(defaultNetwork, defaultSlaveId), defaultRefresh, pSpecsOut));
MqttObject object(parseObject(
objdata,
defaultNetwork,
defaultSlaveId,
modbusData.getSlaveName(defaultNetwork, defaultSlaveId),
defaultRefresh,
defaultPublishMode,
pSpecsOut)
);
const std::string& baseTopic(object.getTopic());

std::vector<MqttObject>::const_iterator oit = std::find_if(
Expand All @@ -760,15 +789,17 @@ MqttObjectRegisterIdent
ModMqtt::updateSpecification(
const YAML::Node& data,
int pRegisterCount,
const std::chrono::milliseconds& currentRefresh,
const std::chrono::milliseconds& pCurrentRefresh,
const std::string& pDefaultNetwork,
int pDefaultSlaveId,
PublishMode pCurrentMode,
std::vector<MsgRegisterPollSpecification>& specs)
{
const RegisterConfigName rname(data, pDefaultNetwork, pDefaultSlaveId);

MsgRegisterPoll poll(rname.mSlaveId, rname.mRegisterNumber, parseRegisterType(data), pRegisterCount);
poll.mRefreshMsec = currentRefresh;
poll.mRefreshMsec = pCurrentRefresh;
poll.mPublishMode = pCurrentMode;

// find network poll specification or create one
std::vector<MsgRegisterPollSpecification>::iterator spec_it = std::find_if(
Expand Down
5 changes: 4 additions & 1 deletion libmodmqttsrv/modmqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ class ModMqtt {
MqttObjectRegisterIdent updateSpecification(
const YAML::Node& pData,
int pRegisterCount,
const std::chrono::milliseconds& pRefresh,
const std::chrono::milliseconds& pCurrentRefresh,
const std::string& pDefaultNetwork,
int pDefaultSlave,
PublishMode pCurrentMode,
std::vector<MsgRegisterPollSpecification>& specs
);

Expand All @@ -94,6 +95,7 @@ class ModMqtt {
int pDefaultSlave,
const std::string& pSlaveName,
std::chrono::milliseconds pDefaultRefresh,
PublishMode pDefaultPublishMode,
std::vector<MsgRegisterPollSpecification>& pSpecsOut
);

Expand All @@ -102,6 +104,7 @@ class ModMqtt {
const std::string& pDefaultNetwork,
int pDefaultSlave,
std::chrono::milliseconds refresh,
PublishMode pMode,
std::vector<MsgRegisterPollSpecification>& pSpecs
);

Expand Down
30 changes: 16 additions & 14 deletions libmodmqttsrv/mqttclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,31 @@ MqttClient::processRegisterValues(const std::string& pModbusNetworkName, const M

for (std::shared_ptr<MqttObject>& obj: *affectedObjects) {
AvailableFlag oldAvail = obj->getAvailableFlag();
bool stateChanged = obj->updateRegisterValues(pModbusNetworkName, pSlaveData);
obj->updateRegisterValues(pModbusNetworkName, pSlaveData);
AvailableFlag newAvail = obj->getAvailableFlag();

if (oldAvail != newAvail) {
publishState(*obj);
if (newAvail == AvailableFlag::True)
publishState(*obj, true);

publishAvailabilityChange(*obj);
} else {
if (stateChanged)
publishState(*obj);
bool force = obj->getPublishMode() == PublishMode::EVERY_POLL;
publishState(*obj, force);
}
}
}

void
MqttClient::publishState(const MqttObject& obj) {
MqttClient::publishState(MqttObject& obj, bool force) {
if (obj.getAvailableFlag() != AvailableFlag::True)
return;
int msgId;
std::string messageData(MqttPayload::generate(obj));
BOOST_LOG_SEV(log, Log::debug) << "Publish on topic " << obj.getStateTopic() << ": " << messageData;
mMqttImpl->publish(obj.getStateTopic().c_str(), messageData.length(), messageData.c_str());
if (messageData != obj.getLastPublishedPayload() || force) {
BOOST_LOG_SEV(log, Log::debug) << "Publish on topic " << obj.getStateTopic() << ": " << messageData;
mMqttImpl->publish(obj.getStateTopic().c_str(), messageData.length(), messageData.c_str());
obj.setLastPublishedPayload(messageData);
}
}

void
Expand All @@ -183,15 +187,13 @@ MqttClient::processRegistersOperationFailed(const std::string& pModbusNetworkNam

for (std::shared_ptr<MqttObject>& obj: it->second) {
AvailableFlag oldAvail = obj->getAvailableFlag();
bool stateChanged = obj->updateRegistersReadFailed(pModbusNetworkName, pSlaveData);
obj->updateRegistersReadFailed(pModbusNetworkName, pSlaveData);
AvailableFlag newAvail = obj->getAvailableFlag();

publishState(*obj);

if (oldAvail != newAvail) {
publishState(*obj);
publishAvailabilityChange(*obj);
} else {
if (stateChanged)
publishState(*obj);
}
}
}
Expand Down Expand Up @@ -238,7 +240,7 @@ MqttClient::publishAll() {
const std::shared_ptr<MqttObject>& optr = *oit;
if (published.find(optr) == published.end()) {
if ((*oit)->getAvailableFlag() == AvailableFlag::True)
publishState(**oit);
publishState(**oit, true);
publishAvailabilityChange(**oit);
published.insert(*oit);
}
Expand Down
2 changes: 1 addition & 1 deletion libmodmqttsrv/mqttclient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MqttClient {

//publish all data after broker is reconnected
void publishAll();
void publishState(const MqttObject& obj);
void publishState(MqttObject& obj, bool force=false);
void publishAvailabilityChange(const MqttObject& obj);

void processRegisterValues(const std::string& modbusNetworkName, const MsgRegisterValues& values);
Expand Down
16 changes: 3 additions & 13 deletions libmodmqttsrv/mqttobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,33 +269,23 @@ MqttObject::hasRegisterIn(const std::string& pNetworkName, const ModbusSlaveAddr
}


bool
void
MqttObject::updateRegisterValues(const std::string& pNetworkName, const MsgRegisterValues& pSlaveData) {
bool stateChanged = mState.updateRegisterValues(pNetworkName, pSlaveData);
bool availChanged = mAvailability.updateRegisterValues(pNetworkName, pSlaveData);
if (stateChanged || availChanged) {
updateAvailablityFlag();
return true;
} else if (!mIsAvailable) {
// read was successfull, so availability flag
// may need to be updated if it depends only on state register data
// avaiablity.
if (stateChanged || availChanged || !mIsAvailable) {
updateAvailablityFlag();
return mIsAvailable;
}
return false;
}


bool
void
MqttObject::updateRegistersReadFailed(const std::string& pNetworkName, const ModbusSlaveAddressRange& pSlaveData) {
bool stateChanged = mState.updateRegistersReadFailed(pNetworkName, pSlaveData);
bool availChanged = mAvailability.updateRegistersReadFailed(pNetworkName, pSlaveData);
if (stateChanged || availChanged) {
updateAvailablityFlag();
return true;
}
return false;
}


Expand Down
Loading

0 comments on commit 1ef0307

Please sign in to comment.