Skip to content

Commit 018df18

Browse files
authored
Merge pull request #103 from kube-HPC/move_stop_to_end
Move stop to end
2 parents f4fe7d3 + 862721b commit 018df18

File tree

8 files changed

+24
-32
lines changed

8 files changed

+24
-32
lines changed

Diff for: hkube_python_wrapper/communication/streaming/MessageListener.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def onMessage(self, messageFlowPattern, header, msg):
3131
return self._encoding.encode({'duration': round(duration, 4)}, plainEncode=True)
3232

3333
def fetch(self):
34-
return self.adapater.fetch()
34+
self.adapater.fetch()
3535

3636
def close(self, force=True):
3737
closed = False

Diff for: hkube_python_wrapper/communication/streaming/StreamingListener.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
1+
import threading
12
import time
2-
from hkube_python_wrapper.util.DaemonThread import DaemonThread
3+
import uuid
34

5+
from hkube_python_wrapper.util.DaemonThread import DaemonThread
46

57
class StreamingListener(DaemonThread):
68

7-
def __init__(self, messageListeners, balcFetchSize=100):
8-
self._balcFetchSize = balcFetchSize
9+
def __init__(self, messageListeners):
910
self._listeningToMessages = True
1011
self._working = True
1112
self._messageListeners = messageListeners
1213
DaemonThread.__init__(self, "StreamingListener")
13-
14+
def fetch(self,messageListener):
15+
messageListener.fetch()
1416
def run(self):
1517
while (self._listeningToMessages):
1618
messageListeners = self._messageListeners()
1719
if (not messageListeners):
1820
time.sleep(1) # free some cpu
1921
continue
2022
for listener in messageListeners:
21-
hasMessage = True
22-
count = 0
23-
while hasMessage and count < self._balcFetchSize:
24-
hasMessage = listener.fetch()
25-
count = count + 1
23+
threadName = listener.messageOriginNodeName + "_" + str(uuid.uuid4())
24+
listener.thread = threading.Thread(name=threadName,target=self.fetch, args=[listener])
25+
listener.thread.start()
26+
for listener in messageListeners:
27+
listener.thread.join()
2628
self._working = False
2729

2830
def stop(self, force=True):

Diff for: hkube_python_wrapper/communication/streaming/StreamingManager.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@ def __init__(self):
1818
self.parsedFlows = {}
1919
self.defaultFlow = None
2020
self._streamingListener = None
21-
self.balcFetchSize = None
21+
2222

2323
def setParsedFlows(self, flows, defaultFlow):
2424
self.parsedFlows = flows
2525
self.defaultFlow = defaultFlow
2626

27-
def setListenerBalcFetchSize(self, balcFetchSize):
28-
self.balcFetchSize = balcFetchSize
29-
3027
def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes, nodeName):
3128
self.messageProducer = MessageProducer(producerConfig, nextNodes, nodeName)
3229
self.messageProducer.registerStatisticsListener(onStatistics)
@@ -86,18 +83,16 @@ def startMessageListening(self):
8683
self.listeningToMessages = True
8784
if (self._isStarted is False):
8885
self._isStarted = True
89-
self._streamingListener = StreamingListener(self._getMessageListeners, self.balcFetchSize)
86+
self._streamingListener = StreamingListener(self._getMessageListeners)
9087
self._streamingListener.start()
9188

