diff --git a/platform/JS/Bridge/src/BridgeWrapper.cpp b/platform/JS/Bridge/src/BridgeWrapper.cpp index 6dd6fa024..9a4a70f31 100644 --- a/platform/JS/Bridge/src/BridgeWrapper.cpp +++ b/platform/JS/Bridge/src/BridgeWrapper.cpp @@ -238,7 +238,7 @@ class EventTask: public Poco::Util::TimerTask v8::Handle args[1]; v8::Local eventArg = v8::Local::New(pIsolate, reader.read(istr)); v8::Local sourceArg(v8::Local::New(pIsolate, _jsObject)); - V8_CHECK_SET_RESULT(eventArg->Set(context, Core::Wrapper::toV8String(pIsolate, "source"s), sourceArg)); + V8_CHECK_SET_RESULT(eventArg->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "source"s), sourceArg)); args[0] = eventArg; _pExecutor->callInContext(pIsolate, context, _jsObject, _event, 1, args); } @@ -674,8 +674,8 @@ void BridgeWrapper::toJSON(const v8::FunctionCallbackInfo& args) const std::string& sub = pHolder->subscriberURI(); v8::Local object = v8::Object::New(pIsolate); - V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8String(pIsolate, "$uri"s), Core::Wrapper::toV8String(pIsolate, uri))); - V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8String(pIsolate, "$sub"s), Core::Wrapper::toV8String(pIsolate, sub))); + V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "$uri"s), Core::Wrapper::toV8String(pIsolate, uri))); + V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "$sub"s), Core::Wrapper::toV8String(pIsolate, sub))); args.GetReturnValue().Set(object); } diff --git a/protocols/Modbus/RTU/src/BundleActivator.cpp b/protocols/Modbus/RTU/src/BundleActivator.cpp index 9a81e823a..955a5b3ba 100644 --- a/protocols/Modbus/RTU/src/BundleActivator.cpp +++ b/protocols/Modbus/RTU/src/BundleActivator.cpp @@ -88,7 +88,7 @@ class BundleActivator: public Poco::OSP::BundleActivator const int speed = _pPrefs->configuration()->getInt(baseKey + ".speed", 9600); const Poco::Timespan timeout = 1000*_pPrefs->configuration()->getInt(baseKey + ".timeout"s, 2000); const Poco::Timespan frameTimeout = _pPrefs->configuration()->getInt(baseKey + ".frameTimeout"s, 10000); - const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + "maxAsyncQueueSize"s, 32); + const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + ".maxAsyncQueueSize"s, 32); try { diff --git a/protocols/Modbus/TCP/src/BundleActivator.cpp b/protocols/Modbus/TCP/src/BundleActivator.cpp index 69265c5a4..e0a9daaeb 100644 --- a/protocols/Modbus/TCP/src/BundleActivator.cpp +++ b/protocols/Modbus/TCP/src/BundleActivator.cpp @@ -92,7 +92,7 @@ class BundleActivator: public Poco::OSP::BundleActivator const Poco::Timespan connectTimeout = Poco::Timespan::MILLISECONDS*connectTimeoutMS; const bool connectImmediately = !_pPrefs->configuration()->getBool(baseKey + ".lazyConnect"s, false); const std::size_t maxSimultaneousTransactions = _pPrefs->configuration()->getUInt16(baseKey + ".maxSimultaneousTransactions"s, 16); - const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + "maxAsyncQueueSize"s, 256); + const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + ".maxAsyncQueueSize"s, 256); try { diff --git a/protocols/Modbus/include/IoT/Modbus/ModbusMaster.h b/protocols/Modbus/include/IoT/Modbus/ModbusMaster.h index 5ea82f436..51369c009 100644 --- a/protocols/Modbus/include/IoT/Modbus/ModbusMaster.h +++ b/protocols/Modbus/include/IoT/Modbus/ModbusMaster.h @@ -71,12 +71,18 @@ class IoTModbus_API ModbusMaster /// Fired when no response has been received within the /// specified timeout for the transaction with the given /// transaction ID. + /// + /// Note that in case of a timeout, also the requestFailed + /// event is triggered, which provides more details about + /// the message. Poco::BasicEvent error; /// Fired when an exception has been thrown while reading a message. Poco::BasicEvent requestFailed; - /// Fired when an asynchronous request message could not be sent. + /// Fired when an asynchronous request message could not be sent, + /// a timeout waiting for the response occured or the request + /// was canceled. Poco::BasicEvent responseReceived; /// Fired when a valid, but unsupported Modbus response message has been received. diff --git a/protocols/Modbus/include/IoT/Modbus/ModbusMasterImpl.h b/protocols/Modbus/include/IoT/Modbus/ModbusMasterImpl.h index 8054f532f..37ddee4bb 100644 --- a/protocols/Modbus/include/IoT/Modbus/ModbusMasterImpl.h +++ b/protocols/Modbus/include/IoT/Modbus/ModbusMasterImpl.h @@ -564,11 +564,13 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable void reset() { + using namespace std::string_literals; + Poco::FastMutex::ScopedLock lock(_mutex); if (_pAsyncThread) { - abortPending(FAILURE_RESET); + abortPending(FAILURE_RESET, "Reset"s); _asyncQueue.clear(); _asyncQueue.enqueueNotification(new ResetNotification); } @@ -586,6 +588,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable Poco::UInt8 functionCode = 0; Poco::UInt8 slaveOrUnitAddress = 0; Poco::Timestamp timeSent; + bool sent = false; }; using PendingMap = std::map; @@ -641,7 +644,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable _logger.trace("Sending frame: functionCode=%02x, slaveOrUnit=%02x"s, static_cast(message.functionCode), static_cast(message.slaveOrUnitAddress)); } - if (_pAsyncThread && _asyncQueue.size() > _maxAsyncQueueSize) + if (_pAsyncThread && _asyncQueue.size() >= _maxAsyncQueueSize) { throw Poco::ProtocolException("Maximum number of queued requests exceeded"); } @@ -702,11 +705,19 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable void run() { + using namespace std::string_literals; + bool stopped = false; const std::size_t maxPending = _pPort->maxSimultaneousTransactions(); while (!stopped) { - const std::size_t pending = countPending(); + const auto p = countPendingSent(); + std::size_t pendingSent = p.first; + const std::size_t pending = p.second; + if (_logger.debug()) + { + _logger.debug("Currently pending requests: %z, sent: %z."s, pending, pendingSent); + } const long dequeueTimeout = pending == 0 ? 100 : 0; Poco::Notification::Ptr pNf = _asyncQueue.waitDequeueNotification(dequeueTimeout); if (pNf) @@ -724,24 +735,26 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable Poco::AutoPtr pSendNf = pNf.cast(); if (pSendNf) { - if (pending <= maxPending) + if (pendingSent < maxPending) { - try - { - pSendNf->run(); - } - catch (Poco::Exception& exc) - { - removePending(pSendNf->message().transactionID); - RequestFailure requestFailure; - requestFailure.slaveOrUnitAddress = pSendNf->message().slaveOrUnitAddress; - requestFailure.functionCode = pSendNf->message().functionCode; - requestFailure.transactionID = pSendNf->message().transactionID; - requestFailure.reason = FAILURE_ERROR; - requestFailure.message = exc.displayText(); - this->requestFailed(this, requestFailure); + try + { + markPendingSent(pSendNf->message().transactionID); + pendingSent++; + pSendNf->run(); + } + catch (Poco::Exception& exc) + { + removePending(pSendNf->message().transactionID); + RequestFailure requestFailure; + requestFailure.slaveOrUnitAddress = pSendNf->message().slaveOrUnitAddress; + requestFailure.functionCode = pSendNf->message().functionCode; + requestFailure.transactionID = pSendNf->message().transactionID; + requestFailure.reason = FAILURE_ERROR; + requestFailure.message = exc.displayText(); + this->requestFailed(this, requestFailure); + } } - } else { // Put back send notification and try again later @@ -751,11 +764,11 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable } } - if (!stopped && pending> 0) + if (!stopped && pendingSent > 0) { try { - Poco::Timespan timeout(0, _asyncQueue.empty() || pending > maxPending ? 250 : 0); + Poco::Timespan timeout(0, _asyncQueue.empty() || pendingSent >= maxPending ? 1000 : 0); if (_pPort->poll(timeout)) { try @@ -953,6 +966,8 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable void processPendingTimeouts() { + using namespace std::string_literals; + PendingMap timeouts; { Poco::FastMutex::ScopedLock lock(_pendingMutex); @@ -981,11 +996,12 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable failure.functionCode = p.second.functionCode; failure.slaveOrUnitAddress = p.second.slaveOrUnitAddress; failure.reason = FAILURE_TIMEOUT; + failure.message = "Timeout"s; this->requestFailed(this, failure); } } - void abortPending(RequestFailureReason reason, const std::string& message = std::string()) + void abortPending(RequestFailureReason reason, const std::string& message) { PendingMap pendingMap; { @@ -1024,6 +1040,29 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable return _pendingMap.size(); } + std::pair countPendingSent() const + { + std::size_t sent = 0; + std::size_t total = 0; + Poco::FastMutex::ScopedLock lock(_pendingMutex); + for (const auto& p: _pendingMap) + { + if (p.second.sent) sent++; + total++; + } + return std::make_pair(sent, total); + } + + void markPendingSent(Poco::UInt16 transactionID) + { + Poco::FastMutex::ScopedLock lock(_pendingMutex); + auto it = _pendingMap.find(transactionID); + if (it != _pendingMap.end()) + { + it->second.sent = true; + } + } + void clearPending() { Poco::FastMutex::ScopedLock lock(_pendingMutex); @@ -1041,7 +1080,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable if (state == CONNECTION_CLOSING || state == CONNECTION_CLOSED) { - abortPending(FAILURE_CLOSED); + abortPending(FAILURE_CLOSED, "Connection closed"s); } this->connectionStateChanged(this, state); }