From ef66e25a1a5514e170d763819babf5bf27c5635f Mon Sep 17 00:00:00 2001 From: golanha Date: Sun, 10 Dec 2023 14:59:48 +0200 Subject: [PATCH 1/3] Revert "Merge pull request #97 from kube-HPC/bolc" This reverts commit 4f3672d26793d2d7f25d5ba1febf7abbab9545db, reversing changes made to 16e5a4142503b159b19d4c951868e101ee843301. --- .../communication/streaming/MessageListener.py | 2 +- .../communication/streaming/StreamingListener.py | 10 ++-------- .../communication/streaming/StreamingManager.py | 13 ++++--------- .../communication/zmq/streaming/ZMQListener.py | 7 ++++--- hkube_python_wrapper/config/config.py | 4 ++-- hkube_python_wrapper/wrapper/algorunner.py | 1 - tests/configs/config.py | 3 +-- tests/test_streaming.py | 6 +----- 8 files changed, 15 insertions(+), 31 deletions(-) diff --git a/hkube_python_wrapper/communication/streaming/MessageListener.py b/hkube_python_wrapper/communication/streaming/MessageListener.py index f86c7af6..3f108361 100644 --- a/hkube_python_wrapper/communication/streaming/MessageListener.py +++ b/hkube_python_wrapper/communication/streaming/MessageListener.py @@ -31,7 +31,7 @@ def onMessage(self, messageFlowPattern, header, msg): return self._encoding.encode({'duration': round(duration, 4)}, plainEncode=True) def fetch(self): - return self.adapater.fetch() + self.adapater.fetch() def close(self, force=True): closed = False diff --git a/hkube_python_wrapper/communication/streaming/StreamingListener.py b/hkube_python_wrapper/communication/streaming/StreamingListener.py index 80300dbf..f6366d29 100644 --- a/hkube_python_wrapper/communication/streaming/StreamingListener.py +++ b/hkube_python_wrapper/communication/streaming/StreamingListener.py @@ -1,11 +1,9 @@ import time from hkube_python_wrapper.util.DaemonThread import DaemonThread - class StreamingListener(DaemonThread): - def __init__(self, messageListeners, balcFetchSize=100): - self._balcFetchSize = balcFetchSize + def __init__(self, messageListeners): self._listeningToMessages = True self._working = True self._messageListeners = messageListeners @@ -18,11 +16,7 @@ def run(self): time.sleep(1) # free some cpu continue for listener in messageListeners: - hasMessage = True - count = 0 - while hasMessage and count < self._balcFetchSize: - hasMessage = listener.fetch() - count = count + 1 + listener.fetch() self._working = False def stop(self, force=True): diff --git a/hkube_python_wrapper/communication/streaming/StreamingManager.py b/hkube_python_wrapper/communication/streaming/StreamingManager.py index 3a9aaa96..9938b1db 100644 --- a/hkube_python_wrapper/communication/streaming/StreamingManager.py +++ b/hkube_python_wrapper/communication/streaming/StreamingManager.py @@ -18,15 +18,12 @@ def __init__(self): self.parsedFlows = {} self.defaultFlow = None self._streamingListener = None - self.balcFetchSize = None + def setParsedFlows(self, flows, defaultFlow): self.parsedFlows = flows self.defaultFlow = defaultFlow - def setListenerBalcFetchSize(self, balcFetchSize): - self.balcFetchSize = balcFetchSize - def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes, nodeName): self.messageProducer = MessageProducer(producerConfig, nextNodes, nodeName) self.messageProducer.registerStatisticsListener(onStatistics) @@ -86,18 +83,16 @@ def startMessageListening(self): self.listeningToMessages = True if (self._isStarted is False): self._isStarted = True - self._streamingListener = StreamingListener(self._getMessageListeners, self.balcFetchSize) + self._streamingListener = StreamingListener(self._getMessageListeners) self._streamingListener.start() def sendMessage(self, msg, flowName=None): if (self.messageProducer is None): - raise Exception( - 'Trying to send a message from a none stream pipeline or after close had been applied on algorithm') + raise Exception('Trying to send a message from a none stream pipeline or after close had been applied on algorithm') if (self.messageProducer.nodeNames): parsedFlow = None if (flowName is None): - if hasattr(self.threadLocalStorage, - 'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern: + if hasattr(self.threadLocalStorage, 'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern: parsedFlow = self.threadLocalStorage.messageFlowPattern else: if (self.defaultFlow is None): diff --git a/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py b/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py index d3846746..3fa87614 100644 --- a/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py +++ b/hkube_python_wrapper/communication/zmq/streaming/ZMQListener.py @@ -42,6 +42,7 @@ def fetch(self): try: if (self._active is False): time.sleep(0.2) + return if (self._pollTimeoutCount == MAX_POLLS): log.warning('ZMQListener poll timeout reached') @@ -50,14 +51,14 @@ def fetch(self): self._worker = self._worker_socket(self._remoteAddress) if (self._pollTimeoutCount > 0): - return self._readMessage() + self._readMessage() + return self._send(signals.PPP_READY) - return self._readMessage() + self._readMessage() except Exception as e: log.error('ZMQListener.fetch {e}', e=str(e)) - return False finally: if(self._active is False): self._working = False diff --git a/hkube_python_wrapper/config/config.py b/hkube_python_wrapper/config/config.py index 69546202..f8118419 100644 --- a/hkube_python_wrapper/config/config.py +++ b/hkube_python_wrapper/config/config.py @@ -33,9 +33,9 @@ def getBoolEnv(name, defaultValue): "messagesMemoryBuff": getIntEnv('STREAMING_MAX_BUFFER_MB', 1500), "port": os.environ.get('STREAMING_DISCOVERY_PORT', 9022), "statisticsInterval": os.environ.get('STREAMING_STATISTICS_INTERVAL', 2), - "stateful": getBoolEnv('STREAMING_STATEFUL', 'True'), - "balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100') + "stateful": getBoolEnv('STREAMING_STATEFUL', 'True') } + } algorithm = { "path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"), diff --git a/hkube_python_wrapper/wrapper/algorunner.py b/hkube_python_wrapper/wrapper/algorunner.py index a8623895..3c3eca1e 100644 --- a/hkube_python_wrapper/wrapper/algorunner.py +++ b/hkube_python_wrapper/wrapper/algorunner.py @@ -361,7 +361,6 @@ def onStatistics(statistics): def _start(self, options): if (self._job.isStreaming): self.streamingManager.setParsedFlows(self._job.parsedFlow, self._job.defaultFlow) - self.streamingManager.setListenerBalcFetchSize(config.discovery["streaming"]["balcFetchSize"]) if (self._job.childs): self._setupStreamingProducer(self._job.nodeName) self.streamingManager.clearMessageListeners() diff --git a/tests/configs/config.py b/tests/configs/config.py index 9b2746e7..cbccccc3 100644 --- a/tests/configs/config.py +++ b/tests/configs/config.py @@ -22,8 +22,7 @@ def getBoolEnv(name, defaultValue): "encoding": os.environ.get('DISCOVERY_ENCODING', 'msgpack'), "timeout": getIntEnv('DISCOVERY_TIMEOUT', 5000), "networkTimeout": getIntEnv('DISCOVERY_NETWORK_TIMEOUT', 1000), - "maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400), - "balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100') + "maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400) } algorithm = { "path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"), diff --git a/tests/test_streaming.py b/tests/test_streaming.py index fdf143d1..6e7fbd9d 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -24,7 +24,6 @@ def statsInvoked(args): listen_config = {'encoding': 'msgpack'} streamingManager = StreamingManager() streamingManager.setParsedFlows(parsedFlows, 'analyze') - streamingManager.setListenerBalcFetchSize(100) streamingManager.setupStreamingProducer(statsInvoked, producer_config, [nodeName], 'A') streamingManager.setupStreamingListeners(listen_config, parents, nodeName) streamingManager.registerInputListener(onMessage) @@ -70,12 +69,10 @@ def statsInvoked(args): streamingManagerA = StreamingManager() streamingManagerA.setParsedFlows(parsedFlows, 'analyze') - streamingManagerA.setListenerBalcFetchSize(100) streamingManagerA.setupStreamingProducer(statsInvoked, producer_configA, ['B'], 'A') streamingManagerB = StreamingManager() streamingManagerB.setParsedFlows(parsedFlows, 'analyze') - streamingManagerB.setListenerBalcFetchSize(100) streamingManagerB.setupStreamingProducer(statsInvoked, producer_configB, ['C'], 'B') streamingManagerB.setupStreamingListeners(listen_config, parents1, 'B') streamingManagerB.registerInputListener(onMessageAtB) @@ -84,11 +81,10 @@ def statsInvoked(args): streamingManagerC = StreamingManager() streamingManagerC.setupStreamingListeners(listen_config, parents2, 'C') streamingManagerC.registerInputListener(onMessageAtC) - streamingManagerC.setListenerBalcFetchSize(100) streamingManagerC.startMessageListening() streamingManagerA.sendMessage('msg from A') - time.sleep(3) + time.sleep(1) assert resultsAtB['data'] == 'msg from A' assert resultsAtB['origin'] == 'A' assert resultsAtC['data'] == 'msg from B' From 7a8c855b98a35e0749dc9b51ec819138d2ab4756 Mon Sep 17 00:00:00 2001 From: golanha Date: Tue, 12 Dec 2023 21:37:04 +0200 Subject: [PATCH 2/3] multi fetch --- .../communication/streaming/StreamingListener.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/hkube_python_wrapper/communication/streaming/StreamingListener.py b/hkube_python_wrapper/communication/streaming/StreamingListener.py index f6366d29..9e208e5e 100644 --- a/hkube_python_wrapper/communication/streaming/StreamingListener.py +++ b/hkube_python_wrapper/communication/streaming/StreamingListener.py @@ -1,3 +1,4 @@ +import threading import time from hkube_python_wrapper.util.DaemonThread import DaemonThread @@ -8,7 +9,8 @@ def __init__(self, messageListeners): self._working = True self._messageListeners = messageListeners DaemonThread.__init__(self, "StreamingListener") - + def fetch(self,messageListener): + messageListener.fetch() def run(self): while (self._listeningToMessages): messageListeners = self._messageListeners() @@ -16,7 +18,10 @@ def run(self): time.sleep(1) # free some cpu continue for listener in messageListeners: - listener.fetch() + listener.thread = threading.Thread(target=self.fetch, args=[listener]) + listener.thread.start() + for listener in messageListeners: + listener.thread.join() self._working = False def stop(self, force=True): From 862721b391d525395e90ca09ef435f9392853f06 Mon Sep 17 00:00:00 2001 From: golanha Date: Wed, 13 Dec 2023 11:40:46 +0200 Subject: [PATCH 3/3] thread name --- .../communication/streaming/StreamingListener.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hkube_python_wrapper/communication/streaming/StreamingListener.py b/hkube_python_wrapper/communication/streaming/StreamingListener.py index 9e208e5e..6f0ae4b7 100644 --- a/hkube_python_wrapper/communication/streaming/StreamingListener.py +++ b/hkube_python_wrapper/communication/streaming/StreamingListener.py @@ -1,5 +1,7 @@ import threading import time +import uuid + from hkube_python_wrapper.util.DaemonThread import DaemonThread class StreamingListener(DaemonThread): @@ -18,7 +20,8 @@ def run(self): time.sleep(1) # free some cpu continue for listener in messageListeners: - listener.thread = threading.Thread(target=self.fetch, args=[listener]) + threadName = listener.messageOriginNodeName + "_" + str(uuid.uuid4()) + listener.thread = threading.Thread(name=threadName,target=self.fetch, args=[listener]) listener.thread.start() for listener in messageListeners: listener.thread.join()