Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel handle of messages #103

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def onMessage(self, messageFlowPattern, header, msg):
return self._encoding.encode({'duration': round(duration, 4)}, plainEncode=True)

def fetch(self):
return self.adapater.fetch()
self.adapater.fetch()

def close(self, force=True):
closed = False
Expand Down
20 changes: 11 additions & 9 deletions hkube_python_wrapper/communication/streaming/StreamingListener.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import threading
import time
from hkube_python_wrapper.util.DaemonThread import DaemonThread
import uuid

from hkube_python_wrapper.util.DaemonThread import DaemonThread

class StreamingListener(DaemonThread):

def __init__(self, messageListeners, balcFetchSize=100):
self._balcFetchSize = balcFetchSize
def __init__(self, messageListeners):
self._listeningToMessages = True
self._working = True
self._messageListeners = messageListeners
DaemonThread.__init__(self, "StreamingListener")

def fetch(self,messageListener):
messageListener.fetch()
def run(self):
while (self._listeningToMessages):
messageListeners = self._messageListeners()
if (not messageListeners):
time.sleep(1) # free some cpu
continue
for listener in messageListeners:
hasMessage = True
count = 0
while hasMessage and count < self._balcFetchSize:
hasMessage = listener.fetch()
count = count + 1
threadName = listener.messageOriginNodeName + "_" + str(uuid.uuid4())
listener.thread = threading.Thread(name=threadName,target=self.fetch, args=[listener])
listener.thread.start()
for listener in messageListeners:
listener.thread.join()
self._working = False

def stop(self, force=True):
Expand Down
13 changes: 4 additions & 9 deletions hkube_python_wrapper/communication/streaming/StreamingManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ def __init__(self):
self.parsedFlows = {}
self.defaultFlow = None
self._streamingListener = None
self.balcFetchSize = None


def setParsedFlows(self, flows, defaultFlow):
self.parsedFlows = flows
self.defaultFlow = defaultFlow

def setListenerBalcFetchSize(self, balcFetchSize):
self.balcFetchSize = balcFetchSize

def setupStreamingProducer(self, onStatistics, producerConfig, nextNodes, nodeName):
self.messageProducer = MessageProducer(producerConfig, nextNodes, nodeName)
self.messageProducer.registerStatisticsListener(onStatistics)
Expand Down Expand Up @@ -86,18 +83,16 @@ def startMessageListening(self):
self.listeningToMessages = True
if (self._isStarted is False):
self._isStarted = True
self._streamingListener = StreamingListener(self._getMessageListeners, self.balcFetchSize)
self._streamingListener = StreamingListener(self._getMessageListeners)
self._streamingListener.start()

def sendMessage(self, msg, flowName=None):
if (self.messageProducer is None):
raise Exception(
'Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
raise Exception('Trying to send a message from a none stream pipeline or after close had been applied on algorithm')
if (self.messageProducer.nodeNames):
parsedFlow = None
if (flowName is None):
if hasattr(self.threadLocalStorage,
'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
if hasattr(self.threadLocalStorage, 'messageFlowPattern') and self.threadLocalStorage.messageFlowPattern:
parsedFlow = self.threadLocalStorage.messageFlowPattern
else:
if (self.defaultFlow is None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def fetch(self):
try:
if (self._active is False):
time.sleep(0.2)
return

if (self._pollTimeoutCount == MAX_POLLS):
log.warning('ZMQListener poll timeout reached')
Expand All @@ -50,14 +51,14 @@ def fetch(self):
self._worker = self._worker_socket(self._remoteAddress)

if (self._pollTimeoutCount > 0):
return self._readMessage()
self._readMessage()
return

self._send(signals.PPP_READY)
return self._readMessage()
self._readMessage()

except Exception as e:
log.error('ZMQListener.fetch {e}', e=str(e))
return False
finally:
if(self._active is False):
self._working = False
Expand Down
4 changes: 2 additions & 2 deletions hkube_python_wrapper/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def getBoolEnv(name, defaultValue):
"messagesMemoryBuff": getIntEnv('STREAMING_MAX_BUFFER_MB', 1500),
"port": os.environ.get('STREAMING_DISCOVERY_PORT', 9022),
"statisticsInterval": os.environ.get('STREAMING_STATISTICS_INTERVAL', 2),
"stateful": getBoolEnv('STREAMING_STATEFUL', 'True'),
"balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100')
"stateful": getBoolEnv('STREAMING_STATEFUL', 'True')
}

}
algorithm = {
"path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"),
Expand Down
1 change: 0 additions & 1 deletion hkube_python_wrapper/wrapper/algorunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ def onStatistics(statistics):
def _start(self, options):
if (self._job.isStreaming):
self.streamingManager.setParsedFlows(self._job.parsedFlow, self._job.defaultFlow)
self.streamingManager.setListenerBalcFetchSize(config.discovery["streaming"]["balcFetchSize"])
if (self._job.childs):
self._setupStreamingProducer(self._job.nodeName)
self.streamingManager.clearMessageListeners()
Expand Down
3 changes: 1 addition & 2 deletions tests/configs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ def getBoolEnv(name, defaultValue):
"encoding": os.environ.get('DISCOVERY_ENCODING', 'msgpack'),
"timeout": getIntEnv('DISCOVERY_TIMEOUT', 5000),
"networkTimeout": getIntEnv('DISCOVERY_NETWORK_TIMEOUT', 1000),
"maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400),
"balcFetchSize": getIntEnv('STREAMING_BALC_SIZE', '100')
"maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400)
}
algorithm = {
"path": os.environ.get('ALGORITHM_PATH', "algorithm_unique_folder"),
Expand Down
6 changes: 1 addition & 5 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def statsInvoked(args):
listen_config = {'encoding': 'msgpack'}
streamingManager = StreamingManager()
streamingManager.setParsedFlows(parsedFlows, 'analyze')
streamingManager.setListenerBalcFetchSize(100)
streamingManager.setupStreamingProducer(statsInvoked, producer_config, [nodeName], 'A')
streamingManager.setupStreamingListeners(listen_config, parents, nodeName)
streamingManager.registerInputListener(onMessage)
Expand Down Expand Up @@ -70,12 +69,10 @@ def statsInvoked(args):

streamingManagerA = StreamingManager()
streamingManagerA.setParsedFlows(parsedFlows, 'analyze')
streamingManagerA.setListenerBalcFetchSize(100)
streamingManagerA.setupStreamingProducer(statsInvoked, producer_configA, ['B'], 'A')

streamingManagerB = StreamingManager()
streamingManagerB.setParsedFlows(parsedFlows, 'analyze')
streamingManagerB.setListenerBalcFetchSize(100)
streamingManagerB.setupStreamingProducer(statsInvoked, producer_configB, ['C'], 'B')
streamingManagerB.setupStreamingListeners(listen_config, parents1, 'B')
streamingManagerB.registerInputListener(onMessageAtB)
Expand All @@ -84,11 +81,10 @@ def statsInvoked(args):
streamingManagerC = StreamingManager()
streamingManagerC.setupStreamingListeners(listen_config, parents2, 'C')
streamingManagerC.registerInputListener(onMessageAtC)
streamingManagerC.setListenerBalcFetchSize(100)
streamingManagerC.startMessageListening()

streamingManagerA.sendMessage('msg from A')
time.sleep(3)
time.sleep(1)
assert resultsAtB['data'] == 'msg from A'
assert resultsAtB['origin'] == 'A'
assert resultsAtC['data'] == 'msg from B'
Expand Down