From e8406d78814889030a79b5c302fecfb58a812e69 Mon Sep 17 00:00:00 2001 From: Lukasz Michalski Date: Fri, 10 May 2024 21:42:47 +0200 Subject: [PATCH 1/5] refactor: Add mSlaveId to RegisterCommand base class: moved from RegisterPoll, added to RegisterWrite --- libmodmqttsrv/modbus_executor.cpp | 4 ++-- libmodmqttsrv/modbus_executor.hpp | 2 +- libmodmqttsrv/modbus_thread.cpp | 8 +++---- libmodmqttsrv/register_poll.cpp | 8 +++---- libmodmqttsrv/register_poll.hpp | 16 ++++++++----- unittests/mockedmodbuscontext.cpp | 6 ++--- .../modbus_executor_single_delay_tests.cpp | 4 ++-- unittests/modbus_executor_tests.cpp | 24 +++++++++---------- unittests/modbus_utils.hpp | 10 ++++---- unittests/scheduler_tests.cpp | 4 ++-- 10 files changed, 46 insertions(+), 40 deletions(-) diff --git a/libmodmqttsrv/modbus_executor.cpp b/libmodmqttsrv/modbus_executor.cpp index 2b69653..65e0b42 100644 --- a/libmodmqttsrv/modbus_executor.cpp +++ b/libmodmqttsrv/modbus_executor.cpp @@ -116,8 +116,8 @@ ModbusExecutor::addPollList(const std::map& pCommand) { - ModbusRequestsQueues& queue = mSlaveQueues[slaveId]; +ModbusExecutor::addWriteCommand(const std::shared_ptr& pCommand) { + ModbusRequestsQueues& queue = mSlaveQueues[pCommand->mSlaveId]; queue.addWriteCommand(pCommand); if (mCurrentSlaveQueue == mSlaveQueues.end()) { mCurrentSlaveQueue = mSlaveQueues.begin(); diff --git a/libmodmqttsrv/modbus_executor.hpp b/libmodmqttsrv/modbus_executor.hpp index 5ab8c40..48fbdef 100644 --- a/libmodmqttsrv/modbus_executor.hpp +++ b/libmodmqttsrv/modbus_executor.hpp @@ -24,7 +24,7 @@ class ModbusExecutor { bool pollDone() const; void addPollList(const std::map>>& pRegisters, bool mInitialPoll = false); - void addWriteCommand(int slaveId, const std::shared_ptr& pCommand); + void addWriteCommand(const std::shared_ptr& pCommand); /** * Get next request R to send from modbus queues * If R needs delay then return how much time we should wait before diff --git a/libmodmqttsrv/modbus_thread.cpp b/libmodmqttsrv/modbus_thread.cpp index f15df2b..75b2ca9 100644 --- a/libmodmqttsrv/modbus_thread.cpp +++ b/libmodmqttsrv/modbus_thread.cpp @@ -66,8 +66,8 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) { // do not poll a poll group declared in modbus config section // that was not merged with any mqtt register declaration if (it->mRefreshMsec != MsgRegisterPoll::INVALID_REFRESH) { - std::shared_ptr reg(new RegisterPoll(it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec)); - std::map::const_iterator slave_cfg = mSlaves.find(it->mSlaveId); + std::shared_ptr reg(new RegisterPoll(it->mSlaveId, it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec)); + std::map::const_iterator slave_cfg = mSlaves.find(reg->mSlaveId); setCommandDelays(*reg, mDelayBeforeCommand, mDelayBeforeFirstCommand); reg->setMaxRetryCounts(mMaxReadRetryCount, mMaxWriteRetryCount, true); @@ -77,7 +77,7 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) { reg->setMaxRetryCounts(slave_cfg->second.mMaxReadRetryCount, slave_cfg->second.mMaxWriteRetryCount); } - registerMap[it->mSlaveId].push_back(reg); + registerMap[reg->mSlaveId].push_back(reg); } } @@ -120,7 +120,7 @@ ModbusThread::processWrite(const std::shared_ptr& msg) { cmd->setMaxRetryCounts(it->second.mMaxReadRetryCount, it->second.mMaxWriteRetryCount); } - mExecutor.addWriteCommand(msg->mSlaveId, cmd); + mExecutor.addWriteCommand(cmd); } void diff --git a/libmodmqttsrv/register_poll.cpp b/libmodmqttsrv/register_poll.cpp index da19bfb..d7f9cf5 100644 --- a/libmodmqttsrv/register_poll.cpp +++ b/libmodmqttsrv/register_poll.cpp @@ -12,12 +12,12 @@ RegisterCommand::setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce) mMaxWriteRetryCount = pMaxWrite; } -RegisterPoll::RegisterPoll(int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec) - : RegisterCommand(regNum, regType, regCount), +RegisterPoll::RegisterPoll(int pSlaveId, int pRegNum, RegisterType pRegType, int pRegCount, std::chrono::milliseconds pRefreshMsec) + : RegisterCommand(pSlaveId, pRegNum, pRegType, pRegCount), mLastRead(std::chrono::steady_clock::now() - std::chrono::hours(24)), - mLastValues(regCount) + mLastValues(pRegCount) { - mRefresh = refreshMsec; + mRefresh = pRefreshMsec; mReadErrors = 0; mFirstErrorTime = std::chrono::steady_clock::now(); mDelay = std::chrono::milliseconds::zero(); diff --git a/libmodmqttsrv/register_poll.hpp b/libmodmqttsrv/register_poll.hpp index 6845994..b20d32a 100644 --- a/libmodmqttsrv/register_poll.hpp +++ b/libmodmqttsrv/register_poll.hpp @@ -12,8 +12,9 @@ namespace modmqttd { class RegisterCommand : public ModbusAddressRange { public: - RegisterCommand(int pRegister, RegisterType pRegisterType, int pCount) - : ModbusAddressRange(pRegister, pRegisterType, pCount) + RegisterCommand(int pSlaveId, int pRegister, RegisterType pRegisterType, int pCount) + : ModbusAddressRange(pRegister, pRegisterType, pCount), + mSlaveId(pSlaveId) {} virtual int getRegister() const = 0; @@ -30,6 +31,8 @@ class RegisterCommand : public ModbusAddressRange { void setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce = false); + int mSlaveId; + short mMaxReadRetryCount; short mMaxWriteRetryCount; protected: @@ -44,7 +47,7 @@ class RegisterPoll : public RegisterCommand { // if we cannot read register in this time MsgRegisterReadFailed is sent static constexpr int DefaultReadErrorCount = 3; - RegisterPoll(int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec); + RegisterPoll(int pSlaveId, int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec); virtual int getRegister() const { return mRegister; }; virtual int getCount() const { return mLastValues.size(); } @@ -68,13 +71,14 @@ class RegisterPoll : public RegisterCommand { class RegisterWrite : public RegisterCommand { public: RegisterWrite(const MsgRegisterValues& msg) - : RegisterWrite(msg.mRegister, + : RegisterWrite(msg.mSlaveId, + msg.mRegister, msg.mRegisterType, msg.mRegisters ) {} - RegisterWrite(int pRegister, RegisterType pType, const ModbusRegisters& pValues) - : RegisterCommand(pRegister, pType, pValues.getCount()), + RegisterWrite(int pSlaveId, int pRegister, RegisterType pType, const ModbusRegisters& pValues) + : RegisterCommand(pSlaveId, pRegister, pType, pValues.getCount()), mValues(pValues) {} diff --git a/unittests/mockedmodbuscontext.cpp b/unittests/mockedmodbuscontext.cpp index ecba217..3a9e575 100644 --- a/unittests/mockedmodbuscontext.cpp +++ b/unittests/mockedmodbuscontext.cpp @@ -286,7 +286,7 @@ MockedModbusContext::getWriteCount(int slaveId) const { uint16_t MockedModbusContext::getModbusRegisterValue(int slaveId, int regNum, modmqttd::RegisterType regtype) { mInternalOperation = true; - modmqttd::RegisterPoll poll(--regNum, regtype, 1, std::chrono::milliseconds(0)); + modmqttd::RegisterPoll poll(slaveId, --regNum, regtype, 1, std::chrono::milliseconds(0)); auto vals = readModbusRegisters(slaveId, poll); return vals[0]; @@ -353,9 +353,9 @@ void MockedModbusFactory::setModbusRegisterValue(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype, uint16_t val) { regNum--; std::shared_ptr ctx = getOrCreateContext(network); - modmqttd::RegisterWrite msg(regNum, regtype, ModbusRegisters(val)); + modmqttd::RegisterWrite msg(slaveId, regNum, regtype, ModbusRegisters(val)); ctx->mInternalOperation = true; - ctx->writeModbusRegisters(slaveId, msg); + ctx->writeModbusRegisters(msg.mSlaveId, msg); } diff --git a/unittests/modbus_executor_single_delay_tests.cpp b/unittests/modbus_executor_single_delay_tests.cpp index 337acf9..263b206 100644 --- a/unittests/modbus_executor_single_delay_tests.cpp +++ b/unittests/modbus_executor_single_delay_tests.cpp @@ -125,8 +125,8 @@ TEST_CASE("ModbusExecutor for first delay config") { executor.executeNext(); // add write about to be executed - auto write = registers.createWriteDelayed(1,0x15, std::chrono::milliseconds(30)); - executor.addWriteCommand(1, write); + auto write = registers.createWriteDelayed(1, 1,0x15, std::chrono::milliseconds(30)); + executor.addWriteCommand(write); waitTime = executor.executeNext(); REQUIRE(executor.getWaitingCommand() == write); diff --git a/unittests/modbus_executor_tests.cpp b/unittests/modbus_executor_tests.cpp index 8a6d8f7..d5d198e 100644 --- a/unittests/modbus_executor_tests.cpp +++ b/unittests/modbus_executor_tests.cpp @@ -202,12 +202,12 @@ TEST_CASE("ModbusExecutor") { //mWaitingCommand is set to poll 1,1 executor.setupInitialPoll(registers); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(1, 100)); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(10, 101)); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(20, 102)); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(1, 200)); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(10, 201)); - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(20, 202)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 1, 100)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 10, 101)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 20, 102)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 1, 200)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 10, 201)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 20, 202)); REQUIRE(executor.getCommandsLeft() == 6); @@ -262,9 +262,9 @@ TEST_CASE("ModbusExecutor") { modbus_factory.setModbusRegisterValue("test",2,2,modmqttd::RegisterType::HOLDING, 20); for (int i = 1; i <= modmqttd::ModbusExecutor::WRITE_BATCH_SIZE+1; i++) { - executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(1, i)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(1, 1, i)); } - executor.addWriteCommand(2, ModbusExecutorTestRegisters::createWrite(2, 200)); + executor.addWriteCommand(ModbusExecutorTestRegisters::createWrite(2, 2, 200)); for (int i = 0; i < modmqttd::ModbusExecutor::WRITE_BATCH_SIZE; i++) { executor.executeNext(); //write to the first slave @@ -336,9 +336,9 @@ TEST_CASE("ModbusExecutor") { SECTION("should retry last write command mMaxWriteRetryCount times if failed") { modbus_factory.setModbusRegisterWriteError("test", 1, 1, modmqttd::RegisterType::HOLDING); - auto cmd(registers.createWrite(1, 0x3)); + auto cmd(registers.createWrite(1, 1, 0x3)); cmd->setMaxRetryCounts(0,1); - executor.addWriteCommand(1, cmd); + executor.addWriteCommand(cmd); executor.executeNext(); REQUIRE(!executor.allDone()); @@ -353,9 +353,9 @@ TEST_CASE("ModbusExecutor") { SECTION("should delay retry of last write command") { modbus_factory.setModbusRegisterWriteError("test", 1, 1, modmqttd::RegisterType::HOLDING); - auto cmd(registers.createWriteDelayed(1, 0x3, std::chrono::milliseconds(10))); + auto cmd(registers.createWriteDelayed(1, 1, 0x3, std::chrono::milliseconds(10))); cmd->setMaxRetryCounts(0,1); - executor.addWriteCommand(1, cmd); + executor.addWriteCommand(cmd); executor.executeNext(); REQUIRE(!executor.allDone()); diff --git a/unittests/modbus_utils.hpp b/unittests/modbus_utils.hpp index 227d662..a6ce5bc 100644 --- a/unittests/modbus_utils.hpp +++ b/unittests/modbus_utils.hpp @@ -12,7 +12,7 @@ class ModbusExecutorTestRegisters : public std::map reg(new modmqttd::RegisterPoll(number-1, modmqttd::RegisterType::HOLDING, 1, refresh)); + std::shared_ptr reg(new modmqttd::RegisterPoll(slave, number-1, modmqttd::RegisterType::HOLDING, 1, refresh)); (*this)[slave].push_back(reg); return reg; } @@ -25,7 +25,7 @@ class ModbusExecutorTestRegisters : public std::map reg(new modmqttd::RegisterPoll(number-1, modmqttd::RegisterType::HOLDING, 1, refresh)); + std::shared_ptr reg(new modmqttd::RegisterPoll(slave, number-1, modmqttd::RegisterType::HOLDING, 1, refresh)); modmqttd::ModbusCommandDelay md(delay); md.delay_type = delayType; reg->setDelay(md); @@ -35,20 +35,22 @@ class ModbusExecutorTestRegisters : public std::map createWrite( + int slave, int number, uint16_t value ) { - std::shared_ptr reg(new modmqttd::RegisterWrite(number-1, modmqttd::RegisterType::HOLDING, value)); + std::shared_ptr reg(new modmqttd::RegisterWrite(slave, number-1, modmqttd::RegisterType::HOLDING, value)); return reg; } static std::shared_ptr createWriteDelayed ( + int slave, int number, uint16_t value, std::chrono::steady_clock::duration delay = std::chrono::milliseconds::zero(), modmqttd::ModbusCommandDelay::DelayType delayType = modmqttd::ModbusCommandDelay::DelayType::EVERYTIME ) { - std::shared_ptr reg(new modmqttd::RegisterWrite(number-1, modmqttd::RegisterType::HOLDING, value)); + std::shared_ptr reg(new modmqttd::RegisterWrite(slave, number-1, modmqttd::RegisterType::HOLDING, value)); modmqttd::ModbusCommandDelay md(delay); md.delay_type = delayType; reg->setDelay(md); diff --git a/unittests/scheduler_tests.cpp b/unittests/scheduler_tests.cpp index 62ffa69..ff4627a 100644 --- a/unittests/scheduler_tests.cpp +++ b/unittests/scheduler_tests.cpp @@ -20,8 +20,8 @@ TEST_CASE("Modbus scheduler") { std::chrono::time_point now = std::chrono::steady_clock::now(); RegisterSpec source; - std::shared_ptr reg(new modmqttd::RegisterPoll(1, modmqttd::RegisterType::BIT, 1, std::chrono::milliseconds(1000))); - source[0].push_back(reg); + std::shared_ptr reg(new modmqttd::RegisterPoll(1, 1, modmqttd::RegisterType::BIT, 1, std::chrono::milliseconds(1000))); + source[reg->mSlaveId].push_back(reg); std::chrono::nanoseconds duration = std::chrono::seconds(1000); From 67eb6588e045a1af0734a5ba162c5d957f677291 Mon Sep 17 00:00:00 2001 From: Lukasz Michalski Date: Fri, 10 May 2024 21:59:35 +0200 Subject: [PATCH 2/5] refactor: remove requirement for the ModbusExecutor where mCurrentSlaveQueue should be in sync with mWaitingCommand --- libmodmqttsrv/modbus_executor.cpp | 34 +++++++++++++++---------------- libmodmqttsrv/modbus_executor.hpp | 6 +++--- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/libmodmqttsrv/modbus_executor.cpp b/libmodmqttsrv/modbus_executor.cpp index 65e0b42..33af628 100644 --- a/libmodmqttsrv/modbus_executor.cpp +++ b/libmodmqttsrv/modbus_executor.cpp @@ -127,32 +127,32 @@ ModbusExecutor::addWriteCommand(const std::shared_ptr& pCommand) void -ModbusExecutor::pollRegisters(int slaveId, RegisterPoll& reg, bool forceSend) { +ModbusExecutor::pollRegisters(RegisterPoll& reg, bool forceSend) { try { std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); - std::vector newValues(mModbus->readModbusRegisters(slaveId, reg)); + std::vector newValues(mModbus->readModbusRegisters(reg.mSlaveId, reg)); reg.mLastReadOk = true; std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); - BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << reg.mRegister << ")" + BOOST_LOG_SEV(log, Log::trace) << "Register " << reg.mSlaveId << "." << reg.mRegister << " (0x" << std::hex << reg.mSlaveId << ".0x" << std::hex << reg.mRegister << ")" << " polled in " << std::dec << std::chrono::duration_cast(end - start).count() << "ms"; if ((reg.getValues() != newValues) || forceSend || (reg.mReadErrors != 0)) { - MsgRegisterValues val(slaveId, reg.mRegisterType, reg.mRegister, newValues); + MsgRegisterValues val(reg.mSlaveId, reg.mRegisterType, reg.mRegister, newValues); sendMessage(QueueItem::create(val)); reg.update(newValues); if (reg.mReadErrors != 0) { BOOST_LOG_SEV(log, Log::debug) << "Register " - << slaveId << "." << reg.mRegister + << reg.mSlaveId << "." << reg.mRegister << " read ok after " << reg.mReadErrors << " error(s)"; } reg.mReadErrors = 0; - BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister + BOOST_LOG_SEV(log, Log::trace) << "Register " << reg.mSlaveId << "." << reg.mRegister << " values sent, data=" << DebugTools::registersToStr(reg.getValues()); }; } catch (const ModbusReadException& ex) { - handleRegisterReadError(slaveId, reg, ex.what()); + handleRegisterReadError(reg, ex.what()); } // set mLastRead regardless if modbus command was successful or not // ModbusScheduler should not reschedule again after failed read @@ -162,14 +162,14 @@ ModbusExecutor::pollRegisters(int slaveId, RegisterPoll& reg, bool forceSend) { }; void -ModbusExecutor::handleRegisterReadError(int slaveId, RegisterPoll& regPoll, const char* errorMessage) { +ModbusExecutor::handleRegisterReadError(RegisterPoll& regPoll, const char* errorMessage) { // avoid flooding logs with register read error messages - log last error every 5 minutes regPoll.mReadErrors++; regPoll.mLastReadOk = false; if (regPoll.mReadErrors == 1 || (std::chrono::steady_clock::now() - regPoll.mFirstErrorTime > RegisterPoll::DurationBetweenLogError)) { BOOST_LOG_SEV(log, Log::error) << regPoll.mReadErrors << " error(s) when reading register " - << slaveId << "." << regPoll.mRegister << ", last error: " << errorMessage; + << regPoll.mSlaveId << "." << regPoll.mRegister << ", last error: " << errorMessage; regPoll.mFirstErrorTime = std::chrono::steady_clock::now(); if (regPoll.mReadErrors != 1) regPoll.mReadErrors = 0; @@ -177,20 +177,20 @@ ModbusExecutor::handleRegisterReadError(int slaveId, RegisterPoll& regPoll, cons // start sending MsgRegisterReadFailed if we cannot read register DefaultReadErrorCount times if (regPoll.mReadErrors > RegisterPoll::DefaultReadErrorCount) { - MsgRegisterReadFailed msg(slaveId, regPoll.mRegisterType, regPoll.mRegister, regPoll.getCount()); + MsgRegisterReadFailed msg(regPoll.mSlaveId, regPoll.mRegisterType, regPoll.mRegister, regPoll.getCount()); sendMessage(QueueItem::create(msg)); } } void -ModbusExecutor::writeRegisters(int slaveId, RegisterWrite& cmd) { +ModbusExecutor::writeRegisters(RegisterWrite& cmd) { try { std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); - mModbus->writeModbusRegisters(slaveId, cmd); + mModbus->writeModbusRegisters(cmd.mSlaveId, cmd); cmd.mLastWriteOk = true; std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); - BOOST_LOG_SEV(log, Log::debug) << "Register " << slaveId << "." << cmd.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << cmd.mRegister << ")" + BOOST_LOG_SEV(log, Log::debug) << "Register " << cmd.mSlaveId << "." << cmd.mRegister << " (0x" << std::hex << cmd.mSlaveId << ".0x" << std::hex << cmd.mRegister << ")" << " written in " << std::dec << std::chrono::duration_cast(end - start).count() << "ms"; if (cmd.mReturnMessage != nullptr) { @@ -199,9 +199,9 @@ ModbusExecutor::writeRegisters(int slaveId, RegisterWrite& cmd) { } } catch (const ModbusWriteException& ex) { BOOST_LOG_SEV(log, Log::error) << "error writing register " - << slaveId << "." << cmd.mRegister << ": " << ex.what(); + << cmd.mSlaveId << "." << cmd.mRegister << ": " << ex.what(); cmd.mLastWriteOk = false; - MsgRegisterWriteFailed msg(slaveId, cmd.mRegisterType, cmd.mRegister, cmd.getCount()); + MsgRegisterWriteFailed msg(cmd.mSlaveId, cmd.mRegisterType, cmd.mRegister, cmd.getCount()); sendMessage(QueueItem::create(msg)); } mLastCommandTime = std::chrono::steady_clock::now(); @@ -305,7 +305,7 @@ ModbusExecutor::sendCommand() { if (typeid(*mWaitingCommand) == typeid(RegisterPoll)) { RegisterPoll& pollcmd(static_cast(*mWaitingCommand)); - pollRegisters(mCurrentSlaveQueue->first, pollcmd, mInitialPoll); + pollRegisters(pollcmd, mInitialPoll); if (!pollcmd.mLastReadOk) { if (mReadRetryCount != 0) { retry = true; @@ -316,7 +316,7 @@ ModbusExecutor::sendCommand() { } } else { RegisterWrite& writecmd(static_cast(*mWaitingCommand)); - writeRegisters(mCurrentSlaveQueue->first, writecmd); + writeRegisters(writecmd); if (!writecmd.mLastWriteOk) { if (mWriteRetryCount != 0) { retry = true; diff --git a/libmodmqttsrv/modbus_executor.hpp b/libmodmqttsrv/modbus_executor.hpp index 48fbdef..a46bf85 100644 --- a/libmodmqttsrv/modbus_executor.hpp +++ b/libmodmqttsrv/modbus_executor.hpp @@ -80,10 +80,10 @@ class ModbusExecutor { std::chrono::time_point mInitialPollStart; void sendCommand(); - void pollRegisters(int slaveId, RegisterPoll& reg_ptr, bool forceSend); - void writeRegisters(int slaveId, RegisterWrite& cmd); + void pollRegisters(RegisterPoll& reg_ptr, bool forceSend); + void writeRegisters(RegisterWrite& cmd); void sendMessage(const QueueItem& item); - void handleRegisterReadError(int slaveId, RegisterPoll& reg, const char* errorMessage); + void handleRegisterReadError(RegisterPoll& reg, const char* errorMessage); void resetCommandsCounter(); void setMaxReadRetryCount(short val) { mMaxReadRetryCount = mReadRetryCount = val; } From f6391bba759bb8e6ba4aa61a907ba2f88449a722 Mon Sep 17 00:00:00 2001 From: Lukasz Michalski Date: Fri, 10 May 2024 22:13:18 +0200 Subject: [PATCH 3/5] Added info about poll merges to documentation --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 91ba71f..d9eb744 100644 --- a/README.md +++ b/README.md @@ -271,7 +271,8 @@ The mqtt section contains broker definition and modbus register mappings. Mappin * **refresh** (timespan, optional, default 5s) A timespan used to poll modbus registers. This setting is propagated - down to object and register definitions + down to an object and register definitions. If this value is lower than the target network can handle + then newly scheduled read commands will be merged with those already queued. * **broker** (required) From 515334289cfc6ac66607e6547ae779c867f89a17 Mon Sep 17 00:00:00 2001 From: Lukasz Michalski Date: Sat, 11 May 2024 22:45:08 +0200 Subject: [PATCH 4/5] Added time that write command sits in internal queues to log debug --- libmodmqttsrv/modbus_executor.cpp | 3 ++- libmodmqttsrv/modbus_messages.hpp | 9 +++++++-- libmodmqttsrv/register_poll.hpp | 10 +++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/libmodmqttsrv/modbus_executor.cpp b/libmodmqttsrv/modbus_executor.cpp index 33af628..22c3bf7 100644 --- a/libmodmqttsrv/modbus_executor.cpp +++ b/libmodmqttsrv/modbus_executor.cpp @@ -191,7 +191,8 @@ ModbusExecutor::writeRegisters(RegisterWrite& cmd) { std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); BOOST_LOG_SEV(log, Log::debug) << "Register " << cmd.mSlaveId << "." << cmd.mRegister << " (0x" << std::hex << cmd.mSlaveId << ".0x" << std::hex << cmd.mRegister << ")" - << " written in " << std::dec << std::chrono::duration_cast(end - start).count() << "ms"; + << " written in " << std::dec << std::chrono::duration_cast(end - start).count() << "ms" + << ", processing time " << std::dec << std::chrono::duration_cast(start - cmd.mCreationTime).count() << "ms"; if (cmd.mReturnMessage != nullptr) { cmd.mReturnMessage->mRegisters = ModbusRegisters(cmd.getValues()); diff --git a/libmodmqttsrv/modbus_messages.hpp b/libmodmqttsrv/modbus_messages.hpp index 7eb1592..2ad68de 100644 --- a/libmodmqttsrv/modbus_messages.hpp +++ b/libmodmqttsrv/modbus_messages.hpp @@ -33,11 +33,16 @@ class MsgRegisterValues : public MsgRegisterMessageBase { public: MsgRegisterValues(int slaveId, RegisterType regType, int registerNumber, const ModbusRegisters& registers) : MsgRegisterMessageBase(slaveId, registerNumber, regType, registers.getCount()), - mRegisters(registers) {} + mRegisters(registers), + mCreationTime(std::chrono::steady_clock::now()) + {} MsgRegisterValues(int slaveId, RegisterType regType, int registerNumber, const std::vector& registers) : MsgRegisterMessageBase(slaveId, registerNumber, regType, registers.size()), - mRegisters(registers) {} + mRegisters(registers), + mCreationTime(std::chrono::steady_clock::now()) + {} + std::chrono::steady_clock::time_point mCreationTime; ModbusRegisters mRegisters; }; diff --git a/libmodmqttsrv/register_poll.hpp b/libmodmqttsrv/register_poll.hpp index b20d32a..5ad0015 100644 --- a/libmodmqttsrv/register_poll.hpp +++ b/libmodmqttsrv/register_poll.hpp @@ -71,14 +71,13 @@ class RegisterPoll : public RegisterCommand { class RegisterWrite : public RegisterCommand { public: RegisterWrite(const MsgRegisterValues& msg) - : RegisterWrite(msg.mSlaveId, - msg.mRegister, - msg.mRegisterType, - msg.mRegisters - ) + : RegisterCommand(msg.mSlaveId, msg.mRegister, msg.mRegisterType, msg.mRegisters.getCount()), + mCreationTime(msg.mCreationTime), + mValues(msg.mRegisters) {} RegisterWrite(int pSlaveId, int pRegister, RegisterType pType, const ModbusRegisters& pValues) : RegisterCommand(pSlaveId, pRegister, pType, pValues.getCount()), + mCreationTime(std::chrono::steady_clock::now()), mValues(pValues) {} @@ -90,6 +89,7 @@ class RegisterWrite : public RegisterCommand { ModbusRegisters mValues; bool mLastWriteOk = false; + std::chrono::steady_clock::time_point mCreationTime; std::shared_ptr mReturnMessage; }; From 4ce7f1cd6f30305ccdb9f9acc88a12b5ebe04bee Mon Sep 17 00:00:00 2001 From: Lukasz Michalski Date: Sat, 11 May 2024 23:55:59 +0200 Subject: [PATCH 5/5] if write queues are empty then execute incoming write immediately, respecting configured slave delays only --- libmodmqttsrv/modbus_executor.cpp | 27 +++++++++++++++++++++---- libmodmqttsrv/modbus_executor.hpp | 2 ++ libmodmqttsrv/modbus_request_queues.cpp | 11 ++++++++++ libmodmqttsrv/modbus_request_queues.hpp | 2 ++ unittests/modbus_executor_tests.cpp | 13 ++++++------ 5 files changed, 45 insertions(+), 10 deletions(-) diff --git a/libmodmqttsrv/modbus_executor.cpp b/libmodmqttsrv/modbus_executor.cpp index 22c3bf7..7df5764 100644 --- a/libmodmqttsrv/modbus_executor.cpp +++ b/libmodmqttsrv/modbus_executor.cpp @@ -117,12 +117,29 @@ ModbusExecutor::addPollList(const std::map& pCommand) { - ModbusRequestsQueues& queue = mSlaveQueues[pCommand->mSlaveId]; - queue.addWriteCommand(pCommand); - if (mCurrentSlaveQueue == mSlaveQueues.end()) { - mCurrentSlaveQueue = mSlaveQueues.begin(); + if (mWriteCommandsQueued == 0) { + // skip queuing for if there is no queued write commands. + // This improves write latency in use case, where there is a lot of polling + // and sporadic write. I belive this is main use case for this gateway. + + // TODO this could leat to poll queue starvation when write commands + // arive in sync with slave execution time. Maybe there should be a + // configuration switch to turn it off? + if (mWaitingCommand != nullptr) + mSlaveQueues[pCommand->mSlaveId].readdCommand(mWaitingCommand); + + mWaitingCommand = pCommand; + mCurrentSlaveQueue = mSlaveQueues.find(pCommand->mSlaveId); resetCommandsCounter(); + } else { + ModbusRequestsQueues& queue = mSlaveQueues[pCommand->mSlaveId]; + queue.addWriteCommand(pCommand); + if (mCurrentSlaveQueue == mSlaveQueues.end()) { + mCurrentSlaveQueue = mSlaveQueues.find(pCommand->mSlaveId); + resetCommandsCounter(); + } } + mWriteCommandsQueued++; } @@ -325,6 +342,8 @@ ModbusExecutor::sendCommand() { } } else { mWriteRetryCount = mMaxWriteRetryCount; + mWriteCommandsQueued--; + assert(mWriteCommandsQueued >= 0); } } mLastQueue = mCurrentSlaveQueue; diff --git a/libmodmqttsrv/modbus_executor.hpp b/libmodmqttsrv/modbus_executor.hpp index a46bf85..1dc16bb 100644 --- a/libmodmqttsrv/modbus_executor.hpp +++ b/libmodmqttsrv/modbus_executor.hpp @@ -64,6 +64,8 @@ class ModbusExecutor { // is called faster than executor is able to handle them. int mCommandsLeft = 0; + int mWriteCommandsQueued = 0; + short mMaxReadRetryCount = 0; short mMaxWriteRetryCount = 0; short mWriteRetryCount; diff --git a/libmodmqttsrv/modbus_request_queues.cpp b/libmodmqttsrv/modbus_request_queues.cpp index abf1f2c..e5b81ee 100644 --- a/libmodmqttsrv/modbus_request_queues.cpp +++ b/libmodmqttsrv/modbus_request_queues.cpp @@ -95,4 +95,15 @@ ModbusRequestsQueues::addWriteCommand(const std::shared_ptr& pReq } +void +ModbusRequestsQueues::readdCommand(const std::shared_ptr& pCmd) { + if (typeid(*pCmd) == typeid(RegisterPoll)) { + mPollQueue.push_front(std::static_pointer_cast(pCmd)); + mPopFromPoll = true; + } else { + mWriteQueue.push_front(std::static_pointer_cast(pCmd)); + mPopFromPoll = false; + } +} + } diff --git a/libmodmqttsrv/modbus_request_queues.hpp b/libmodmqttsrv/modbus_request_queues.hpp index 12a3d93..6e39c49 100644 --- a/libmodmqttsrv/modbus_request_queues.hpp +++ b/libmodmqttsrv/modbus_request_queues.hpp @@ -17,6 +17,8 @@ class ModbusRequestsQueues { // to count and log write errors in 5min timeframes void addWriteCommand(const std::shared_ptr& pReq); + void readdCommand(const std::shared_ptr& pCmd); + // find the smallest positive difference between silence_period and delay need for register in queue. std::chrono::steady_clock::duration findForSilencePeriod(std::chrono::steady_clock::duration pPeriod, bool ignore_first_read); diff --git a/unittests/modbus_executor_tests.cpp b/unittests/modbus_executor_tests.cpp index d5d198e..76db219 100644 --- a/unittests/modbus_executor_tests.cpp +++ b/unittests/modbus_executor_tests.cpp @@ -211,20 +211,21 @@ TEST_CASE("ModbusExecutor") { REQUIRE(executor.getCommandsLeft() == 6); - executor.executeNext(); //poll 1,1 - REQUIRE(fromModbusQueue.size_approx() == 1); + //first write prioirty kicks in executor.executeNext(); //write 1,1 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 1, modmqttd::RegisterType::HOLDING) == 100); + executor.executeNext(); //poll 1,1 + REQUIRE(fromModbusQueue.size_approx() == 1); - executor.executeNext(); //poll 1,10 - REQUIRE(fromModbusQueue.size_approx() == 2); executor.executeNext(); //write 1,10 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 10, modmqttd::RegisterType::HOLDING) == 101); + executor.executeNext(); //poll 1,10 + REQUIRE(fromModbusQueue.size_approx() == 2); - executor.executeNext(); //poll 1,20 - REQUIRE(fromModbusQueue.size_approx() == 3); executor.executeNext(); //write 1,20 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 20, modmqttd::RegisterType::HOLDING) == 102); + executor.executeNext(); //poll 1,20 + REQUIRE(fromModbusQueue.size_approx() == 3); // write only mode, start writing 20x values executor.executeNext(); //write 1,1