Skip to content

Commit

Permalink
Restore write priority behaviour from 2.2.0. Works only if there are …
Browse files Browse the repository at this point in the history
…no queued writes in ModbusExecutor queues.
  • Loading branch information
BlackZork committed May 13, 2024
2 parents a2fe853 + 4ce7f1c commit 6deb0d7
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 76 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ The mqtt section contains broker definition and modbus register mappings. Mappin
* **refresh** (timespan, optional, default 5s)
A timespan used to poll modbus registers. This setting is propagated
down to object and register definitions
down to an object and register definitions. If this value is less than the target network can handle
then newly scheduled read commands will be merged with those already in modbus command queue.
* **broker** (required)
Expand Down
66 changes: 43 additions & 23 deletions libmodmqttsrv/modbus_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,43 +116,60 @@ ModbusExecutor::addPollList(const std::map<int, std::vector<std::shared_ptr<Regi


void
ModbusExecutor::addWriteCommand(int slaveId, const std::shared_ptr<RegisterWrite>& pCommand) {
ModbusRequestsQueues& queue = mSlaveQueues[slaveId];
queue.addWriteCommand(pCommand);
if (mCurrentSlaveQueue == mSlaveQueues.end()) {
mCurrentSlaveQueue = mSlaveQueues.begin();
ModbusExecutor::addWriteCommand(const std::shared_ptr<RegisterWrite>& pCommand) {
if (mWriteCommandsQueued == 0) {
// skip queuing for if there is no queued write commands.
// This improves write latency in use case, where there is a lot of polling
// and sporadic write. I belive this is main use case for this gateway.

// TODO this could leat to poll queue starvation when write commands
// arive in sync with slave execution time. Maybe there should be a
// configuration switch to turn it off?
if (mWaitingCommand != nullptr)
mSlaveQueues[pCommand->mSlaveId].readdCommand(mWaitingCommand);

mWaitingCommand = pCommand;
mCurrentSlaveQueue = mSlaveQueues.find(pCommand->mSlaveId);
resetCommandsCounter();
} else {
ModbusRequestsQueues& queue = mSlaveQueues[pCommand->mSlaveId];
queue.addWriteCommand(pCommand);
if (mCurrentSlaveQueue == mSlaveQueues.end()) {
mCurrentSlaveQueue = mSlaveQueues.find(pCommand->mSlaveId);
resetCommandsCounter();
}
}
mWriteCommandsQueued++;
}


void
ModbusExecutor::pollRegisters(int slaveId, RegisterPoll& reg, bool forceSend) {
ModbusExecutor::pollRegisters(RegisterPoll& reg, bool forceSend) {
try {
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

std::vector<uint16_t> newValues(mModbus->readModbusRegisters(slaveId, reg));
std::vector<uint16_t> newValues(mModbus->readModbusRegisters(reg.mSlaveId, reg));
reg.mLastReadOk = true;

std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << reg.mRegister << ")"
BOOST_LOG_SEV(log, Log::trace) << "Register " << reg.mSlaveId << "." << reg.mRegister << " (0x" << std::hex << reg.mSlaveId << ".0x" << std::hex << reg.mRegister << ")"
<< " polled in " << std::dec << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms";

if ((reg.getValues() != newValues) || forceSend || (reg.mReadErrors != 0)) {
MsgRegisterValues val(slaveId, reg.mRegisterType, reg.mRegister, newValues);
MsgRegisterValues val(reg.mSlaveId, reg.mRegisterType, reg.mRegister, newValues);
sendMessage(QueueItem::create(val));
reg.update(newValues);
if (reg.mReadErrors != 0) {
BOOST_LOG_SEV(log, Log::debug) << "Register "
<< slaveId << "." << reg.mRegister
<< reg.mSlaveId << "." << reg.mRegister
<< " read ok after " << reg.mReadErrors << " error(s)";
}
reg.mReadErrors = 0;
BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister
BOOST_LOG_SEV(log, Log::trace) << "Register " << reg.mSlaveId << "." << reg.mRegister
<< " values sent, data=" << DebugTools::registersToStr(reg.getValues());
};
} catch (const ModbusReadException& ex) {
handleRegisterReadError(slaveId, reg, ex.what());
handleRegisterReadError(reg, ex.what());
}
// set mLastRead regardless if modbus command was successful or not
// ModbusScheduler should not reschedule again after failed read
Expand All @@ -162,46 +179,47 @@ ModbusExecutor::pollRegisters(int slaveId, RegisterPoll& reg, bool forceSend) {
};

void
ModbusExecutor::handleRegisterReadError(int slaveId, RegisterPoll& regPoll, const char* errorMessage) {
ModbusExecutor::handleRegisterReadError(RegisterPoll& regPoll, const char* errorMessage) {
// avoid flooding logs with register read error messages - log last error every 5 minutes
regPoll.mReadErrors++;
regPoll.mLastReadOk = false;

if (regPoll.mReadErrors == 1 || (std::chrono::steady_clock::now() - regPoll.mFirstErrorTime > RegisterPoll::DurationBetweenLogError)) {
BOOST_LOG_SEV(log, Log::error) << regPoll.mReadErrors << " error(s) when reading register "
<< slaveId << "." << regPoll.mRegister << ", last error: " << errorMessage;
<< regPoll.mSlaveId << "." << regPoll.mRegister << ", last error: " << errorMessage;
regPoll.mFirstErrorTime = std::chrono::steady_clock::now();
if (regPoll.mReadErrors != 1)
regPoll.mReadErrors = 0;
}

// start sending MsgRegisterReadFailed if we cannot read register DefaultReadErrorCount times
if (regPoll.mReadErrors > RegisterPoll::DefaultReadErrorCount) {
MsgRegisterReadFailed msg(slaveId, regPoll.mRegisterType, regPoll.mRegister, regPoll.getCount());
MsgRegisterReadFailed msg(regPoll.mSlaveId, regPoll.mRegisterType, regPoll.mRegister, regPoll.getCount());
sendMessage(QueueItem::create(msg));
}
}

void
ModbusExecutor::writeRegisters(int slaveId, RegisterWrite& cmd) {
ModbusExecutor::writeRegisters(RegisterWrite& cmd) {
try {
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
mModbus->writeModbusRegisters(slaveId, cmd);
mModbus->writeModbusRegisters(cmd.mSlaveId, cmd);
cmd.mLastWriteOk = true;

std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
BOOST_LOG_SEV(log, Log::debug) << "Register " << slaveId << "." << cmd.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << cmd.mRegister << ")"
<< " written in " << std::dec << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms";
BOOST_LOG_SEV(log, Log::debug) << "Register " << cmd.mSlaveId << "." << cmd.mRegister << " (0x" << std::hex << cmd.mSlaveId << ".0x" << std::hex << cmd.mRegister << ")"
<< " written in " << std::dec << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms"
<< ", processing time " << std::dec << std::chrono::duration_cast<std::chrono::milliseconds>(start - cmd.mCreationTime).count() << "ms";

if (cmd.mReturnMessage != nullptr) {
cmd.mReturnMessage->mRegisters = ModbusRegisters(cmd.getValues());
sendMessage(QueueItem::create(*cmd.mReturnMessage));
}
} catch (const ModbusWriteException& ex) {
BOOST_LOG_SEV(log, Log::error) << "error writing register "
<< slaveId << "." << cmd.mRegister << ": " << ex.what();
<< cmd.mSlaveId << "." << cmd.mRegister << ": " << ex.what();
cmd.mLastWriteOk = false;
MsgRegisterWriteFailed msg(slaveId, cmd.mRegisterType, cmd.mRegister, cmd.getCount());
MsgRegisterWriteFailed msg(cmd.mSlaveId, cmd.mRegisterType, cmd.mRegister, cmd.getCount());
sendMessage(QueueItem::create(msg));
}
mLastCommandTime = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -305,7 +323,7 @@ ModbusExecutor::sendCommand() {

if (typeid(*mWaitingCommand) == typeid(RegisterPoll)) {
RegisterPoll& pollcmd(static_cast<RegisterPoll&>(*mWaitingCommand));
pollRegisters(mCurrentSlaveQueue->first, pollcmd, mInitialPoll);
pollRegisters(pollcmd, mInitialPoll);
if (!pollcmd.mLastReadOk) {
if (mReadRetryCount != 0) {
retry = true;
Expand All @@ -316,14 +334,16 @@ ModbusExecutor::sendCommand() {
}
} else {
RegisterWrite& writecmd(static_cast<RegisterWrite&>(*mWaitingCommand));
writeRegisters(mCurrentSlaveQueue->first, writecmd);
writeRegisters(writecmd);
if (!writecmd.mLastWriteOk) {
if (mWriteRetryCount != 0) {
retry = true;
mWriteRetryCount--;
}
} else {
mWriteRetryCount = mMaxWriteRetryCount;
mWriteCommandsQueued--;
assert(mWriteCommandsQueued >= 0);
}
}
mLastQueue = mCurrentSlaveQueue;
Expand Down
10 changes: 6 additions & 4 deletions libmodmqttsrv/modbus_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ModbusExecutor {
bool pollDone() const;

void addPollList(const std::map<int, std::vector<std::shared_ptr<RegisterPoll>>>& pRegisters, bool mInitialPoll = false);
void addWriteCommand(int slaveId, const std::shared_ptr<RegisterWrite>& pCommand);
void addWriteCommand(const std::shared_ptr<RegisterWrite>& pCommand);
/**
* Get next request R to send from modbus queues
* If R needs delay then return how much time we should wait before
Expand Down Expand Up @@ -64,6 +64,8 @@ class ModbusExecutor {
// is called faster than executor is able to handle them.
int mCommandsLeft = 0;

int mWriteCommandsQueued = 0;

short mMaxReadRetryCount = 0;
short mMaxWriteRetryCount = 0;
short mWriteRetryCount;
Expand All @@ -80,10 +82,10 @@ class ModbusExecutor {
std::chrono::time_point<std::chrono::steady_clock> mInitialPollStart;

void sendCommand();
void pollRegisters(int slaveId, RegisterPoll& reg_ptr, bool forceSend);
void writeRegisters(int slaveId, RegisterWrite& cmd);
void pollRegisters(RegisterPoll& reg_ptr, bool forceSend);
void writeRegisters(RegisterWrite& cmd);
void sendMessage(const QueueItem& item);
void handleRegisterReadError(int slaveId, RegisterPoll& reg, const char* errorMessage);
void handleRegisterReadError(RegisterPoll& reg, const char* errorMessage);
void resetCommandsCounter();

void setMaxReadRetryCount(short val) { mMaxReadRetryCount = mReadRetryCount = val; }
Expand Down
9 changes: 7 additions & 2 deletions libmodmqttsrv/modbus_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ class MsgRegisterValues : public MsgRegisterMessageBase {
public:
MsgRegisterValues(int slaveId, RegisterType regType, int registerNumber, const ModbusRegisters& registers)
: MsgRegisterMessageBase(slaveId, registerNumber, regType, registers.getCount()),
mRegisters(registers) {}
mRegisters(registers),
mCreationTime(std::chrono::steady_clock::now())
{}
MsgRegisterValues(int slaveId, RegisterType regType, int registerNumber, const std::vector<uint16_t>& registers)
: MsgRegisterMessageBase(slaveId, registerNumber, regType, registers.size()),
mRegisters(registers) {}
mRegisters(registers),
mCreationTime(std::chrono::steady_clock::now())
{}

std::chrono::steady_clock::time_point mCreationTime;
ModbusRegisters mRegisters;
};

Expand Down
11 changes: 11 additions & 0 deletions libmodmqttsrv/modbus_request_queues.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,15 @@ ModbusRequestsQueues::addWriteCommand(const std::shared_ptr<RegisterWrite>& pReq
}


void
ModbusRequestsQueues::readdCommand(const std::shared_ptr<RegisterCommand>& pCmd) {
if (typeid(*pCmd) == typeid(RegisterPoll)) {
mPollQueue.push_front(std::static_pointer_cast<RegisterPoll>(pCmd));
mPopFromPoll = true;
} else {
mWriteQueue.push_front(std::static_pointer_cast<RegisterWrite>(pCmd));
mPopFromPoll = false;
}
}

}
2 changes: 2 additions & 0 deletions libmodmqttsrv/modbus_request_queues.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class ModbusRequestsQueues {
// to count and log write errors in 5min timeframes
void addWriteCommand(const std::shared_ptr<RegisterWrite>& pReq);

void readdCommand(const std::shared_ptr<RegisterCommand>& pCmd);

// find the smallest positive difference between silence_period and delay need for register in queue.
std::chrono::steady_clock::duration findForSilencePeriod(std::chrono::steady_clock::duration pPeriod, bool ignore_first_read);

Expand Down
8 changes: 4 additions & 4 deletions libmodmqttsrv/modbus_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) {
// do not poll a poll group declared in modbus config section
// that was not merged with any mqtt register declaration
if (it->mRefreshMsec != MsgRegisterPoll::INVALID_REFRESH) {
std::shared_ptr<RegisterPoll> reg(new RegisterPoll(it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec));
std::map<int, ModbusSlaveConfig>::const_iterator slave_cfg = mSlaves.find(it->mSlaveId);
std::shared_ptr<RegisterPoll> reg(new RegisterPoll(it->mSlaveId, it->mRegister, it->mRegisterType, it->mCount, it->mRefreshMsec));
std::map<int, ModbusSlaveConfig>::const_iterator slave_cfg = mSlaves.find(reg->mSlaveId);

setCommandDelays(*reg, mDelayBeforeCommand, mDelayBeforeFirstCommand);
reg->setMaxRetryCounts(mMaxReadRetryCount, mMaxWriteRetryCount, true);
Expand All @@ -77,7 +77,7 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) {
reg->setMaxRetryCounts(slave_cfg->second.mMaxReadRetryCount, slave_cfg->second.mMaxWriteRetryCount);
}

registerMap[it->mSlaveId].push_back(reg);
registerMap[reg->mSlaveId].push_back(reg);
}
}

Expand Down Expand Up @@ -120,7 +120,7 @@ ModbusThread::processWrite(const std::shared_ptr<MsgRegisterValues>& msg) {
cmd->setMaxRetryCounts(it->second.mMaxReadRetryCount, it->second.mMaxWriteRetryCount);
}

mExecutor.addWriteCommand(msg->mSlaveId, cmd);
mExecutor.addWriteCommand(cmd);
}

void
Expand Down
8 changes: 4 additions & 4 deletions libmodmqttsrv/register_poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ RegisterCommand::setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce)
mMaxWriteRetryCount = pMaxWrite;
}

RegisterPoll::RegisterPoll(int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec)
: RegisterCommand(regNum, regType, regCount),
RegisterPoll::RegisterPoll(int pSlaveId, int pRegNum, RegisterType pRegType, int pRegCount, std::chrono::milliseconds pRefreshMsec)
: RegisterCommand(pSlaveId, pRegNum, pRegType, pRegCount),
mLastRead(std::chrono::steady_clock::now() - std::chrono::hours(24)),
mLastValues(regCount)
mLastValues(pRegCount)
{
mRefresh = refreshMsec;
mRefresh = pRefreshMsec;
mReadErrors = 0;
mFirstErrorTime = std::chrono::steady_clock::now();
mDelay = std::chrono::milliseconds::zero();
Expand Down
22 changes: 13 additions & 9 deletions libmodmqttsrv/register_poll.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ namespace modmqttd {

class RegisterCommand : public ModbusAddressRange {
public:
RegisterCommand(int pRegister, RegisterType pRegisterType, int pCount)
: ModbusAddressRange(pRegister, pRegisterType, pCount)
RegisterCommand(int pSlaveId, int pRegister, RegisterType pRegisterType, int pCount)
: ModbusAddressRange(pRegister, pRegisterType, pCount),
mSlaveId(pSlaveId)
{}

virtual int getRegister() const = 0;
Expand All @@ -30,6 +31,8 @@ class RegisterCommand : public ModbusAddressRange {

void setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce = false);

int mSlaveId;

short mMaxReadRetryCount;
short mMaxWriteRetryCount;
protected:
Expand All @@ -44,7 +47,7 @@ class RegisterPoll : public RegisterCommand {
// if we cannot read register in this time MsgRegisterReadFailed is sent
static constexpr int DefaultReadErrorCount = 3;

RegisterPoll(int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec);
RegisterPoll(int pSlaveId, int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec);

virtual int getRegister() const { return mRegister; };
virtual int getCount() const { return mLastValues.size(); }
Expand All @@ -68,13 +71,13 @@ class RegisterPoll : public RegisterCommand {
class RegisterWrite : public RegisterCommand {
public:
RegisterWrite(const MsgRegisterValues& msg)
: RegisterWrite(msg.mRegister,
msg.mRegisterType,
msg.mRegisters
)
: RegisterCommand(msg.mSlaveId, msg.mRegister, msg.mRegisterType, msg.mRegisters.getCount()),
mCreationTime(msg.mCreationTime),
mValues(msg.mRegisters)
{}
RegisterWrite(int pRegister, RegisterType pType, const ModbusRegisters& pValues)
: RegisterCommand(pRegister, pType, pValues.getCount()),
RegisterWrite(int pSlaveId, int pRegister, RegisterType pType, const ModbusRegisters& pValues)
: RegisterCommand(pSlaveId, pRegister, pType, pValues.getCount()),
mCreationTime(std::chrono::steady_clock::now()),
mValues(pValues)
{}

Expand All @@ -86,6 +89,7 @@ class RegisterWrite : public RegisterCommand {
ModbusRegisters mValues;

bool mLastWriteOk = false;
std::chrono::steady_clock::time_point mCreationTime;

std::shared_ptr<MsgRegisterValues> mReturnMessage;
};
Expand Down
6 changes: 3 additions & 3 deletions unittests/mockedmodbuscontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ MockedModbusContext::getWriteCount(int slaveId) const {
uint16_t
MockedModbusContext::getModbusRegisterValue(int slaveId, int regNum, modmqttd::RegisterType regtype) {
mInternalOperation = true;
modmqttd::RegisterPoll poll(--regNum, regtype, 1, std::chrono::milliseconds(0));
modmqttd::RegisterPoll poll(slaveId, --regNum, regtype, 1, std::chrono::milliseconds(0));

auto vals = readModbusRegisters(slaveId, poll);
return vals[0];
Expand Down Expand Up @@ -353,9 +353,9 @@ void
MockedModbusFactory::setModbusRegisterValue(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype, uint16_t val) {
regNum--;
std::shared_ptr<MockedModbusContext> ctx = getOrCreateContext(network);
modmqttd::RegisterWrite msg(regNum, regtype, ModbusRegisters(val));
modmqttd::RegisterWrite msg(slaveId, regNum, regtype, ModbusRegisters(val));
ctx->mInternalOperation = true;
ctx->writeModbusRegisters(slaveId, msg);
ctx->writeModbusRegisters(msg.mSlaveId, msg);
}


Expand Down
4 changes: 2 additions & 2 deletions unittests/modbus_executor_single_delay_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ TEST_CASE("ModbusExecutor for first delay config") {
executor.executeNext();

// add write about to be executed
auto write = registers.createWriteDelayed(1,0x15, std::chrono::milliseconds(30));
executor.addWriteCommand(1, write);
auto write = registers.createWriteDelayed(1, 1,0x15, std::chrono::milliseconds(30));
executor.addWriteCommand(write);
waitTime = executor.executeNext();
REQUIRE(executor.getWaitingCommand() == write);

Expand Down
Loading

0 comments on commit 6deb0d7

Please sign in to comment.