@@ -64,13 +64,16 @@ def fetch(self):
64
64
time .sleep (0.2 )
65
65
self ._numberOfTimesSkipped += 1
66
66
except Exception as e :
67
- print ("Error occurred during fetching,readingMessage or message handling by algorithm" )
68
- print ("Error occurred error is" + str (e ))
69
- try :
70
- algorithmLogger .exception (e )
71
- log .error ('Caught during fetch and handling {e}' , e = str (e ))
72
- except Exception as e :
73
- print ("Exception during error handling " + str (e ))
67
+ if (hasattr (e , 'strerror' ) and e .strerror == 'Socket operation on non-socket' ):
68
+ log .info (self ._remoteAddress + " already closed" )
69
+ else :
70
+ print ("Error occurred during fetching,readingMessage or message handling by algorithm" )
71
+ print ("Error occurred error is" + str (e ))
72
+ try :
73
+ algorithmLogger .exception (e )
74
+ log .error ('Caught during fetch and handling {e}' , e = str (e ))
75
+ except Exception as e :
76
+ print ("Exception during error handling " + str (e ))
74
77
finally :
75
78
if (self ._active is False ):
76
79
self ._working = False
@@ -106,11 +109,16 @@ def close(self, force=True):
106
109
time .sleep (0.02 )
107
110
108
111
if (self ._pollTimeoutCount ):
109
- log .warning ('trying to read message from socket after close' )
110
- hasMsg = self ._readMessage (timeout = POLL_MS * 5 )
111
- if (hasMsg ):
112
- log .warning ('success reading message from socket after close' )
113
-
112
+ try :
113
+ hasMsg = self ._readMessage (timeout = POLL_MS * 5 )
114
+ except Exception as e :
115
+ print ("Error occurred readingMessage or message handling by algorithm during close" )
116
+ print ("Error occurred error is" + str (e ))
117
+ try :
118
+ algorithmLogger .exception (e )
119
+ log .error ('Caught during close and message handling {e}' , e = str (e ))
120
+ except Exception as e :
121
+ print ("Exception during error handling " + str (e ))
114
122
self ._worker .close ()
115
123
closed = True
116
124
return closed
0 commit comments