9289
def sendMessage(self, msg, flowName=None):
9390
if (self.messageProducer is None):
94-
raise Exception(
95-
'Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
91+
raise Exception('Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
9692
if (self.messageProducer.nodeNames):
9793
parsedFlow = None
9894
if (flowName is None):
99-
if hasattr(self.threadLocalStorage,
100-
'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
95+
if hasattr(self.threadLocalStorage, 'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
10196
parsedFlow = self.threadLocalStorage.messageFlowPattern
10297
else:
10398
if (self.defaultFlow is None):

Diff for: hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def fetch(self):
4242
try:
4343
if (self._active is False):
4444
time.sleep(0.2)
45+
return
4546

4647
if (self._pollTimeoutCount == MAX_POLLS):
4748
log.warning('ZMQListener poll timeout reached')
@@ -50,14 +51,14 @@ def fetch(self):
5051
self._worker = self._worker_socket(self._remoteAddress)
5152

5253
if (self._pollTimeoutCount > 0):
53-
return self._readMessage()
54+
self._readMessage()
55+
return
5456

5557
self._send(signals.PPP_READY)
56-
return self._readMessage()
58+
self._readMessage()
5759

5860
except Exception as e:
5961
log.error('ZMQListener.fetch {e}', e=str(e))
60-
return False
6162
finally:
6263
if(self._active is False):
6364
self._working = False

Diff for: hkube_python_wrapper/config/config.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ def getBoolEnv(name, defaultValue):
3333
"messagesMemoryBuff": getIntEnv('STREAMING_MAX_BUFFER_MB', 1500),
3434
"port": os.environ.get('STREAMING_DISCOVERY_PORT', 9022),
3535
"statisticsInterval": os.environ.get('STREAMING_STATISTICS_INTERVAL', 2),
36-
"stateful": getBoolEnv('STREAMING_STATEFUL', 'True'),
37-
"balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100')
36+
"stateful": getBoolEnv('STREAMING_STATEFUL', 'True')
3837
}
38+
3939
}
4040
algorithm = {
4141
"path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"),

Diff for: hkube_python_wrapper/wrapper/algorunner.py

-1
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ def onStatistics(statistics):
361361
def _start(self, options):
362362
if (self._job.isStreaming):
363363
self.streamingManager.setParsedFlows(self._job.parsedFlow, self._job.defaultFlow)
364-
self.streamingManager.setListenerBalcFetchSize(config.discovery["streaming"]["balcFetchSize"])
365364
if (self._job.childs):
366365
self._setupStreamingProducer(self._job.nodeName)
367366
self.streamingManager.clearMessageListeners()

Diff for: tests/configs/config.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ def getBoolEnv(name, defaultValue):
2222
"encoding": os.environ.get('DISCOVERY_ENCODING', 'msgpack'),
2323
"timeout": getIntEnv('DISCOVERY_TIMEOUT', 5000),
2424
"networkTimeout": getIntEnv('DISCOVERY_NETWORK_TIMEOUT', 1000),
25-
"maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400),
26-
"balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100')
25+
"maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400)
2726
}
2827
algorithm = {
2928
"path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"),

Diff for: tests/test_streaming.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def statsInvoked(args):
2424
listen_config = {'encoding': 'msgpack'}
2525
streamingManager = StreamingManager()
2626
streamingManager.setParsedFlows(parsedFlows, 'analyze')
27-
streamingManager.setListenerBalcFetchSize(100)
2827
streamingManager.setupStreamingProducer(statsInvoked, producer_config, [nodeName], 'A')
2928
streamingManager.setupStreamingListeners(listen_config, parents, nodeName)
3029
streamingManager.registerInputListener(onMessage)
@@ -70,12 +69,10 @@ def statsInvoked(args):
7069

7170
streamingManagerA = StreamingManager()
7271
streamingManagerA.setParsedFlows(parsedFlows, 'analyze')
73-
streamingManagerA.setListenerBalcFetchSize(100)
7472
streamingManagerA.setupStreamingProducer(statsInvoked, producer_configA, ['B'], 'A')
7573

7674
streamingManagerB = StreamingManager()
7775
streamingManagerB.setParsedFlows(parsedFlows, 'analyze')
78-
streamingManagerB.setListenerBalcFetchSize(100)
7976
streamingManagerB.setupStreamingProducer(statsInvoked, producer_configB, ['C'], 'B')
8077
streamingManagerB.setupStreamingListeners(listen_config, parents1, 'B')
8178
streamingManagerB.registerInputListener(onMessageAtB)
@@ -84,11 +81,10 @@ def statsInvoked(args):
8481
streamingManagerC = StreamingManager()
8582
streamingManagerC.setupStreamingListeners(listen_config, parents2, 'C')
8683
streamingManagerC.registerInputListener(onMessageAtC)
87-
streamingManagerC.setListenerBalcFetchSize(100)
8884
streamingManagerC.startMessageListening()
8985

9086
streamingManagerA.sendMessage('msg from A')
91-
time.sleep(3)
87+
time.sleep(1)
9288
assert resultsAtB['data'] == 'msg from A'
9389
assert resultsAtB['origin'] == 'A'
9490
assert resultsAtC['data'] == 'msg from B'

0 commit comments

Comments
 (0)