Skip to content

Commit cde9242

Browse files
authored
Merge pull request #106 from kube-HPC/skip_fetch
skip commit
2 parents 1d1862e + fb30f24 commit cde9242

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py

+22-14
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66

77
context = zmq.Context()
88
POLL_MS = 1000
9+
MAX_SKIPS = 150
910
MAX_POLLS = 5
1011

12+
1113
class ZMQListener(object):
1214
def __init__(self, remoteAddress, onMessage, encoding, consumerType):
1315
self._encoding = encoding
@@ -18,6 +20,8 @@ def __init__(self, remoteAddress, onMessage, encoding, consumerType):
1820
self._pollTimeoutCount = 0
1921
self._remoteAddress = remoteAddress
2022
self._worker = self._worker_socket(remoteAddress)
23+
self._numberOfNoMsg = 0
24+
self._numberOfTimesSkipped = 0
2125

2226
def _worker_socket(self, remoteAddress):
2327
"""Helper function that returns a new configured socket
@@ -43,24 +47,26 @@ def fetch(self):
4347
if (self._active is False):
4448
time.sleep(0.2)
4549
return
46-
47-
if (self._pollTimeoutCount == MAX_POLLS):
48-
log.warning('ZMQListener poll timeout reached')
49-
self._pollTimeoutCount = 0
50-
self._worker.close()
51-
self._worker = self._worker_socket(self._remoteAddress)
52-
53-
if (self._pollTimeoutCount > 0):
50+
skip = self._numberOfTimesSkipped < MAX_SKIPS and self._numberOfNoMsg > 0
51+
if not skip:
52+
self._numberOfTimesSkipped = 0
53+
if (self._pollTimeoutCount == MAX_POLLS):
54+
log.warning('ZMQListener poll timeout reached')
55+
self._pollTimeoutCount = 0
56+
self._worker.close()
57+
self._worker = self._worker_socket(self._remoteAddress)
58+
if (self._pollTimeoutCount > 0):
59+
self._readMessage()
60+
return
61+
self._send(signals.PPP_READY)
5462
self._readMessage()
55-
return
56-
57-
self._send(signals.PPP_READY)
58-
self._readMessage()
59-
63+
else:
64+
time.sleep(0.01)
65+
self._numberOfTimesSkipped += 1
6066
except Exception as e:
6167
log.error('ZMQListener.fetch {e}', e=str(e))
6268
finally:
63-
if(self._active is False):
69+
if (self._active is False):
6470
self._working = False
6571

6672
def _readMessage(self, timeout=POLL_MS):
@@ -75,7 +81,9 @@ def _readMessage(self, timeout=POLL_MS):
7581
hasMsg = True
7682
msgResult = self._handleAMessage(frames)
7783
self._send(signals.PPP_DONE, msgResult)
84+
self._numberOfNoMsg = 0
7885
else:
86+
self._numberOfNoMsg += 1
7987
time.sleep(0.005)
8088
else:
8189
self._pollTimeoutCount += 1

tests/test_streaming.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def statsInvoked(args):
8484
streamingManagerC.startMessageListening()
8585

8686
streamingManagerA.sendMessage('msg from A')
87-
time.sleep(1)
87+
time.sleep(2)
8888
assert resultsAtB['data'] == 'msg from A'
8989
assert resultsAtB['origin'] == 'A'
9090
assert resultsAtC['data'] == 'msg from B'

0 commit comments

Comments
 (0)