From ad2290df52b83fc4e4feee513163b6a1b59fd2e9 Mon Sep 17 00:00:00 2001 From: golanha Date: Mon, 17 Jun 2024 11:29:07 +0300 Subject: [PATCH] Enhance error logging --- hkube_python_wrapper/wrapper/algorunner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hkube_python_wrapper/wrapper/algorunner.py b/hkube_python_wrapper/wrapper/algorunner.py index c1dbbadd..05157495 100644 --- a/hkube_python_wrapper/wrapper/algorunner.py +++ b/hkube_python_wrapper/wrapper/algorunner.py @@ -315,6 +315,7 @@ def _getMethod(self, name): if (self._algorithm): return self._algorithm.get(name) return None + def _init(self, options): redirector = None try: @@ -345,14 +346,14 @@ def _init(self, options): def _discovery_update(self, discovery): log.debug('Got discovery update {discovery}', discovery=discovery) - messageListenerConfig = {'encoding': config.discovery['encoding'],'delay':config.discovery['delay']} + messageListenerConfig = {'encoding': config.discovery['encoding'], 'delay': config.discovery['delay']} self.streamingManager.setupStreamingListeners( messageListenerConfig, discovery, self._job.nodeName) def _setupStreamingProducer(self, nodeName): def onStatistics(statistics): thread_list = "" - self._printThread = self._printThread + 1 + self._printThread = self._printThread + 1 for thread in threading.enumerate(): thread_list = thread_list + " " + str(thread.name) if (self._printThread % 30 == 0): @@ -492,6 +493,7 @@ def _stopAlgorithm(self, options): if (self._job and self._job.isStreaming): if (forceStop is False): stoppingState = True + def stopping(): while (stoppingState): self._sendCommand(messages.outgoing.stopping, None) @@ -559,7 +561,7 @@ def _sendCommand(self, command, data): def sendError(self, error): try: - log.error(error) + log.error("Sending error to worker " + str(error)) self._wsc.send({ 'command': messages.outgoing.error, 'error': {