Skip to content

Commit

Permalink
merge upstream fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
obiltschnig committed Mar 18, 2024
1 parent 139ca3a commit 5de4e46
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 29 deletions.
6 changes: 3 additions & 3 deletions platform/JS/Bridge/src/BridgeWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class EventTask: public Poco::Util::TimerTask
v8::Handle<v8::Value> args[1];
v8::Local<v8::Object> eventArg = v8::Local<v8::Object>::New(pIsolate, reader.read(istr));
v8::Local<v8::Object> sourceArg(v8::Local<v8::Object>::New(pIsolate, _jsObject));
V8_CHECK_SET_RESULT(eventArg->Set(context, Core::Wrapper::toV8String(pIsolate, "source"s), sourceArg));
V8_CHECK_SET_RESULT(eventArg->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "source"s), sourceArg));
args[0] = eventArg;
_pExecutor->callInContext(pIsolate, context, _jsObject, _event, 1, args);
}
Expand Down Expand Up @@ -674,8 +674,8 @@ void BridgeWrapper::toJSON(const v8::FunctionCallbackInfo<v8::Value>& args)
const std::string& sub = pHolder->subscriberURI();

v8::Local<v8::Object> object = v8::Object::New(pIsolate);
V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8String(pIsolate, "$uri"s), Core::Wrapper::toV8String(pIsolate, uri)));
V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8String(pIsolate, "$sub"s), Core::Wrapper::toV8String(pIsolate, sub)));
V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "$uri"s), Core::Wrapper::toV8String(pIsolate, uri)));
V8_CHECK_SET_RESULT(object->Set(context, Core::Wrapper::toV8Internalized(pIsolate, "$sub"s), Core::Wrapper::toV8String(pIsolate, sub)));

args.GetReturnValue().Set(object);
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/Modbus/RTU/src/BundleActivator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class BundleActivator: public Poco::OSP::BundleActivator
const int speed = _pPrefs->configuration()->getInt(baseKey + ".speed", 9600);
const Poco::Timespan timeout = 1000*_pPrefs->configuration()->getInt(baseKey + ".timeout"s, 2000);
const Poco::Timespan frameTimeout = _pPrefs->configuration()->getInt(baseKey + ".frameTimeout"s, 10000);
const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + "maxAsyncQueueSize"s, 32);
const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + ".maxAsyncQueueSize"s, 32);

try
{
Expand Down
2 changes: 1 addition & 1 deletion protocols/Modbus/TCP/src/BundleActivator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BundleActivator: public Poco::OSP::BundleActivator
const Poco::Timespan connectTimeout = Poco::Timespan::MILLISECONDS*connectTimeoutMS;
const bool connectImmediately = !_pPrefs->configuration()->getBool(baseKey + ".lazyConnect"s, false);
const std::size_t maxSimultaneousTransactions = _pPrefs->configuration()->getUInt16(baseKey + ".maxSimultaneousTransactions"s, 16);
const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + "maxAsyncQueueSize"s, 256);
const std::size_t maxAsyncQueueSize = _pPrefs->configuration()->getUInt32(baseKey + ".maxAsyncQueueSize"s, 256);

try
{
Expand Down
8 changes: 7 additions & 1 deletion protocols/Modbus/include/IoT/Modbus/ModbusMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,18 @@ class IoTModbus_API ModbusMaster
/// Fired when no response has been received within the
/// specified timeout for the transaction with the given
/// transaction ID.
///
/// Note that in case of a timeout, also the requestFailed
/// event is triggered, which provides more details about
/// the message.

Poco::BasicEvent<const std::string> error;
/// Fired when an exception has been thrown while reading a message.

Poco::BasicEvent<const RequestFailure> requestFailed;
/// Fired when an asynchronous request message could not be sent.
/// Fired when an asynchronous request message could not be sent,
/// a timeout waiting for the response occured or the request
/// was canceled.

Poco::BasicEvent<const GenericMessage> responseReceived;
/// Fired when a valid, but unsupported Modbus response message has been received.
Expand Down
85 changes: 62 additions & 23 deletions protocols/Modbus/include/IoT/Modbus/ModbusMasterImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,11 +564,13 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable

void reset()
{
using namespace std::string_literals;

Poco::FastMutex::ScopedLock lock(_mutex);

if (_pAsyncThread)
{
abortPending(FAILURE_RESET);
abortPending(FAILURE_RESET, "Reset"s);
_asyncQueue.clear();
_asyncQueue.enqueueNotification(new ResetNotification);
}
Expand All @@ -586,6 +588,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
Poco::UInt8 functionCode = 0;
Poco::UInt8 slaveOrUnitAddress = 0;
Poco::Timestamp timeSent;
bool sent = false;
};
using PendingMap = std::map<Poco::UInt16, PendingInfo>;

Expand Down Expand Up @@ -641,7 +644,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
_logger.trace("Sending frame: functionCode=%02x, slaveOrUnit=%02x"s, static_cast<unsigned>(message.functionCode), static_cast<unsigned>(message.slaveOrUnitAddress));
}

