From 2475993ad054f157bb7a756d3399e29965f2ef77 Mon Sep 17 00:00:00 2001 From: golanha Date: Tue, 26 Dec 2023 17:16:16 +0200 Subject: [PATCH 1/2] skip commit --- .../zmq/streaming/ZMQListener.py | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py b/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py index 3fa87614..2464812e 100644 --- a/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py +++ b/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py @@ -6,8 +6,10 @@ context = zmq.Context() POLL_MS = 1000 +MAX_SKIPS = 150 MAX_POLLS = 5 + class ZMQListener(object): def __init__(self, remoteAddress, onMessage, encoding, consumerType): self._encoding = encoding @@ -18,6 +20,8 @@ def __init__(self, remoteAddress, onMessage, encoding, consumerType): self._pollTimeoutCount = 0 self._remoteAddress = remoteAddress self._worker = self._worker_socket(remoteAddress) + self._numberOfNoMsg = 0 + self._numberOfTimesSkipped = 0 def _worker_socket(self, remoteAddress): """Helper function that returns a new configured socket @@ -43,24 +47,26 @@ def fetch(self): if (self._active is False): time.sleep(0.2) return - - if (self._pollTimeoutCount == MAX_POLLS): - log.warning('ZMQListener poll timeout reached') - self._pollTimeoutCount = 0 - self._worker.close() - self._worker = self._worker_socket(self._remoteAddress) - - if (self._pollTimeoutCount > 0): + skip = self._numberOfTimesSkipped < MAX_SKIPS and self._numberOfNoMsg > 0 + if not skip: + self._numberOfTimesSkipped = 0 + if (self._pollTimeoutCount == MAX_POLLS): + log.warning('ZMQListener poll timeout reached') + self._pollTimeoutCount = 0 + self._worker.close() + self._worker = self._worker_socket(self._remoteAddress) + if (self._pollTimeoutCount > 0): + self._readMessage() + return + self._send(signals.PPP_READY) self._readMessage() - return - - self._send(signals.PPP_READY) - self._readMessage() - + else: + time.sleep(0.01) + self._numberOfTimesSkipped += 1 except Exception as e: log.error('ZMQListener.fetch {e}', e=str(e)) finally: - if(self._active is False): + if (self._active is False): self._working = False def _readMessage(self, timeout=POLL_MS): @@ -75,7 +81,9 @@ def _readMessage(self, timeout=POLL_MS): hasMsg = True msgResult = self._handleAMessage(frames) self._send(signals.PPP_DONE, msgResult) + self._numberOfNoMsg = 0 else: + self._numberOfNoMsg += 1 time.sleep(0.005) else: self._pollTimeoutCount += 1 From fb30f240b3b4f1fa48e2d6b41a88d8fb669c5063 Mon Sep 17 00:00:00 2001 From: golanha Date: Tue, 26 Dec 2023 17:33:19 +0200 Subject: [PATCH 2/2] tests fix --- tests/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 6e7fbd9d..61187432 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -84,7 +84,7 @@ def statsInvoked(args): streamingManagerC.startMessageListening() streamingManagerA.sendMessage('msg from A') - time.sleep(1) + time.sleep(2) assert resultsAtB['data'] == 'msg from A' assert resultsAtB['origin'] == 'A' assert resultsAtC['data'] == 'msg from B'