@@ -315,6 +315,7 @@ def _getMethod(self, name):
315
315
if (self ._algorithm ):
316
316
return self ._algorithm .get (name )
317
317
return None
318
+
318
319
def _init (self , options ):
319
320
redirector = None
320
321
try :
@@ -345,14 +346,14 @@ def _init(self, options):
345
346
346
347
def _discovery_update (self , discovery ):
347
348
log .debug ('Got discovery update {discovery}' , discovery = discovery )
348
- messageListenerConfig = {'encoding' : config .discovery ['encoding' ],'delay' :config .discovery ['delay' ]}
349
+ messageListenerConfig = {'encoding' : config .discovery ['encoding' ], 'delay' : config .discovery ['delay' ]}
349
350
self .streamingManager .setupStreamingListeners (
350
351
messageListenerConfig , discovery , self ._job .nodeName )
351
352
352
353
def _setupStreamingProducer (self , nodeName ):
353
354
def onStatistics (statistics ):
354
355
thread_list = ""
355
- self ._printThread = self ._printThread + 1
356
+ self ._printThread = self ._printThread + 1
356
357
for thread in threading .enumerate ():
357
358
thread_list = thread_list + " " + str (thread .name )
358
359
if (self ._printThread % 30 == 0 ):
@@ -492,6 +493,7 @@ def _stopAlgorithm(self, options):
492
493
if (self ._job and self ._job .isStreaming ):
493
494
if (forceStop is False ):
494
495
stoppingState = True
496
+
495
497
def stopping ():
496
498
while (stoppingState ):
497
499
self ._sendCommand (messages .outgoing .stopping , None )
@@ -559,7 +561,7 @@ def _sendCommand(self, command, data):
559
561
560
562
def sendError (self , error ):
561
563
try :
562
- log .error (error )
564
+ log .error ("Sending error to worker " + str ( error ) )
563
565
self ._wsc .send ({
564
566
'command' : messages .outgoing .error ,
565
567
'error' : {
0 commit comments