File tree 1 file changed +13
-4
lines changed 1 file changed +13
-4
lines changed Original file line number Diff line number Diff line change @@ -136,7 +136,9 @@ def process_aggregator(
136
136
f"Running aggregator: { aggregator } (source={ source } ) on { len (aggregator .get_buffer ())} messages"
137
137
)
138
138
ts = time .perf_counter ()
139
+
139
140
output = aggregator .get ()
141
+
140
142
self .logger .debug (
141
143
f'Aggregator "{ aggregator } (source={ source } )" done in { time .perf_counter () - ts :.2f} s'
142
144
)
@@ -150,9 +152,15 @@ def process_aggregator(
150
152
futures .append (future )
151
153
152
154
for future in as_completed (futures ):
153
- source , output = future .result ()
154
- if output is not None :
155
- self ._aggregation_results [source ] = output
155
+ try :
156
+ source , output = future .result ()
157
+ except Exception as e :
158
+ self .logger .error (f"Aggregator crashed: { e } " )
159
+ continue
160
+
161
+ if output is None :
162
+ continue
163
+ self ._aggregation_results [source ] = output
156
164
157
165
def stop (self ):
158
166
"""Stop the agent's execution loop."""
@@ -164,7 +172,8 @@ def stop(self):
164
172
if self ._aggregation_thread is not None :
165
173
self ._aggregation_thread .join ()
166
174
self ._aggregation_thread = None
175
+ self ._stop_event .clear ()
167
176
for callback_id in self ._registered_callbacks :
168
177
self ._connector .unregister_callback (callback_id )
169
- self ._stop_event . clear ()
178
+ self ._connector . shutdown ()
170
179
self .logger .info ("Agent stopped" )
You can’t perform that action at this time.
0 commit comments