Skip to content

Commit 6dd1b40

Browse files
committed
grateful shutdown from forever loop
1 parent ef94a77 commit 6dd1b40

File tree

1 file changed

+25
-17
lines changed

1 file changed

+25
-17
lines changed

mercury/platform.py

+25-17
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def __init__(self, work_dir: str = None, log_file: str = None, log_level: str =
323323
self.log.debug("Estimated processing rate is "+format(self._throttle.get_tps(), ',d')
324324
+ " events per second for this computer")
325325
self.running = True
326+
self.stopped = False
326327

327328
# start event loop in a new thread to avoid blocking the main thread
328329
def main_event_loop():
@@ -346,6 +347,7 @@ def graceful_shutdown(signum, frame):
346347
self.log.info("To stop this application, press Control-C")
347348
while self.running:
348349
time.sleep(0.1)
350+
self.log.info("Bye")
349351
self.stop()
350352
else:
351353
raise ValueError('Unable to register Control-C and KILL signals because this is not the main thread')
@@ -556,20 +558,26 @@ def connect_to_cloud(self):
556558
self._loop.run_in_executor(self._executor, self._cloud.start_connection)
557559

558560
def stop(self):
559-
def stopping():
560-
route_list = []
561-
for route in self.get_routes():
562-
route_list.append(route)
563-
for route in route_list:
564-
self._remove_route(route)
565-
self._loop.create_task(full_stop())
566-
567-
async def full_stop():
568-
# give time for registered services to stop
569-
await asyncio.sleep(1.0)
570-
queue_dir = self.util.normalize_path(self.work_dir + "/queues/" + self.get_origin())
571-
self.util.cleanup_dir(queue_dir)
572-
self._loop.stop()
573-
574-
self._cloud.close_connection(1000, 'bye', stop_engine=True)
575-
self._loop.call_soon_threadsafe(stopping)
561+
if not self.stopped:
562+
# guarantee this stop function to execute only once
563+
self.stopped = True
564+
# exit the run_forever loop if any
565+
self.running = False
566+
567+
def stopping():
568+
route_list = []
569+
for route in self.get_routes():
570+
route_list.append(route)
571+
for route in route_list:
572+
self._remove_route(route)
573+
self._loop.create_task(full_stop())
574+
575+
async def full_stop():
576+
# give time for registered services to stop
577+
await asyncio.sleep(1.0)
578+
queue_dir = self.util.normalize_path(self.work_dir + "/queues/" + self.get_origin())
579+
self.util.cleanup_dir(queue_dir)
580+
self._loop.stop()
581+
582+
self._cloud.close_connection(1000, 'bye', stop_engine=True)
583+
self._loop.call_soon_threadsafe(stopping)

0 commit comments

Comments
 (0)