if (_pAsyncThread && _asyncQueue.size() > _maxAsyncQueueSize)
if (_pAsyncThread && _asyncQueue.size() >= _maxAsyncQueueSize)
{
throw Poco::ProtocolException("Maximum number of queued requests exceeded");
}
Expand Down Expand Up @@ -702,11 +705,19 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable

void run()
{
using namespace std::string_literals;

bool stopped = false;
const std::size_t maxPending = _pPort->maxSimultaneousTransactions();
while (!stopped)
{
const std::size_t pending = countPending();
const auto p = countPendingSent();
std::size_t pendingSent = p.first;
const std::size_t pending = p.second;
if (_logger.debug())
{
_logger.debug("Currently pending requests: %z, sent: %z."s, pending, pendingSent);
}
const long dequeueTimeout = pending == 0 ? 100 : 0;
Poco::Notification::Ptr pNf = _asyncQueue.waitDequeueNotification(dequeueTimeout);
if (pNf)
Expand All @@ -724,24 +735,26 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
Poco::AutoPtr<SendNotification> pSendNf = pNf.cast<SendNotification>();
if (pSendNf)
{
if (pending <= maxPending)
if (pendingSent < maxPending)
{
try
{
pSendNf->run();
}
catch (Poco::Exception& exc)
{
removePending(pSendNf->message().transactionID);
RequestFailure requestFailure;
requestFailure.slaveOrUnitAddress = pSendNf->message().slaveOrUnitAddress;
requestFailure.functionCode = pSendNf->message().functionCode;
requestFailure.transactionID = pSendNf->message().transactionID;
requestFailure.reason = FAILURE_ERROR;
requestFailure.message = exc.displayText();
this->requestFailed(this, requestFailure);
try
{
markPendingSent(pSendNf->message().transactionID);
pendingSent++;
pSendNf->run();
}
catch (Poco::Exception& exc)
{
removePending(pSendNf->message().transactionID);
RequestFailure requestFailure;
requestFailure.slaveOrUnitAddress = pSendNf->message().slaveOrUnitAddress;
requestFailure.functionCode = pSendNf->message().functionCode;
requestFailure.transactionID = pSendNf->message().transactionID;
requestFailure.reason = FAILURE_ERROR;
requestFailure.message = exc.displayText();
this->requestFailed(this, requestFailure);
}
}
}
else
{
// Put back send notification and try again later
Expand All @@ -751,11 +764,11 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
}
}

if (!stopped && pending> 0)
if (!stopped && pendingSent > 0)
{
try
{
Poco::Timespan timeout(0, _asyncQueue.empty() || pending > maxPending ? 250 : 0);
Poco::Timespan timeout(0, _asyncQueue.empty() || pendingSent >= maxPending ? 1000 : 0);
if (_pPort->poll(timeout))
{
try
Expand Down Expand Up @@ -953,6 +966,8 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable

void processPendingTimeouts()
{
using namespace std::string_literals;

PendingMap timeouts;
{
Poco::FastMutex::ScopedLock lock(_pendingMutex);
Expand Down Expand Up @@ -981,11 +996,12 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
failure.functionCode = p.second.functionCode;
failure.slaveOrUnitAddress = p.second.slaveOrUnitAddress;
failure.reason = FAILURE_TIMEOUT;
failure.message = "Timeout"s;
this->requestFailed(this, failure);
}
}

void abortPending(RequestFailureReason reason, const std::string& message = std::string())
void abortPending(RequestFailureReason reason, const std::string& message)
{
PendingMap pendingMap;
{
Expand Down Expand Up @@ -1024,6 +1040,29 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable
return _pendingMap.size();
}

std::pair<std::size_t, std::size_t> countPendingSent() const
{
std::size_t sent = 0;
std::size_t total = 0;
Poco::FastMutex::ScopedLock lock(_pendingMutex);
for (const auto& p: _pendingMap)
{
if (p.second.sent) sent++;
total++;
}
return std::make_pair(sent, total);
}

void markPendingSent(Poco::UInt16 transactionID)
{
Poco::FastMutex::ScopedLock lock(_pendingMutex);
auto it = _pendingMap.find(transactionID);
if (it != _pendingMap.end())
{
it->second.sent = true;
}
}

void clearPending()
{
Poco::FastMutex::ScopedLock lock(_pendingMutex);
Expand All @@ -1041,7 +1080,7 @@ class IoTModbus_API ModbusMasterImpl: public ModbusMaster, public Poco::Runnable

if (state == CONNECTION_CLOSING || state == CONNECTION_CLOSED)
{
abortPending(FAILURE_CLOSED);
abortPending(FAILURE_CLOSED, "Connection closed"s);
}
this->connectionStateChanged(this, state);
}
Expand Down

0 comments on commit 5de4e46

Please sign in to comment.