diff --git a/README.md b/README.md index 3bc915a..91ba71f 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,16 @@ Modbus network configuration parameters are listed below: Same as delay_before_first_command, but a delay is applied to every modbus command sent on this network. -* RTU device settings +* **read_retries** (optional, default 1) + + A number of retries after a modbus read command fails. + +* **write_retries** (optional, default 2) + + A number of retries after a modbus write command fails. + +* **RTU device settings** + For details, see modbus_new_rtu(3) * **device** (required) @@ -176,7 +185,7 @@ Modbus network configuration parameters are listed below: modbus Request To Send delay period in microseconds. See modbus_rtu_set_rts_delay(3) -* TCP/IP device settings +* **TCP/IP device settings** * **address** @@ -186,6 +195,16 @@ Modbus network configuration parameters are listed below: TCP port of a device +* **watchdog** (optional) + + An optional configuration section for modbus connection watchdog. Watchdog monitors modbus command errors. If there is no successful command execution in *watch_period*, then it restarts the modbus connection. + + Additionally for RTU network the *device* path is checked on the first error and then in small (300ms) time periods. If modbus RTU device is unplugged, then connection is restarted. + + * **watch_period** (optional, timespan, default=10s) + + The amount of time after which the connection should be reestablished if there has been no successful execution of a modbus command. + * **slaves** (optional) An optional slave list with modbus specific configuration like register groups to poll (see poll groups below) and timing constraints @@ -201,6 +220,14 @@ Modbus network configuration parameters are listed below: Same as global *delay_before_command* but applies to this slave only. + * **read_retries** (optional) + + A number of retries after a modbus read command to this slave fails. Uses the global *read_retries* if not defined. + + * **write_retries** (optional) + + A number of retries after a modbus write command to this slave fails. Uses the global *write_retries* if not defined. + * **poll_groups** (optional) An optional list of modbus register address ranges that will be polled with a single modbus_read_registers(3) call. diff --git a/libmodmqttsrv/CMakeLists.txt b/libmodmqttsrv/CMakeLists.txt index 9148df1..7aa8320 100644 --- a/libmodmqttsrv/CMakeLists.txt +++ b/libmodmqttsrv/CMakeLists.txt @@ -31,6 +31,8 @@ add_library(modmqttsrv modbus_thread.hpp modbus_types.cpp modbus_types.hpp + modbus_watchdog.cpp + modbus_watchdog.hpp modmqtt.cpp modmqtt.hpp mosquitto.cpp diff --git a/libmodmqttsrv/config.cpp b/libmodmqttsrv/config.cpp index 8d0aed5..a5f4b32 100644 --- a/libmodmqttsrv/config.cpp +++ b/libmodmqttsrv/config.cpp @@ -7,7 +7,7 @@ namespace modmqttd { boost::log::sources::severity_logger ModbusNetworkConfig::log; #if __cplusplus < 201703L - constexpr std::chrono::milliseconds ModbusNetworkConfig::MAX_RESPONSE_TIMEOUT; +constexpr std::chrono::milliseconds ModbusNetworkConfig::MAX_RESPONSE_TIMEOUT; #endif ConfigurationException::ConfigurationException(const YAML::Mark& mark, const char* what) { @@ -43,6 +43,9 @@ ModbusNetworkConfig::ModbusNetworkConfig(const YAML::Node& source) { ConfigTools::readOptionalValue(this->mDelayBeforeCommand, source, "delay_before_command"); ConfigTools::readOptionalValue(this->mDelayBeforeFirstCommand, source, "delay_before_first_command"); + ConfigTools::readOptionalValue(this->mMaxWriteRetryCount, source, "write_retries"); + ConfigTools::readOptionalValue(this->mMaxReadRetryCount, source, "read_retries"); + if (source["device"]) { mType = Type::RTU; @@ -54,6 +57,8 @@ ModbusNetworkConfig::ModbusNetworkConfig(const YAML::Node& source) { ConfigTools::readOptionalValue(this->mRtuSerialMode, source, "rtu_serial_mode"); ConfigTools::readOptionalValue(this->mRtsMode, source, "rtu_rts_mode"); ConfigTools::readOptionalValue(this->mRtsDelayUs, source, "rtu_rts_delay_us"); + + mWatchdogConfig.mDevicePath = mDevice; } else if (source["address"]) { mType = Type::TCPIP; mAddress = ConfigTools::readRequiredString(source, "address"); @@ -61,6 +66,10 @@ ModbusNetworkConfig::ModbusNetworkConfig(const YAML::Node& source) { } else { throw ConfigurationException(source.Mark(), "Cannot determine modbus network type: missing 'device' or 'address'"); } + + if (source["watchdog"]) { + ConfigTools::readOptionalValue(this->mWatchdogConfig.mWatchPeriod, source["watchdog"], "watch_period"); + } } MqttBrokerConfig::MqttBrokerConfig(const YAML::Node& source) { diff --git a/libmodmqttsrv/config.hpp b/libmodmqttsrv/config.hpp index 9876235..0260878 100644 --- a/libmodmqttsrv/config.hpp +++ b/libmodmqttsrv/config.hpp @@ -75,6 +75,11 @@ class ConfigTools { } }; +class ModbusWatchdogConfig { + public: + std::chrono::milliseconds mWatchPeriod = std::chrono::seconds(10); + std::string mDevicePath; +}; class ModbusNetworkConfig { static constexpr std::chrono::milliseconds MAX_RESPONSE_TIMEOUT = std::chrono::milliseconds(999); @@ -106,9 +111,14 @@ class ModbusNetworkConfig { std::string mName = ""; std::chrono::milliseconds mResponseTimeout = std::chrono::milliseconds(500); std::chrono::milliseconds mResponseDataTimeout = std::chrono::seconds(0); + std::chrono::milliseconds mDelayBeforeCommand = std::chrono::seconds(0); std::chrono::milliseconds mDelayBeforeFirstCommand = std::chrono::seconds(0); + unsigned short mMaxWriteRetryCount = 2; + unsigned short mMaxReadRetryCount = 1; + + //RTU only std::string mDevice = ""; int mBaud = 0; @@ -122,6 +132,8 @@ class ModbusNetworkConfig { //TCP only std::string mAddress = ""; int mPort = 0; + + ModbusWatchdogConfig mWatchdogConfig; }; class MqttBrokerConfig { diff --git a/libmodmqttsrv/modbus_context.cpp b/libmodmqttsrv/modbus_context.cpp index 78f499a..5e7f6cf 100644 --- a/libmodmqttsrv/modbus_context.cpp +++ b/libmodmqttsrv/modbus_context.cpp @@ -117,6 +117,7 @@ ModbusContext::connect() { void ModbusContext::disconnect() { modbus_close(mCtx); + mIsConnected = false; } std::vector diff --git a/libmodmqttsrv/modbus_executor.cpp b/libmodmqttsrv/modbus_executor.cpp index 1f46818..2b69653 100644 --- a/libmodmqttsrv/modbus_executor.cpp +++ b/libmodmqttsrv/modbus_executor.cpp @@ -12,7 +12,7 @@ namespace modmqttd { boost::log::sources::severity_logger ModbusExecutor::log; #if __cplusplus < 201703L - constexpr short ModbusExecutor::WRITE_BATCH_SIZE; +constexpr short ModbusExecutor::WRITE_BATCH_SIZE; #endif ModbusExecutor::ModbusExecutor( @@ -22,10 +22,12 @@ ModbusExecutor::ModbusExecutor( : mFromModbusQueue(fromModbusQueue), mToModbusQueue(toModbusQueue) { //some random past value, not using steady_clock:min() due to overflow - mLastPollTime = std::chrono::steady_clock::now() - std::chrono::hours(100000); + mLastCommandTime = std::chrono::steady_clock::now() - std::chrono::hours(100000); mLastQueue = mSlaveQueues.end(); mCurrentSlaveQueue = mSlaveQueues.end(); mInitialPoll = false; + mReadRetryCount = mMaxReadRetryCount; + mWriteRetryCount = mMaxWriteRetryCount; } void @@ -54,7 +56,7 @@ ModbusExecutor::addPollList(const std::map::iterator first_added = mSlaveQueues.end(); @@ -75,12 +77,11 @@ ModbusExecutor::addPollList(const std::map(last_silence_period).count() << "ms"; - assert(mWaitingRegister == nullptr); - mWaitingRegister = nullptr; + assert(mWaitingCommand == nullptr); resetCommandsCounter(); auto currentDiff = std::chrono::steady_clock::duration::max(); @@ -92,11 +93,11 @@ ModbusExecutor::addPollList(const std::mapsecond.findForSilencePeriod(last_silence_period, ignore_first_read); if (reg_delay < currentDiff) { - std::shared_ptr reg(sit->second.popFirstWithDelay(last_silence_period, ignore_first_read)); - mWaitingRegister = reg; + std::shared_ptr reg(sit->second.popFirstWithDelay(last_silence_period, ignore_first_read)); + mWaitingCommand = reg; mCurrentSlaveQueue = sit; currentDiff = reg_delay; - BOOST_LOG_SEV(log, Log::trace) << "Electing next register to poll as " << mCurrentSlaveQueue->first << "." << mWaitingRegister->getRegister() + BOOST_LOG_SEV(log, Log::trace) << "Electing next register to poll as " << mCurrentSlaveQueue->first << "." << mWaitingCommand->getRegister() << ", delay=" << std::chrono::duration_cast(reg_delay).count() << "ms" << ", diff=" << std::chrono::duration_cast(currentDiff).count() << "ms"; if (reg_delay.count() == 0) @@ -106,17 +107,17 @@ ModbusExecutor::addPollList(const std::mapsecond.popNext(); + if (mWaitingCommand == nullptr) { + mWaitingCommand = mCurrentSlaveQueue->second.popNext(); } - BOOST_LOG_SEV(log, Log::trace) << "Next register to poll set to " << mCurrentSlaveQueue->first << "." << mWaitingRegister->getRegister() << ", commands_left=" << mCommandsLeft; + BOOST_LOG_SEV(log, Log::trace) << "Next register to poll set to " << mCurrentSlaveQueue->first << "." << mWaitingCommand->getRegister() << ", commands_left=" << mCommandsLeft; } void ModbusExecutor::addWriteCommand(int slaveId, const std::shared_ptr& pCommand) { - auto& queue = mSlaveQueues[slaveId]; + ModbusRequestsQueues& queue = mSlaveQueues[slaveId]; queue.addWriteCommand(pCommand); if (mCurrentSlaveQueue == mSlaveQueues.end()) { mCurrentSlaveQueue = mSlaveQueues.begin(); @@ -129,30 +130,43 @@ void ModbusExecutor::pollRegisters(int slaveId, RegisterPoll& reg, bool forceSend) { try { std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); + std::vector newValues(mModbus->readModbusRegisters(slaveId, reg)); - mLastPollTime = reg.mLastRead = std::chrono::steady_clock::now(); + reg.mLastReadOk = true; std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); - BOOST_LOG_SEV(log, Log::debug) << "Register " << slaveId << "." << reg.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << reg.mRegister << ")" + BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << reg.mRegister << ")" << " polled in " << std::dec << std::chrono::duration_cast(end - start).count() << "ms"; if ((reg.getValues() != newValues) || forceSend || (reg.mReadErrors != 0)) { MsgRegisterValues val(slaveId, reg.mRegisterType, reg.mRegister, newValues); sendMessage(QueueItem::create(val)); reg.update(newValues); + if (reg.mReadErrors != 0) { + BOOST_LOG_SEV(log, Log::debug) << "Register " + << slaveId << "." << reg.mRegister + << " read ok after " << reg.mReadErrors << " error(s)"; + } reg.mReadErrors = 0; - BOOST_LOG_SEV(log, Log::debug) << "Register " << slaveId << "." << reg.mRegister + BOOST_LOG_SEV(log, Log::trace) << "Register " << slaveId << "." << reg.mRegister << " values sent, data=" << DebugTools::registersToStr(reg.getValues()); }; } catch (const ModbusReadException& ex) { handleRegisterReadError(slaveId, reg, ex.what()); } + // set mLastRead regardless if modbus command was successful or not + // ModbusScheduler should not reschedule again after failed read + // This will cause endless readModbusRegisters if register always + // returns read error + mLastCommandTime = reg.mLastRead = std::chrono::steady_clock::now(); }; void ModbusExecutor::handleRegisterReadError(int slaveId, RegisterPoll& regPoll, const char* errorMessage) { // avoid flooding logs with register read error messages - log last error every 5 minutes regPoll.mReadErrors++; + regPoll.mLastReadOk = false; + if (regPoll.mReadErrors == 1 || (std::chrono::steady_clock::now() - regPoll.mFirstErrorTime > RegisterPoll::DurationBetweenLogError)) { BOOST_LOG_SEV(log, Log::error) << regPoll.mReadErrors << " error(s) when reading register " << slaveId << "." << regPoll.mRegister << ", last error: " << errorMessage; @@ -169,11 +183,11 @@ ModbusExecutor::handleRegisterReadError(int slaveId, RegisterPoll& regPoll, cons } void -ModbusExecutor::writeRegisters(int slaveId, const RegisterWrite& cmd) { +ModbusExecutor::writeRegisters(int slaveId, RegisterWrite& cmd) { try { std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); mModbus->writeModbusRegisters(slaveId, cmd); - mLastPollTime = std::chrono::steady_clock::now(); + cmd.mLastWriteOk = true; std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); BOOST_LOG_SEV(log, Log::debug) << "Register " << slaveId << "." << cmd.mRegister << " (0x" << std::hex << slaveId << ".0x" << std::hex << cmd.mRegister << ")" @@ -186,34 +200,36 @@ ModbusExecutor::writeRegisters(int slaveId, const RegisterWrite& cmd) { } catch (const ModbusWriteException& ex) { BOOST_LOG_SEV(log, Log::error) << "error writing register " << slaveId << "." << cmd.mRegister << ": " << ex.what(); + cmd.mLastWriteOk = false; MsgRegisterWriteFailed msg(slaveId, cmd.mRegisterType, cmd.mRegister, cmd.getCount()); sendMessage(QueueItem::create(msg)); } + mLastCommandTime = std::chrono::steady_clock::now(); } std::chrono::steady_clock::duration -ModbusExecutor::pollNext() { +ModbusExecutor::executeNext() { //assert(!allDone()); - if (mWaitingRegister != nullptr) { + if (mWaitingCommand != nullptr) { if ( - (mWaitingRegister->getDelay().delay_type == ModbusCommandDelay::DelayType::EVERYTIME) + (mWaitingCommand->getDelay().delay_type == ModbusCommandDelay::DelayType::EVERYTIME) || ( - mWaitingRegister->getDelay().delay_type == ModbusCommandDelay::DelayType::ON_SLAVE_CHANGE + mWaitingCommand->getDelay().delay_type == ModbusCommandDelay::DelayType::ON_SLAVE_CHANGE && mLastQueue != mSlaveQueues.end() && mCurrentSlaveQueue->first != mLastQueue->first ) ) { - auto delay_passed = std::chrono::steady_clock::now() - mLastPollTime; - auto delay_left = mWaitingRegister->getDelay() - delay_passed; + auto delay_passed = std::chrono::steady_clock::now() - mLastCommandTime; + auto delay_left = mWaitingCommand->getDelay() - delay_passed; if (delay_left > std::chrono::steady_clock::duration::zero()) { - BOOST_LOG_SEV(log, Log::trace) << "Command for " << mCurrentSlaveQueue->first << "." << mWaitingRegister->getRegister() + BOOST_LOG_SEV(log, Log::trace) << "Command for " << mCurrentSlaveQueue->first << "." << mWaitingCommand->getRegister() << " need to wait " << std::chrono::duration_cast(delay_left).count() << "ms"; return delay_left; } } - //mWaitingRegister is ready to be read or written + //mWaitingCommand is ready to be read or written sendCommand(); } else { // find next non empty queue and start sending requests from it @@ -235,28 +251,28 @@ ModbusExecutor::pollNext() { // nextQueue could point at it again after doing full circle if (mCurrentSlaveQueue != nextQueue || !mCurrentSlaveQueue->second.empty()) { mCurrentSlaveQueue = nextQueue; - mWaitingRegister = mCurrentSlaveQueue->second.popNext(); + mWaitingCommand = mCurrentSlaveQueue->second.popNext(); resetCommandsCounter(); } else { //nothing to do return std::chrono::steady_clock::duration::max(); } } else { - mWaitingRegister = mCurrentSlaveQueue->second.popNext(); + mWaitingCommand = mCurrentSlaveQueue->second.popNext(); } } - if (mWaitingRegister != nullptr) { - if (mWaitingRegister->getDelay() != std::chrono::steady_clock::duration::zero()) { + if (mWaitingCommand != nullptr) { + if (mWaitingCommand->getDelay() != std::chrono::steady_clock::duration::zero()) { // setup and return needed delay // or pull register if there was enough silence // after previous command - auto last_silence_period = std::chrono::steady_clock::now() - mLastPollTime; - if (last_silence_period < mWaitingRegister->getDelay()) { - auto delay_left = mWaitingRegister->getDelay() - last_silence_period; + auto last_silence_period = std::chrono::steady_clock::now() - mLastCommandTime; + if (last_silence_period < mWaitingCommand->getDelay()) { + auto delay_left = mWaitingCommand->getDelay() - last_silence_period; - BOOST_LOG_SEV(log, Log::trace) << "Next register set to " << mCurrentSlaveQueue->first << "." << mWaitingRegister->getRegister() - << ", delay=" << std::chrono::duration_cast(mWaitingRegister->getDelay()).count() << "ms" + BOOST_LOG_SEV(log, Log::trace) << "Next register set to " << mCurrentSlaveQueue->first << "." << mWaitingCommand->getRegister() + << ", delay=" << std::chrono::duration_cast(mWaitingCommand->getDelay()).count() << "ms" << ", left=" << std::chrono::duration_cast(delay_left).count() << "ms"; return delay_left; } @@ -270,7 +286,7 @@ ModbusExecutor::pollNext() { BOOST_LOG_SEV(log, Log::info) << "Nothing to do for initial poll"; } else { auto end = std::chrono::steady_clock::now(); - BOOST_LOG_SEV(log, Log::info) << "Initial poll done in " << std::chrono::duration_cast(end - mPollStart).count() << "ms"; + BOOST_LOG_SEV(log, Log::info) << "Initial poll done in " << std::chrono::duration_cast(end - mInitialPollStart).count() << "ms"; mInitialPoll = false; } } @@ -280,20 +296,51 @@ ModbusExecutor::pollNext() { void ModbusExecutor::sendCommand() { - if (typeid(*mWaitingRegister) == typeid(RegisterPoll)) { - pollRegisters(mCurrentSlaveQueue->first, static_cast(*mWaitingRegister), mInitialPoll); + bool retry = false; + if (mWaitingCommand != mLastCommand) { + setMaxReadRetryCount(mWaitingCommand->mMaxReadRetryCount); + setMaxWriteRetryCount(mWaitingCommand->mMaxWriteRetryCount); + } + + + if (typeid(*mWaitingCommand) == typeid(RegisterPoll)) { + RegisterPoll& pollcmd(static_cast(*mWaitingCommand)); + pollRegisters(mCurrentSlaveQueue->first, pollcmd, mInitialPoll); + if (!pollcmd.mLastReadOk) { + if (mReadRetryCount != 0) { + retry = true; + mReadRetryCount--; + } + } else { + mReadRetryCount = mMaxReadRetryCount; + } } else { - writeRegisters(mCurrentSlaveQueue->first, static_cast(*mWaitingRegister)); + RegisterWrite& writecmd(static_cast(*mWaitingCommand)); + writeRegisters(mCurrentSlaveQueue->first, writecmd); + if (!writecmd.mLastWriteOk) { + if (mWriteRetryCount != 0) { + retry = true; + mWriteRetryCount--; + } + } else { + mWriteRetryCount = mMaxWriteRetryCount; + } } mLastQueue = mCurrentSlaveQueue; - mWaitingRegister.reset(); - mCommandsLeft--; + mLastCommand = mWaitingCommand; + + // to retry just leave mCurrentCommand + // for next executeNext() call + if (!retry) { + mWaitingCommand.reset(); + mCommandsLeft--; + } } bool ModbusExecutor::allDone() const { - if (mWaitingRegister != nullptr) + if (mWaitingCommand != nullptr) return false; auto non_empty = std::find_if(mSlaveQueues.begin(), mSlaveQueues.end(), @@ -305,7 +352,7 @@ ModbusExecutor::allDone() const { bool ModbusExecutor::pollDone() const { - if (mWaitingRegister != nullptr && typeid(*mWaitingRegister) == typeid(RegisterPoll)) + if (mWaitingCommand != nullptr && typeid(*mWaitingCommand) == typeid(RegisterPoll)) return false; auto non_empty = std::find_if(mSlaveQueues.begin(), mSlaveQueues.end(), diff --git a/libmodmqttsrv/modbus_executor.hpp b/libmodmqttsrv/modbus_executor.hpp index d16044a..5ab8c40 100644 --- a/libmodmqttsrv/modbus_executor.hpp +++ b/libmodmqttsrv/modbus_executor.hpp @@ -35,15 +35,19 @@ class ModbusExecutor { * * If queues are empty return duration=max */ - std::chrono::steady_clock::duration pollNext(); + std::chrono::steady_clock::duration executeNext(); - std::chrono::steady_clock::duration getTotalPollDuration() const { - return std::chrono::steady_clock::now() - mPollStart; - } - bool isInitial() const { return mInitialPoll; } + bool isInitialPollInProgress() const { return mInitialPoll; } int getCommandsLeft() const { return mCommandsLeft; } - const std::shared_ptr& getWaitingRegister() const { return mWaitingRegister; } + + const std::shared_ptr& getWaitingCommand() const { return mWaitingCommand; } + /* + Returns last command executed by executeNext or nullptr if executeNext() + returns non-zero duration + */ + const std::shared_ptr& getLastCommand() const { return mLastCommand; } + private: static boost::log::sources::severity_logger log; @@ -60,20 +64,30 @@ class ModbusExecutor { // is called faster than executor is able to handle them. int mCommandsLeft = 0; - std::chrono::steady_clock::time_point mLastPollTime; + short mMaxReadRetryCount = 0; + short mMaxWriteRetryCount = 0; + short mWriteRetryCount; + short mReadRetryCount; + + std::chrono::steady_clock::time_point mLastCommandTime; //used to determine if we have to respect delay of RegisterPoll::ReadDelayType::ON_SLAVE_CHANGE std::map::iterator mLastQueue; - std::shared_ptr mWaitingRegister; + std::shared_ptr mWaitingCommand; + std::shared_ptr mLastCommand; + bool mInitialPoll; - std::chrono::time_point mPollStart; + std::chrono::time_point mInitialPollStart; void sendCommand(); void pollRegisters(int slaveId, RegisterPoll& reg_ptr, bool forceSend); - void writeRegisters(int slaveId, const RegisterWrite& cmd); + void writeRegisters(int slaveId, RegisterWrite& cmd); void sendMessage(const QueueItem& item); void handleRegisterReadError(int slaveId, RegisterPoll& reg, const char* errorMessage); void resetCommandsCounter(); + + void setMaxReadRetryCount(short val) { mMaxReadRetryCount = mReadRetryCount = val; } + void setMaxWriteRetryCount(short val) { mMaxWriteRetryCount = mWriteRetryCount = val; } }; } diff --git a/libmodmqttsrv/modbus_messages.cpp b/libmodmqttsrv/modbus_messages.cpp index 0c634bc..9fc256d 100644 --- a/libmodmqttsrv/modbus_messages.cpp +++ b/libmodmqttsrv/modbus_messages.cpp @@ -7,7 +7,7 @@ namespace modmqttd { boost::log::sources::severity_logger MsgRegisterPollSpecification::log; #if __cplusplus < 201703L - constexpr std::chrono::milliseconds MsgRegisterPoll::INVALID_REFRESH; +constexpr std::chrono::milliseconds MsgRegisterPoll::INVALID_REFRESH; #endif void diff --git a/libmodmqttsrv/modbus_request_queues.cpp b/libmodmqttsrv/modbus_request_queues.cpp index 5181711..abf1f2c 100644 --- a/libmodmqttsrv/modbus_request_queues.cpp +++ b/libmodmqttsrv/modbus_request_queues.cpp @@ -16,9 +16,9 @@ ModbusRequestsQueues::addPollList(const std::vector +std::shared_ptr ModbusRequestsQueues::popNext() { - std::shared_ptr ret; + std::shared_ptr ret; if (mPopFromPoll) { if (mPollQueue.empty()) { ret = popNext(mWriteQueue); @@ -39,10 +39,10 @@ ModbusRequestsQueues::popNext() { template -std::shared_ptr +std::shared_ptr ModbusRequestsQueues::popNext(T& queue) { assert(!queue.empty()); - std::shared_ptr ret(queue.front()); + std::shared_ptr ret(queue.front()); queue.pop_front(); return ret; } @@ -75,9 +75,9 @@ ModbusRequestsQueues::findForSilencePeriod(std::chrono::steady_clock::duration p return ret; } -std::shared_ptr +std::shared_ptr ModbusRequestsQueues::popFirstWithDelay(std::chrono::steady_clock::duration pPeriod, bool ignore_first_read) { - auto ret = std::shared_ptr(); + auto ret = std::shared_ptr(); check_cache: if (mLastPollFound != mPollQueue.end()) { ret = *mLastPollFound; diff --git a/libmodmqttsrv/modbus_request_queues.hpp b/libmodmqttsrv/modbus_request_queues.hpp index 32396e1..12a3d93 100644 --- a/libmodmqttsrv/modbus_request_queues.hpp +++ b/libmodmqttsrv/modbus_request_queues.hpp @@ -22,12 +22,12 @@ class ModbusRequestsQueues { // pop the first register with pDelay // uses popNext() if pDelay is not found in queue - std::shared_ptr popFirstWithDelay(std::chrono::steady_clock::duration pPeriod, bool ignore_first_read); + std::shared_ptr popFirstWithDelay(std::chrono::steady_clock::duration pPeriod, bool ignore_first_read); // remove next RegisterPoll from queue and return it // if mPollQueue is empty then move all registers from // mNextPollQueue and return the first one - std::shared_ptr popNext(); + std::shared_ptr popNext(); bool empty() const { return mPollQueue.empty() && mWriteQueue.empty(); } @@ -39,7 +39,7 @@ class ModbusRequestsQueues { //cache for popFirstWithDelay std::deque>::iterator mLastPollFound; - template std::shared_ptr popNext(T& queue); + template std::shared_ptr popNext(T& queue); // if true then popNext will get element from mPollQueue, // otherwise from mWriteQueue diff --git a/libmodmqttsrv/modbus_scheduler.cpp b/libmodmqttsrv/modbus_scheduler.cpp index c6f7742..cd49ce1 100644 --- a/libmodmqttsrv/modbus_scheduler.cpp +++ b/libmodmqttsrv/modbus_scheduler.cpp @@ -12,7 +12,7 @@ ModbusScheduler::getRegistersToPoll( ) { std::map>> ret; - //BOOST_LOG_SEV(log, Log::debug) << "initial outduration " << std::chrono::duration_cast(outDuration).count(); + //BOOST_LOG_SEV(log, Log::trace) << "initial outduration " << std::chrono::duration_cast(outDuration).count(); outDuration = std::chrono::steady_clock::duration::max(); for(std::map>>::const_iterator slave = mRegisterMap.begin(); @@ -26,10 +26,10 @@ ModbusScheduler::getRegistersToPoll( auto time_passed = timePoint - reg.mLastRead; auto time_to_poll = reg.mRefresh; - //BOOST_LOG_SEV(log, Log::debug) << "time passed: " << std::chrono::duration_cast(time_to_poll).count(); + //BOOST_LOG_SEV(log, Log::trace) << "time passed: " << std::chrono::duration_cast(time_to_poll).count(); if (time_passed >= reg.mRefresh) { - BOOST_LOG_SEV(log, Log::debug) << "Register " << slave->first << "." << reg.mRegister << " (0x" << std::hex << slave->first << ".0x" << std::hex << reg.mRegister << ")" + BOOST_LOG_SEV(log, Log::trace) << "Register " << slave->first << "." << reg.mRegister << " (0x" << std::hex << slave->first << ".0x" << std::hex << reg.mRegister << ")" << " added, last read " << std::dec << std::chrono::duration_cast(time_passed).count() << "ms ago"; ret[slave->first].push_back(*reg_it); } else { @@ -38,7 +38,7 @@ ModbusScheduler::getRegistersToPoll( if (outDuration > time_to_poll) { outDuration = time_to_poll; - BOOST_LOG_SEV(log, Log::debug) << "Wait duration set to " << std::chrono::duration_cast(time_to_poll).count() + BOOST_LOG_SEV(log, Log::trace) << "Wait duration set to " << std::chrono::duration_cast(time_to_poll).count() << "ms as next poll for register " << slave->first << "." << reg.mRegister << " (0x" << std::hex << slave->first << ".0x" << std::hex << reg.mRegister << ")"; } } diff --git a/libmodmqttsrv/modbus_slave.cpp b/libmodmqttsrv/modbus_slave.cpp index f3cea34..8e704ba 100644 --- a/libmodmqttsrv/modbus_slave.cpp +++ b/libmodmqttsrv/modbus_slave.cpp @@ -20,6 +20,8 @@ ModbusSlaveConfig::ModbusSlaveConfig(const YAML::Node& data) { ConfigTools::readOptionalValue(this->mDelayBeforeCommand, data, "delay_before_command"); ConfigTools::readOptionalValue(this->mDelayBeforeFirstCommand, data, "delay_before_first_command"); + ConfigTools::readOptionalValue(this->mMaxWriteRetryCount, data, "write_retries"); + ConfigTools::readOptionalValue(this->mMaxReadRetryCount, data, "read_retries"); } } diff --git a/libmodmqttsrv/modbus_slave.hpp b/libmodmqttsrv/modbus_slave.hpp index 8899d56..f8bed5e 100644 --- a/libmodmqttsrv/modbus_slave.hpp +++ b/libmodmqttsrv/modbus_slave.hpp @@ -16,6 +16,9 @@ class ModbusSlaveConfig { int mAddress; std::chrono::milliseconds mDelayBeforeCommand = std::chrono::milliseconds::zero(); std::chrono::milliseconds mDelayBeforeFirstCommand = std::chrono::milliseconds::zero(); + + unsigned short mMaxWriteRetryCount = 0; + unsigned short mMaxReadRetryCount = 0; }; } diff --git a/libmodmqttsrv/modbus_thread.cpp b/libmodmqttsrv/modbus_thread.cpp index 3de075b..f15df2b 100644 --- a/libmodmqttsrv/modbus_thread.cpp +++ b/libmodmqttsrv/modbus_thread.cpp @@ -9,7 +9,7 @@ namespace modmqttd { void -setCommandDelays(IRegisterCommand& cmd, const std::chrono::milliseconds& everyTime, const std::chrono::milliseconds& onChange) { +setCommandDelays(RegisterCommand& cmd, const std::chrono::milliseconds& everyTime, const std::chrono::milliseconds& onChange) { ModbusCommandDelay delay; if (everyTime != std::chrono::milliseconds::zero()) { delay = everyTime; @@ -21,7 +21,6 @@ setCommandDelays(IRegisterCommand& cmd, const std::chrono::milliseconds& everyTi cmd.setDelay(delay); } - void ModbusThread::sendMessageFromModbus(moodycamel::BlockingReaderWriterQueue& fromModbusQueue, const QueueItem& item) { fromModbusQueue.enqueue(item); @@ -43,6 +42,7 @@ ModbusThread::configure(const ModbusNetworkConfig& config) { mModbus = ModMqtt::getModbusFactory().getContext(config.mName); mModbus->init(config); mExecutor.init(mModbus); + mWatchdog.init(config.mWatchdogConfig); mDelayBeforeCommand = config.mDelayBeforeCommand; mDelayBeforeFirstCommand = config.mDelayBeforeFirstCommand; @@ -52,6 +52,9 @@ ModbusThread::configure(const ModbusNetworkConfig& config) { << ", delay when slave changes " << std::chrono::duration_cast(mDelayBeforeFirstCommand).count() << "ms"; } + + mMaxReadRetryCount = config.mMaxReadRetryCount; + mMaxWriteRetryCount = config.mMaxWriteRetryCount; } void @@ -67,9 +70,12 @@ ModbusThread::setPollSpecification(const MsgRegisterPollSpecification& spec) { std::map::const_iterator slave_cfg = mSlaves.find(it->mSlaveId); setCommandDelays(*reg, mDelayBeforeCommand, mDelayBeforeFirstCommand); + reg->setMaxRetryCounts(mMaxReadRetryCount, mMaxWriteRetryCount, true); - if (slave_cfg != mSlaves.end()) + if (slave_cfg != mSlaves.end()) { setCommandDelays(*reg, slave_cfg->second.mDelayBeforeCommand, slave_cfg->second.mDelayBeforeFirstCommand); + reg->setMaxRetryCounts(slave_cfg->second.mMaxReadRetryCount, slave_cfg->second.mMaxWriteRetryCount); + } registerMap[it->mSlaveId].push_back(reg); } @@ -107,12 +113,13 @@ ModbusThread::processWrite(const std::shared_ptr& msg) { } setCommandDelays(*cmd, mDelayBeforeCommand, mDelayBeforeFirstCommand); + cmd->setMaxRetryCounts(mMaxReadRetryCount, mMaxWriteRetryCount, true); std::map::const_iterator it = mSlaves.find(msg->mSlaveId); if (it != mSlaves.end()) { setCommandDelays(*cmd, it->second.mDelayBeforeCommand, it->second.mDelayBeforeFirstCommand); + cmd->setMaxRetryCounts(it->second.mMaxReadRetryCount, it->second.mMaxWriteRetryCount); } - mExecutor.addWriteCommand(msg->mSlaveId, cmd); } @@ -161,6 +168,7 @@ ModbusThread::updateFromSlaveConfig(const ModbusSlaveConfig& pConfig) { if (slave_registers != registers.end()) { for (auto it = slave_registers->second.begin(); it != slave_registers->second.end(); it++) { setCommandDelays(**it, pConfig.mDelayBeforeCommand, pConfig.mDelayBeforeFirstCommand); + (*it)->setMaxRetryCounts(pConfig.mMaxReadRetryCount, pConfig.mMaxWriteRetryCount); } } } @@ -197,10 +205,11 @@ ModbusThread::run() { mModbus->connect(); if (mModbus->isConnected()) { BOOST_LOG_SEV(log, Log::info) << "modbus: connected"; + mWatchdog.reset(); sendMessage(QueueItem::create(MsgModbusNetworkState(mNetworkName, true))); // if modbus network was disconnected // we need to refresh everything - if (!mExecutor.isInitial()) { + if (!mExecutor.isInitialPollInProgress()) { mExecutor.setupInitialPoll(mScheduler.getPollSpecification()); } } @@ -214,7 +223,7 @@ ModbusThread::run() { if (mMqttConnected) { auto now = std::chrono::steady_clock::now(); - if (!mExecutor.isInitial() && nextPollTimePoint < now) { + if (!mExecutor.isInitialPollInProgress() && nextPollTimePoint < now) { std::chrono::steady_clock::duration schedulerWaitDuration; std::map>> regsToPoll = mScheduler.getRegistersToPoll(schedulerWaitDuration, now); nextPollTimePoint = now + schedulerWaitDuration; @@ -226,7 +235,10 @@ ModbusThread::run() { if (mExecutor.allDone()) { idleWaitDuration = (nextPollTimePoint - now); } else { - idleWaitDuration = mExecutor.pollNext(); + idleWaitDuration = mExecutor.executeNext(); + if (idleWaitDuration == std::chrono::steady_clock::duration::zero()) { + mWatchdog.inspectCommand(*mExecutor.getLastCommand()); + } } } else { if (!mMqttConnected) @@ -247,13 +259,26 @@ ModbusThread::run() { //dispatchMessages can change mShouldRun flag, do not wait //for next poll if we are exiting if (mShouldRun) { - QueueItem item; - BOOST_LOG_SEV(log, Log::debug) << constructIdleWaitMessage(idleWaitDuration); - if (!mToModbusQueue.wait_dequeue_timed(item, idleWaitDuration)) - continue; - dispatchMessages(item); - while(mToModbusQueue.try_dequeue(item)) + if (mModbus && mModbus->isConnected() && mWatchdog.isReconnectRequired()) { + if (mWatchdog.isDeviceRemoved()) { + BOOST_LOG_SEV(log, Log::error) << "Device " << mWatchdog.getDevicePath() << " was removed, forcing reconnect"; + } else { + BOOST_LOG_SEV(log, Log::error) << "Cannot execute any command in last " + << std::chrono::duration_cast(mWatchdog.getCurrentErrorPeriod()).count() << "s" + << ", reconnecting"; + } + mWatchdog.reset(); + mModbus->disconnect(); + sendMessage(QueueItem::create(MsgModbusNetworkState(mNetworkName, false))); + } else { + QueueItem item; + BOOST_LOG_SEV(log, Log::trace) << constructIdleWaitMessage(idleWaitDuration); + if (!mToModbusQueue.wait_dequeue_timed(item, idleWaitDuration)) + continue; dispatchMessages(item); + while(mToModbusQueue.try_dequeue(item)) + dispatchMessages(item); + } } }; if (mModbus && mModbus->isConnected()) diff --git a/libmodmqttsrv/modbus_thread.hpp b/libmodmqttsrv/modbus_thread.hpp index 32b0f86..0ea84da 100644 --- a/libmodmqttsrv/modbus_thread.hpp +++ b/libmodmqttsrv/modbus_thread.hpp @@ -7,6 +7,7 @@ #include "modbus_scheduler.hpp" #include "modbus_slave.hpp" #include "modbus_executor.hpp" +#include "modbus_watchdog.hpp" #include "imodbuscontext.hpp" @@ -29,6 +30,8 @@ class ModbusThread { std::string mNetworkName; std::chrono::milliseconds mDelayBeforeCommand = std::chrono::milliseconds::zero(); std::chrono::milliseconds mDelayBeforeFirstCommand = std::chrono::milliseconds::zero(); + short mMaxReadRetryCount; + short mMaxWriteRetryCount; // slave config std::map mSlaves; @@ -41,6 +44,7 @@ class ModbusThread { std::shared_ptr mModbus; ModbusScheduler mScheduler; ModbusExecutor mExecutor; + ModbusWatchdog mWatchdog; void configure(const ModbusNetworkConfig& config); void setPollSpecification(const MsgRegisterPollSpecification& spec); diff --git a/libmodmqttsrv/modbus_watchdog.cpp b/libmodmqttsrv/modbus_watchdog.cpp new file mode 100644 index 0000000..bb8f2bd --- /dev/null +++ b/libmodmqttsrv/modbus_watchdog.cpp @@ -0,0 +1,69 @@ +#include "modbus_watchdog.hpp" +#include "boost/filesystem.hpp" + +namespace modmqttd { + +boost::log::sources::severity_logger ModbusWatchdog::log; + +#if __cplusplus < 201703L +constexpr std::chrono::milliseconds ModbusWatchdog::sDeviceCheckPeriod; +#endif + +void +ModbusWatchdog::init(const ModbusWatchdogConfig& conf) { + mConfig = conf; + reset(); + BOOST_LOG_SEV(log, Log::debug) << "Watchdog initialized. Watch period set to " + << std::chrono::duration_cast(mConfig.mWatchPeriod).count() << "s"; + if (!mConfig.mDevicePath.empty()) { + BOOST_LOG_SEV(log, Log::debug) << "Monitoring " << mConfig.mDevicePath << " existence"; + } +} + + +void +ModbusWatchdog::inspectCommand(const RegisterCommand& command) { + if (command.executedOk()) { + reset(); + } else { + // TODO remove this hack and create separate thread on ModMqtt level + // that will monitor USB plug/unplug events using netlink or inotify + // or https://github.com/erikzenker/inotify-cpp + if (!mConfig.mDevicePath.empty()) { + auto now = std::chrono::steady_clock::now(); + if (!mDeviceRemoved && (mLastCommandOk || (now - mLastDeviceCheckTime) > sDeviceCheckPeriod)) { + mDeviceRemoved = !boost::filesystem::exists(mConfig.mDevicePath.c_str()); + mLastDeviceCheckTime = now; + if (mDeviceRemoved) { + BOOST_LOG_SEV(log, Log::warn) << "Detected device " << mConfig.mDevicePath << " removal"; + } + } + } + } + + mLastCommandOk = command.executedOk(); +} + + +bool +ModbusWatchdog::isReconnectRequired() const { + if (mDeviceRemoved) + return true; + + auto error_p = getCurrentErrorPeriod(); + BOOST_LOG_SEV(log, Log::trace) << "Watchdog: current error period is " + << std::chrono::duration_cast(error_p).count() << "ms"; + + return error_p > mConfig.mWatchPeriod; +} + + +void +ModbusWatchdog::reset() { + mLastSuccessfulCommandTime = std::chrono::steady_clock::now(); + mDeviceRemoved = false; + mLastCommandOk = true; +} + + +} diff --git a/libmodmqttsrv/modbus_watchdog.hpp b/libmodmqttsrv/modbus_watchdog.hpp new file mode 100644 index 0000000..a238d5e --- /dev/null +++ b/libmodmqttsrv/modbus_watchdog.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include + +#include "imodbuscontext.hpp" +#include "modbus_messages.hpp" +#include "register_poll.hpp" + +namespace modmqttd { + +class ModbusWatchdog { + public: + void init(const ModbusWatchdogConfig& conf); + + void inspectCommand(const RegisterCommand& command); + void reset(); + bool isReconnectRequired() const; + bool isDeviceRemoved() const { return mDeviceRemoved; } + const std::string& getDevicePath() const { return mConfig.mDevicePath; } + std::chrono::steady_clock::time_point getLastSuccessfulCommandTime() const; + std::chrono::steady_clock::duration getCurrentErrorPeriod() const { + return std::chrono::steady_clock::now() - mLastSuccessfulCommandTime; + } + + private: + static boost::log::sources::severity_logger log; + static constexpr std::chrono::milliseconds sDeviceCheckPeriod = std::chrono::milliseconds(300); + + ModbusWatchdogConfig mConfig; + std::chrono::steady_clock::time_point mLastSuccessfulCommandTime; + + std::chrono::steady_clock::time_point mLastDeviceCheckTime; + bool mLastCommandOk = true; + bool mDeviceRemoved = false; +}; + + +} diff --git a/libmodmqttsrv/mqttclient.cpp b/libmodmqttsrv/mqttclient.cpp index 43bf296..bf6a3eb 100644 --- a/libmodmqttsrv/mqttclient.cpp +++ b/libmodmqttsrv/mqttclient.cpp @@ -277,7 +277,7 @@ MqttClient::onMessage(const char* topic, const void* payload, int payloadlen) { void MqttClient::addCommand(const MqttObjectCommand& pCommand) { - mCommands.insert(std::pair(pCommand.mTopic, pCommand)); + mCommands.insert(std::pair(pCommand.mTopic, pCommand)); } diff --git a/libmodmqttsrv/register_poll.cpp b/libmodmqttsrv/register_poll.cpp index db8813a..da19bfb 100644 --- a/libmodmqttsrv/register_poll.cpp +++ b/libmodmqttsrv/register_poll.cpp @@ -4,8 +4,16 @@ namespace modmqttd { constexpr std::chrono::steady_clock::duration RegisterPoll::DurationBetweenLogError; +void +RegisterCommand::setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce) { + if (pMaxRead != 0 || pForce) + mMaxReadRetryCount = pMaxRead; + if (pMaxWrite || pForce) + mMaxWriteRetryCount = pMaxWrite; +} + RegisterPoll::RegisterPoll(int regNum, RegisterType regType, int regCount, std::chrono::milliseconds refreshMsec) - : ModbusAddressRange(regNum, regType, regCount), + : RegisterCommand(regNum, regType, regCount), mLastRead(std::chrono::steady_clock::now() - std::chrono::hours(24)), mLastValues(regCount) { diff --git a/libmodmqttsrv/register_poll.hpp b/libmodmqttsrv/register_poll.hpp index a153d6f..6845994 100644 --- a/libmodmqttsrv/register_poll.hpp +++ b/libmodmqttsrv/register_poll.hpp @@ -10,20 +10,35 @@ namespace modmqttd { -class IRegisterCommand { +class RegisterCommand : public ModbusAddressRange { public: + RegisterCommand(int pRegister, RegisterType pRegisterType, int pCount) + : ModbusAddressRange(pRegister, pRegisterType, pCount) + {} + virtual int getRegister() const = 0; - virtual const ModbusCommandDelay& getDelay() const = 0; const bool hasDelay() const { - return getDelay() != std::chrono::steady_clock::duration::zero(); + return mDelay != std::chrono::steady_clock::duration::zero(); } - virtual void setDelay(const ModbusCommandDelay& pDelay) = 0; virtual int getCount() const = 0; virtual const std::vector& getValues() const = 0; + + virtual bool executedOk() const = 0; + + const ModbusCommandDelay& getDelay() const { return mDelay; } + void setDelay(const ModbusCommandDelay& pDelay) { mDelay = pDelay; } + + void setMaxRetryCounts(short pMaxRead, short pMaxWrite, bool pForce = false); + + short mMaxReadRetryCount; + short mMaxWriteRetryCount; + protected: + ModbusCommandDelay mDelay = std::chrono::steady_clock::duration::zero(); + }; -class RegisterPoll : public ModbusAddressRange, public IRegisterCommand { +class RegisterPoll : public RegisterCommand { public: static constexpr std::chrono::steady_clock::duration DurationBetweenLogError = std::chrono::minutes(5); // if we cannot read register in this time MsgRegisterReadFailed is sent @@ -34,26 +49,23 @@ class RegisterPoll : public ModbusAddressRange, public IRegisterCommand { virtual int getRegister() const { return mRegister; }; virtual int getCount() const { return mLastValues.size(); } virtual const std::vector& getValues() const { return mLastValues; } - virtual const ModbusCommandDelay& getDelay() const { return mDelay; } - virtual void setDelay(const ModbusCommandDelay& pDelay) { mDelay = pDelay; } + virtual bool executedOk() const { return mLastReadOk; }; + void update(const std::vector newValues) { mLastValues = newValues; mCount = newValues.size(); } std::chrono::steady_clock::duration mRefresh; - + bool mLastReadOk = false; std::chrono::steady_clock::time_point mLastRead; int mReadErrors; std::chrono::steady_clock::time_point mFirstErrorTime; private: std::vector mLastValues; - - // delay before poll - ModbusCommandDelay mDelay; }; -class RegisterWrite : public ModbusAddressRange, public IRegisterCommand { +class RegisterWrite : public RegisterCommand { public: RegisterWrite(const MsgRegisterValues& msg) : RegisterWrite(msg.mRegister, @@ -62,22 +74,20 @@ class RegisterWrite : public ModbusAddressRange, public IRegisterCommand { ) {} RegisterWrite(int pRegister, RegisterType pType, const ModbusRegisters& pValues) - : ModbusAddressRange(pRegister, pType, pValues.getCount()), - mValues(pValues), - mDelay(std::chrono::milliseconds::zero()) + : RegisterCommand(pRegister, pType, pValues.getCount()), + mValues(pValues) {} virtual int getRegister() const { return mRegister; }; - virtual const ModbusCommandDelay& getDelay() const { return mDelay; } virtual int getCount() const { return mValues.getCount(); }; virtual const std::vector& getValues() const { return mValues.values(); } - virtual void setDelay(const ModbusCommandDelay& pDelay) { mDelay = pDelay; } + virtual bool executedOk() const { return mLastWriteOk; }; ModbusRegisters mValues; + bool mLastWriteOk = false; + std::shared_ptr mReturnMessage; - private: - ModbusCommandDelay mDelay; }; } //namespace diff --git a/unittests/CMakeLists.txt b/unittests/CMakeLists.txt index 7649843..7b8874e 100644 --- a/unittests/CMakeLists.txt +++ b/unittests/CMakeLists.txt @@ -26,6 +26,8 @@ add_executable(tests modbus_silence_before_poll_tests.cpp modbus_poll_specification_tests.cpp modbus_request_queues_tests.cpp + modbus_retry_tests.cpp + modbus_watchdog_tests.cpp mqtt_command_tests.cpp mqtt_command_only_tests.cpp mqtt_command_conv_tests.cpp diff --git a/unittests/mockedmodbuscontext.cpp b/unittests/mockedmodbuscontext.cpp index 2fbdd9d..ecba217 100644 --- a/unittests/mockedmodbuscontext.cpp +++ b/unittests/mockedmodbuscontext.cpp @@ -1,13 +1,14 @@ #include "mockedmodbuscontext.hpp" +#include +#include + +#include "catch2/catch_all.hpp" + #include "libmodmqttsrv/modbus_messages.hpp" #include "libmodmqttsrv/modbus_context.hpp" #include "libmodmqttsrv/debugtools.hpp" #include "libmodmqttsrv/register_poll.hpp" -#include "catch2/catch_all.hpp" - -#include -#include const std::chrono::milliseconds MockedModbusContext::sDefaultSlaveReadTime = std::chrono::milliseconds(5); const std::chrono::milliseconds MockedModbusContext::sDefaultSlaveWriteTime = std::chrono::milliseconds(10); @@ -16,13 +17,14 @@ void MockedModbusContext::Slave::write(const modmqttd::RegisterWrite& msg, bool internalOperation) { if (!internalOperation) { std::this_thread::sleep_for(mWriteTime); + mWriteCount++; if (mDisconnected) { errno = EIO; throw modmqttd::ModbusWriteException(std::string("write fn ") + std::to_string(msg.mRegister) + " failed"); } if (hasError(msg.mRegister, msg.mRegisterType, msg.getCount())) { errno = EIO; - throw modmqttd::ModbusReadException(std::string("register write fn ") + std::to_string(msg.mRegister) + " failed"); + throw modmqttd::ModbusWriteException(std::string("register write fn ") + std::to_string(msg.mRegister) + " failed"); } } @@ -49,7 +51,6 @@ MockedModbusContext::Slave::write(const modmqttd::RegisterWrite& msg, bool inter } if (!internalOperation) { - mWriteCount++; mIOCondition->notify_all(); } } @@ -58,6 +59,7 @@ std::vector MockedModbusContext::Slave::read(const modmqttd::RegisterPoll& regData, bool internalOperation) { if (!internalOperation) { std::this_thread::sleep_for(mReadTime); + mReadCount++; if (mDisconnected) { errno = EIO; throw modmqttd::ModbusReadException(std::string("read fn ") + std::to_string(regData.mRegister) + " failed"); @@ -66,7 +68,6 @@ MockedModbusContext::Slave::read(const modmqttd::RegisterPoll& regData, bool int errno = EIO; throw modmqttd::ModbusReadException(std::string("register read fn ") + std::to_string(regData.mRegister) + " failed"); } - mReadCount++; } switch(regData.mRegisterType) { case modmqttd::RegisterType::COIL: @@ -209,8 +210,38 @@ MockedModbusContext::readModbusRegisters(int slaveId, const modmqttd::RegisterPo void MockedModbusContext::init(const modmqttd::ModbusNetworkConfig& config) { mNetworkName = config.mName; + std::string fname = std::string("_") + mNetworkName; + if (config.mType == modmqttd::ModbusNetworkConfig::Type::RTU) + fname = config.mDevice; + + mDeviceName = fname; + createFakeDevice(); +} + +void +MockedModbusContext::createFakeDevice() { + std::ofstream s(mDeviceName); + s.close(); } + +void +MockedModbusContext::connect() { + mConnectionCount++; + mDeviceFile.open(mDeviceName); +} + +bool +MockedModbusContext::isConnected() const { + return mDeviceFile.is_open(); +} + +void +MockedModbusContext::disconnect() { + mDeviceFile.close(); +} + + void MockedModbusContext::writeModbusRegisters(int pSlaveId, const modmqttd::RegisterWrite& msg) { std::unique_lock lck(mMutex); @@ -295,9 +326,7 @@ MockedModbusFactory::getOrCreateContext(const char* network) { std::shared_ptr ctx; if (it == mModbusNetworks.end()) { ctx.reset(new MockedModbusContext()); - modmqttd::ModbusNetworkConfig c; - c.mName = network; - ctx->init(c); + ctx->mNetworkName = network; mModbusNetworks[network] = ctx; } else { ctx = it->second; @@ -345,6 +374,13 @@ MockedModbusFactory::setModbusRegisterReadError(const char* network, int slaveId s.setError(regNum, regType); } +void +MockedModbusFactory::setModbusRegisterWriteError(const char* network, int slaveId, int regNum, modmqttd::RegisterType regType) { + regNum--; + std::shared_ptr ctx = getOrCreateContext(network); + MockedModbusContext::Slave& s(ctx->getSlave(slaveId)); + s.setError(regNum, regType); +} void MockedModbusFactory::disconnectModbusSlave(const char* network, int slaveId) { diff --git a/unittests/mockedmodbuscontext.hpp b/unittests/mockedmodbuscontext.hpp index 4acea02..7e5c306 100644 --- a/unittests/mockedmodbuscontext.hpp +++ b/unittests/mockedmodbuscontext.hpp @@ -6,6 +6,9 @@ #include #include #include +#include +#include +#include #include "libmodmqttsrv/imodbuscontext.hpp" #include "libmodmqttsrv/modbus_types.hpp" @@ -59,17 +62,25 @@ class MockedModbusContext : public modmqttd::IModbusContext { } virtual void init(const modmqttd::ModbusNetworkConfig& config); - virtual void connect() { mIsConnected = true; } - virtual bool isConnected() const { return mIsConnected; } - virtual void disconnect() { mIsConnected = false; } + virtual void connect(); + virtual bool isConnected() const; + virtual void disconnect(); + virtual std::vector readModbusRegisters(int slaveId, const modmqttd::RegisterPoll& regData); virtual void writeModbusRegisters(int slaveId, const modmqttd::RegisterWrite& msg); virtual modmqttd::ModbusNetworkConfig::Type getNetworkType() const { return modmqttd::ModbusNetworkConfig::Type::TCPIP; }; virtual uint16_t waitForModbusValue(int slaveId, int regNum, modmqttd::RegisterType regType, uint16_t val, std::chrono::milliseconds timeout); virtual uint16_t getModbusRegisterValue(int slaveId, int regNum, modmqttd::RegisterType regtype); + int getReadCount(int slaveId) const; int getWriteCount(int slaveId) const; + int getConnectionCount() const { return mConnectionCount; } + + void removeFakeDevice() { std::remove(mDeviceName.c_str()); } + + void createFakeDevice(); + std::tuple getLastReadRegisterAddress() const { return std::tuple(mLastPolledSlave, mLastPolledRegister+1); } @@ -79,9 +90,15 @@ class MockedModbusContext : public modmqttd::IModbusContext { Slave& getSlave(int slaveId); - bool mIsConnected = false; bool mInternalOperation = false; std::string mNetworkName; + std::string mDeviceName; + std::fstream mDeviceFile; + + ~MockedModbusContext() { + mDeviceFile.close(); + std::remove(mDeviceName.c_str()); + } private: boost::log::sources::severity_logger log; std::mutex mMutex; @@ -91,6 +108,7 @@ class MockedModbusContext : public modmqttd::IModbusContext { std::chrono::time_point mLastPolTime; int mLastPolledSlave; int mLastPolledRegister; + int mConnectionCount = 0; std::map::iterator findOrCreateSlave(int id); @@ -109,6 +127,7 @@ class MockedModbusFactory : public modmqttd::IModbusFactory { void setModbusRegisterValue(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype, uint16_t val); uint16_t getModbusRegisterValue(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype); void setModbusRegisterReadError(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype); + void setModbusRegisterWriteError(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype); MockedModbusContext& getMockedModbusContext(const std::string& networkName) const { auto it = mModbusNetworks.find(networkName); @@ -117,6 +136,8 @@ class MockedModbusFactory : public modmqttd::IModbusFactory { void disconnectModbusSlave(const char* network, int slaveId); void connectModbusSlave(const char* network, int slaveId); + void disconnectSerialPortFor(const char* network); + uint16_t waitForModbusValue(const char* network, int slaveId, int regNum, modmqttd::RegisterType regType, uint16_t val, std::chrono::milliseconds timeout); private: std::shared_ptr getOrCreateContext(const char* network); diff --git a/unittests/mockedmqttimpl.cpp b/unittests/mockedmqttimpl.cpp index acfa7ee..82682c1 100644 --- a/unittests/mockedmqttimpl.cpp +++ b/unittests/mockedmqttimpl.cpp @@ -30,6 +30,7 @@ void MockedMqttImpl::subscribe(const char* topic) { std::unique_lock lck(mMutex); mSubscriptions.insert(topic); + BOOST_LOG_SEV(log, modmqttd::Log::info) << "TEST: subscribe " << topic; mCondition.notify_all(); } diff --git a/unittests/mockedserver.hpp b/unittests/mockedserver.hpp index 9a9ab81..e22c8bb 100644 --- a/unittests/mockedserver.hpp +++ b/unittests/mockedserver.hpp @@ -156,10 +156,19 @@ class MockedModMqttServerThread : public ModMqttServerThread { mModbusFactory->connectModbusSlave(network, slaveId); } + void disconnectSerialPortFor(const char* networkName) { + mModbusFactory->getMockedModbusContext(networkName).removeFakeDevice(); + } + + void setModbusRegisterReadError(const char* network, int slaveId, int regNum, modmqttd::RegisterType regtype) { mModbusFactory->setModbusRegisterReadError(network, slaveId, regNum, regtype); } + MockedModbusContext& getMockedModbusContext(const std::string& networkName) const { + return mModbusFactory->getMockedModbusContext(networkName); + } + std::shared_ptr mModbusFactory; std::shared_ptr mMqtt; }; diff --git a/unittests/modbus_executor_single_delay_tests.cpp b/unittests/modbus_executor_single_delay_tests.cpp index 9aa348c..337acf9 100644 --- a/unittests/modbus_executor_single_delay_tests.cpp +++ b/unittests/modbus_executor_single_delay_tests.cpp @@ -12,7 +12,7 @@ #include "mockedmodbuscontext.hpp" #include "../readerwriterqueue/readerwriterqueue.h" -TEST_CASE("ModbusExecutor for single delay config") { +TEST_CASE("ModbusExecutor for first delay config") { moodycamel::BlockingReaderWriterQueue fromModbusQueue; moodycamel::BlockingReaderWriterQueue toModbusQueue; MockedModbusFactory modbus_factory; @@ -29,12 +29,12 @@ TEST_CASE("ModbusExecutor for single delay config") { auto reg1 = registers.addPollDelayed(1, 1, std::chrono::milliseconds(50), modmqttd::ModbusCommandDelay::DelayType::ON_SLAVE_CHANGE); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(executor.allDone()); executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(executor.allDone()); } @@ -46,10 +46,10 @@ TEST_CASE("ModbusExecutor for single delay config") { auto reg2 = registers.addPoll(2, 20); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(1,1)); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(2,20)); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(executor.allDone()); @@ -60,18 +60,18 @@ TEST_CASE("ModbusExecutor for single delay config") { executor.addPollList(registers); // reg1 should be polled first because we have silence to use // but poll of reg1 need to wait about 10ms more, because reg2 was polled last - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime > std::chrono::milliseconds(5)); REQUIRE(!executor.allDone()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); //poll of reg1 after silence - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(1,1)); REQUIRE(!executor.allDone()); // poll of reg2 - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(2,20)); REQUIRE(executor.allDone()); @@ -83,7 +83,7 @@ TEST_CASE("ModbusExecutor for single delay config") { //initial poll selects reg2 due to longer delay needed executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(2,20)); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(!executor.allDone()); @@ -91,7 +91,7 @@ TEST_CASE("ModbusExecutor for single delay config") { //slave changed, reg1 needs 15 ms delay std::this_thread::sleep_for(std::chrono::milliseconds(20)); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(1,1)); REQUIRE(executor.allDone()); @@ -99,21 +99,21 @@ TEST_CASE("ModbusExecutor for single delay config") { std::this_thread::sleep_for(std::chrono::milliseconds(30)); executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); //we choose to poll slave 2 because it better fits into available delay REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(2,20)); REQUIRE(!executor.allDone()); //reg1 needs silence due to slave change - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime > std::chrono::milliseconds(5)); REQUIRE(!executor.allDone()); //make silence that can acomodate reg1 std::this_thread::sleep_for(std::chrono::milliseconds(15)); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(modbus_factory.getLastReadRegisterAddress() == std::tuple(1,1)); REQUIRE(executor.allDone()); } @@ -122,18 +122,18 @@ TEST_CASE("ModbusExecutor for single delay config") { // execute poll to clean silence period auto reg1 = registers.addPoll(1, 1); executor.setupInitialPoll(registers); - executor.pollNext(); + executor.executeNext(); // add write about to be executed auto write = registers.createWriteDelayed(1,0x15, std::chrono::milliseconds(30)); executor.addWriteCommand(1, write); - waitTime = executor.pollNext(); - REQUIRE(executor.getWaitingRegister() == write); + waitTime = executor.executeNext(); + REQUIRE(executor.getWaitingCommand() == write); // bug - should not drop waiting command when adding new command that needs wait reg1 = registers.addPoll(1, 1, std::chrono::milliseconds(50)); executor.setupInitialPoll(registers); - REQUIRE(executor.getWaitingRegister() == write); + REQUIRE(executor.getWaitingCommand() == write); } } diff --git a/unittests/modbus_executor_tests.cpp b/unittests/modbus_executor_tests.cpp index 0ddcbc0..8a6d8f7 100644 --- a/unittests/modbus_executor_tests.cpp +++ b/unittests/modbus_executor_tests.cpp @@ -27,12 +27,12 @@ TEST_CASE("ModbusExecutor") { SECTION("should return zero duration for empty register set") { executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(executor.allDone()); executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(executor.allDone()); } @@ -42,7 +42,7 @@ TEST_CASE("ModbusExecutor") { auto reg = registers.addPoll(1, 1); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg->getValues()[0] == 5); REQUIRE(executor.allDone()); @@ -56,12 +56,12 @@ TEST_CASE("ModbusExecutor") { auto reg2 = registers.addPoll(1, 2); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg1->getValues()[0] == 5); REQUIRE(!executor.allDone()); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg2->getValues()[0] == 6); REQUIRE(executor.allDone()); @@ -75,12 +75,12 @@ TEST_CASE("ModbusExecutor") { auto reg2 = registers.addPoll(2, 20); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg1->getValues()[0] == 5); REQUIRE(!executor.allDone()); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg2->getValues()[0] == 60); REQUIRE(executor.allDone()); @@ -91,7 +91,7 @@ TEST_CASE("ModbusExecutor") { auto reg = registers.addPoll(1, 1, std::chrono::milliseconds(5)); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg->getValues()[0] == 5); REQUIRE(executor.allDone()); @@ -102,27 +102,27 @@ TEST_CASE("ModbusExecutor") { auto reg = registers.addPollDelayed(1, 1, std::chrono::milliseconds(50)); executor.setupInitialPoll(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(executor.allDone()); SECTION("should delay register read on normal poll") { // need to wait because there is no silence between inital poll and // next poll executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime > std::chrono::milliseconds(40)); REQUIRE(!executor.allDone()); //simulate shorter wait than required std::this_thread::sleep_for(std::chrono::milliseconds(20)); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(!executor.allDone()); REQUIRE(std::chrono::milliseconds::zero() < waitTime); REQUIRE(waitTime < std::chrono::milliseconds(50)); // required silence period reached std::this_thread::sleep_for(std::chrono::milliseconds(40)); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg->getValues()[0] == 5); REQUIRE(executor.allDone()); @@ -132,7 +132,7 @@ TEST_CASE("ModbusExecutor") { std::this_thread::sleep_for(std::chrono::milliseconds(70)); executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg->getValues()[0] == 5); REQUIRE(executor.allDone()); @@ -148,9 +148,9 @@ TEST_CASE("ModbusExecutor") { auto reg2 = registers.addPollDelayed(2, 20, std::chrono::milliseconds(50)); executor.setupInitialPoll(registers); - executor.pollNext(); // 2.20 is polled first because it requires silence + executor.executeNext(); // 2.20 is polled first because it requires silence REQUIRE(reg2->getValues()[0] == 6); - executor.pollNext(); + executor.executeNext(); REQUIRE(reg1->getValues()[0] == 1); REQUIRE(executor.allDone()); @@ -160,12 +160,12 @@ TEST_CASE("ModbusExecutor") { SECTION("should poll waiting register with max delay first") { std::this_thread::sleep_for(std::chrono::milliseconds(100)); executor.addPollList(registers); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg2->getValues()[0] == 60); REQUIRE(!executor.allDone()); - waitTime = executor.pollNext(); + waitTime = executor.executeNext(); REQUIRE(waitTime == std::chrono::milliseconds::zero()); REQUIRE(reg1->getValues()[0] == 10); REQUIRE(executor.allDone()); @@ -182,11 +182,11 @@ TEST_CASE("ModbusExecutor") { auto reg3 = registers.addPollDelayed(2, 21, std::chrono::milliseconds(50)); executor.setupInitialPoll(registers); - executor.pollNext(); // 2.21 is polled first because it requires silence + executor.executeNext(); // 2.21 is polled first because it requires silence REQUIRE(reg3->getValues()[0] == 21); - executor.pollNext(); // 2.20 is polled next because we group reads by slave + executor.executeNext(); // 2.20 is polled next because we group reads by slave REQUIRE(reg2->getValues()[0] == 20); - executor.pollNext(); + executor.executeNext(); REQUIRE(reg1->getValues()[0] == 1); REQUIRE(executor.allDone()); @@ -200,7 +200,7 @@ TEST_CASE("ModbusExecutor") { registers.addPoll(1, 10); registers.addPoll(1, 20); - //mWaitingRegister is set to poll 1,1 + //mWaitingCommand is set to poll 1,1 executor.setupInitialPoll(registers); executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(1, 100)); executor.addWriteCommand(1, ModbusExecutorTestRegisters::createWrite(10, 101)); @@ -211,28 +211,28 @@ TEST_CASE("ModbusExecutor") { REQUIRE(executor.getCommandsLeft() == 6); - executor.pollNext(); //poll 1,1 + executor.executeNext(); //poll 1,1 REQUIRE(fromModbusQueue.size_approx() == 1); - executor.pollNext(); //write 1,1 + executor.executeNext(); //write 1,1 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 1, modmqttd::RegisterType::HOLDING) == 100); - executor.pollNext(); //poll 1,10 + executor.executeNext(); //poll 1,10 REQUIRE(fromModbusQueue.size_approx() == 2); - executor.pollNext(); //write 1,10 + executor.executeNext(); //write 1,10 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 10, modmqttd::RegisterType::HOLDING) == 101); - executor.pollNext(); //poll 1,20 + executor.executeNext(); //poll 1,20 REQUIRE(fromModbusQueue.size_approx() == 3); - executor.pollNext(); //write 1,20 + executor.executeNext(); //write 1,20 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 20, modmqttd::RegisterType::HOLDING) == 102); // write only mode, start writing 20x values - executor.pollNext(); //write 1,1 + executor.executeNext(); //write 1,1 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 1, modmqttd::RegisterType::HOLDING) == 200); REQUIRE(executor.getCommandsLeft() == modmqttd::ModbusExecutor::WRITE_BATCH_SIZE - 1); - executor.pollNext(); //write 1,10 + executor.executeNext(); //write 1,10 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 10, modmqttd::RegisterType::HOLDING) == 201); //back to read/write mode, queues are not setup because we still have something to write @@ -241,14 +241,14 @@ TEST_CASE("ModbusExecutor") { executor.addPollList(registers); REQUIRE(executor.getCommandsLeft() == 8); - executor.pollNext(); //poll 1,1 + executor.executeNext(); //poll 1,1 REQUIRE(fromModbusQueue.size_approx() == 4); - executor.pollNext(); //write 1,20 + executor.executeNext(); //write 1,20 REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 20, modmqttd::RegisterType::HOLDING) == 202); - executor.pollNext(); //poll 1,10 + executor.executeNext(); //poll 1,10 REQUIRE(fromModbusQueue.size_approx() == 5); - executor.pollNext(); //poll 1,20 + executor.executeNext(); //poll 1,20 REQUIRE(fromModbusQueue.size_approx() == 6); @@ -267,14 +267,14 @@ TEST_CASE("ModbusExecutor") { executor.addWriteCommand(2, ModbusExecutorTestRegisters::createWrite(2, 200)); for (int i = 0; i < modmqttd::ModbusExecutor::WRITE_BATCH_SIZE; i++) { - executor.pollNext(); //write to the first slave + executor.executeNext(); //write to the first slave } REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 1, modmqttd::RegisterType::HOLDING) == modmqttd::ModbusExecutor::WRITE_BATCH_SIZE); REQUIRE(modbus_factory.getModbusRegisterValue("test", 2, 2, modmqttd::RegisterType::HOLDING) == 20); REQUIRE(executor.getCommandsLeft() == 0); - executor.pollNext(); //switch and write to the second slave + executor.executeNext(); //switch and write to the second slave REQUIRE(modbus_factory.getModbusRegisterValue("test", 1, 1, modmqttd::RegisterType::HOLDING) == modmqttd::ModbusExecutor::WRITE_BATCH_SIZE); REQUIRE(modbus_factory.getModbusRegisterValue("test", 2, 2, modmqttd::RegisterType::HOLDING) == 200); @@ -282,7 +282,7 @@ TEST_CASE("ModbusExecutor") { } - SECTION("should not reelect mWaitingRegister poll is not finished") { + SECTION("should not reelect mWaitingCommand poll is not finished") { modbus_factory.setModbusRegisterValue("test",1,1,modmqttd::RegisterType::HOLDING, 1); modbus_factory.setModbusRegisterValue("test",1,20,modmqttd::RegisterType::HOLDING, 10); modbus_factory.setModbusRegisterValue("test",1,21,modmqttd::RegisterType::HOLDING, 20); @@ -293,16 +293,16 @@ TEST_CASE("ModbusExecutor") { // do initial poll executor.setupInitialPoll(registers); REQUIRE(executor.getCommandsLeft() == 6); - executor.pollNext(); //1,2 - executor.pollNext(); //1,1 or 1,3 - executor.pollNext(); //1,1 or 1,3 + executor.executeNext(); //1,2 + executor.executeNext(); //1,1 or 1,3 + executor.executeNext(); //1,1 or 1,3 // elect 1.2 after enough silence std::this_thread::sleep_for(std::chrono::milliseconds(100)); executor.addPollList(registers); REQUIRE(executor.getCommandsLeft() == 6); - REQUIRE(executor.getWaitingRegister()->getRegister() == 1); - executor.pollNext(); //1,1 + REQUIRE(executor.getWaitingCommand()->getRegister() == 1); + executor.executeNext(); //1,1 // still polling, should not relect 1,2 std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -310,7 +310,7 @@ TEST_CASE("ModbusExecutor") { //we do not increase commands left to avoid queue lock if //scheduler adds registers quickly REQUIRE(executor.getCommandsLeft() == 5); - REQUIRE(executor.getWaitingRegister() == nullptr); + REQUIRE(executor.getWaitingCommand() == nullptr); } SECTION("should set next queue to added poll item if queues are empty") { @@ -320,14 +320,57 @@ TEST_CASE("ModbusExecutor") { registers.addPoll(3, 3); executor.setupInitialPoll(registers); - executor.pollNext(); - executor.pollNext(); - executor.pollNext(); + executor.executeNext(); + executor.executeNext(); + executor.executeNext(); REQUIRE(executor.allDone()); registers.clear(); registers.addPoll(4,4); executor.addPollList(registers); - executor.pollNext(); + executor.executeNext(); + REQUIRE(executor.getLastCommand()->getRegister() == 3); } + + + SECTION("should retry last write command mMaxWriteRetryCount times if failed") { + modbus_factory.setModbusRegisterWriteError("test", 1, 1, modmqttd::RegisterType::HOLDING); + + auto cmd(registers.createWrite(1, 0x3)); + cmd->setMaxRetryCounts(0,1); + executor.addWriteCommand(1, cmd); + + executor.executeNext(); + REQUIRE(!executor.allDone()); + REQUIRE(executor.getWaitingCommand()->getRegister() == 0); + + executor.executeNext(); + REQUIRE(executor.allDone()); + REQUIRE(executor.getWaitingCommand() == nullptr); + REQUIRE(executor.getLastCommand()->executedOk() == false); + } + + SECTION("should delay retry of last write command") { + modbus_factory.setModbusRegisterWriteError("test", 1, 1, modmqttd::RegisterType::HOLDING); + + auto cmd(registers.createWriteDelayed(1, 0x3, std::chrono::milliseconds(10))); + cmd->setMaxRetryCounts(0,1); + executor.addWriteCommand(1, cmd); + + executor.executeNext(); + REQUIRE(!executor.allDone()); + REQUIRE(executor.getWaitingCommand()->getRegister() == 0); + + auto delay = executor.executeNext(); + REQUIRE(!executor.allDone()); + REQUIRE(delay > std::chrono::milliseconds(5)); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + executor.executeNext(); + REQUIRE(executor.allDone()); + REQUIRE(executor.getWaitingCommand() == nullptr); + REQUIRE(executor.getLastCommand()->executedOk() == false); + } + } diff --git a/unittests/modbus_request_queues_tests.cpp b/unittests/modbus_request_queues_tests.cpp index 88d80d3..f92302d 100644 --- a/unittests/modbus_request_queues_tests.cpp +++ b/unittests/modbus_request_queues_tests.cpp @@ -42,7 +42,7 @@ TEST_CASE("ModbusRequestQueues") { auto dur = queue.findForSilencePeriod(std::chrono::milliseconds(100), true); REQUIRE(dur == std::chrono::milliseconds(100)); - std::shared_ptr reg = queue.popFirstWithDelay(std::chrono::milliseconds(100), true); + std::shared_ptr reg = queue.popFirstWithDelay(std::chrono::milliseconds(100), true); REQUIRE(reg->getRegister() == 1); dur = queue.findForSilencePeriod(std::chrono::milliseconds(100), false); diff --git a/unittests/modbus_retry_tests.cpp b/unittests/modbus_retry_tests.cpp new file mode 100644 index 0000000..085876a --- /dev/null +++ b/unittests/modbus_retry_tests.cpp @@ -0,0 +1,88 @@ +#include "catch2/catch_all.hpp" +#include "yaml_utils.hpp" +#include "mockedserver.hpp" + +TEST_CASE ("Write retry") { +TestConfig config(R"( +modbus: + networks: + - name: tcptest + address: localhost + port: 501 + slaves: + - address: 1 + delay_before_command: 50ms +mqtt: + client_id: mqtt_test + broker: + host: localhost + objects: + - topic: write_fail + commands: + - name: set + register: tcptest.1.2 + register_type: holding +)"); + +SECTION ("should issue many write calls if write fails") { + MockedModMqttServerThread server(config.toString()); + server.setModbusRegisterValue("tcptest", 1, 2, modmqttd::RegisterType::HOLDING, 0); + server.mModbusFactory->setModbusRegisterWriteError("tcptest", 1, 2, modmqttd::RegisterType::HOLDING); + + server.start(); + + server.waitForSubscription("write_fail/set"); + server.publish("write_fail/set", "7"); + + // delay before command is 50ms, should try at least 2 times in this period + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + //std::this_thread::sleep_for(std::chrono::minutes(5)); + + + server.stop(); + + //default is 2 + original call + REQUIRE(server.getMockedModbusContext("tcptest").getWriteCount(1) == 3); +} + +SECTION ("should not issue many write calls if write_retries is zeroed") { + config.mYAML["modbus"]["networks"][0]["write_retries"] = 0; + MockedModMqttServerThread server(config.toString()); + server.setModbusRegisterValue("tcptest", 1, 2, modmqttd::RegisterType::HOLDING, 0); + server.mModbusFactory->setModbusRegisterWriteError("tcptest", 1, 2, modmqttd::RegisterType::HOLDING); + + server.start(); + + server.waitForSubscription("write_fail/set"); + server.publish("write_fail/set", "7"); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + server.stop(); + + REQUIRE(server.getMockedModbusContext("tcptest").getWriteCount(1) == 1); +} + + +SECTION ("should issue many write calls if write_retries is set on slave level") { + config.mYAML["modbus"]["networks"][0]["write_retries"] = 0; + config.mYAML["modbus"]["networks"][0]["slaves"][0]["write_retries"] = 2; + MockedModMqttServerThread server(config.toString()); + server.setModbusRegisterValue("tcptest", 1, 2, modmqttd::RegisterType::HOLDING, 0); + server.mModbusFactory->setModbusRegisterWriteError("tcptest", 1, 2, modmqttd::RegisterType::HOLDING); + + server.start(); + + server.waitForSubscription("write_fail/set"); + server.publish("write_fail/set", "7"); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + server.stop(); + + REQUIRE(server.getMockedModbusContext("tcptest").getWriteCount(1) == 3); +} + + +} + diff --git a/unittests/modbus_watchdog_tests.cpp b/unittests/modbus_watchdog_tests.cpp new file mode 100644 index 0000000..6506dc3 --- /dev/null +++ b/unittests/modbus_watchdog_tests.cpp @@ -0,0 +1,117 @@ +#include "catch2/catch_all.hpp" + +#include "defaults.hpp" +#include "mockedserver.hpp" + +TEST_CASE ("Modbus watchdog") { + +SECTION("on TCP network") { + +static const std::string config = R"( +modbus: + networks: + - name: tcptest + address: localhost + port: 501 + watchdog: + watch_period: 300ms +mqtt: + client_id: mqtt_test + refresh: 100ms + broker: + host: localhost + objects: + - topic: slave1 + state: + register: tcptest.1.1 + - topic: slave2 + state: + register: tcptest.2.2 +)"; + + + SECTION("should restart connection if no command can be executed") { + MockedModMqttServerThread server(config); + server.start(); + + server.waitForPublish("slave2/availability"); + REQUIRE(server.mqttValue("slave2/availability") == "1"); + + server.disconnectModbusSlave("tcptest", 1); + server.disconnectModbusSlave("tcptest", 2); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + server.stop(); + + //one or two attempts to reconnect + REQUIRE(server.getMockedModbusContext("tcptest").getConnectionCount() >= 2); + REQUIRE(server.getMockedModbusContext("tcptest").getConnectionCount() <= 3); + } + + SECTION("should not restart connection if at least one slave is alive") { + MockedModMqttServerThread server(config); + server.start(); + + server.waitForPublish("slave2/availability"); + REQUIRE(server.mqttValue("slave2/availability") == "1"); + + server.disconnectModbusSlave("tcptest", 1); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + server.stop(); + + //no reconnections, slave1 was alive all the time + REQUIRE(server.getMockedModbusContext("tcptest").getConnectionCount() == 1); + } + +} + +SECTION("on RTU network") { +static const std::string config = R"( +modbus: + networks: + - name: rtutest + device: /tmp/watchdog_test + baud: 9600 + parity: E + data_bit: 8 + stop_bit: 1 + watchdog: + watch_period: 10s +mqtt: + client_id: mqtt_test + refresh: 100ms + broker: + host: localhost + objects: + - topic: slave1 + state: + register: rtutest.1.1 + - topic: slave2 + state: + register: rtutest.2.2 +)"; + + SECTION("should close usb serial port if unplugged") { + MockedModMqttServerThread server(config); + server.start(); + + server.waitForPublish("slave2/availability"); + REQUIRE(server.mqttValue("slave2/availability") == "1"); + + //simulate USB unplug + server.disconnectSerialPortFor("rtutest"); + server.disconnectModbusSlave("rtutest", 1); + server.disconnectModbusSlave("rtutest", 2); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + server.stop(); + + //at least one attempt to reconnect + REQUIRE(server.getMockedModbusContext("rtutest").getConnectionCount() >= 2); + } + + +} + +} //CASE diff --git a/unittests/mqtt_command_only_tests.cpp b/unittests/mqtt_command_only_tests.cpp index e7187ad..019d7a6 100644 --- a/unittests/mqtt_command_only_tests.cpp +++ b/unittests/mqtt_command_only_tests.cpp @@ -12,7 +12,7 @@ static const std::string config = R"( delay_before_first_command: 5ms slaves: - address: 1 - delay_before_first_command: 500ms + delay_before_first_command: 100ms mqtt: client_id: mqtt_test @@ -52,12 +52,11 @@ TEST_CASE ("Write should respect slave delay_before_command") { server.waitForModbusValue("tcptest",1,2, modmqttd::RegisterType::HOLDING, 0x8, std::chrono::milliseconds(700)); auto after = std::chrono::steady_clock::now(); //global is ignored - REQUIRE(after - now >= std::chrono::milliseconds(500)); + REQUIRE(after - now >= std::chrono::milliseconds(100)); server.stop(); } - TEST_CASE ("When network delay") {