Skip to content

Commit 711deaf

Browse files
committed
Import executor from sotoki
1 parent ea6505f commit 711deaf

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed

src/zimscraperlib/executor.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
# vim: ai ts=4 sts=4 et sw=4 nu
4+
5+
import datetime
6+
import queue
7+
import threading
8+
from typing import Callable
9+
10+
from .shared import logger
11+
12+
# Number of seconds SotokiExecutor.join() grants each worker thread to finish
# before giving up on it.
thread_deadline_sec = 60

# Module-wide lock taken by SotokiExecutor.submit() while it checks the
# shutdown flags, so that no task is accepted while the interpreter is
# shutting down. Hold it whenever _shutdown is mutated.
_global_shutdown_lock = threading.Lock()

# Module-wide "interpreter is shutting down" flag read by submit().
# NOTE(review): nothing in the code visible here ever sets it to True.
_shutdown = False
17+
18+
19+
def excepthook(args):
    """Log an exception that was left unhandled inside a thread.

    Installed below as `threading.excepthook`. `args` is the object handed to
    that hook; only its `thread`, `exc_type` and `exc_value` attributes are
    read here.
    """
    summary = f"UNHANDLED Exception in {args.thread.name}: {args.exc_type}"
    logger.error(summary)
    logger.exception(args.exc_value)


# Route every uncaught thread exception of the process through our logger.
threading.excepthook = excepthook
25+
26+
27+
class SotokiExecutor(queue.Queue):
    """Custom FIFO queue based Executor that's less generic than ThreadPoolExec one

    Providing more flexibility for the use cases we're interested about:
    - halt immediately (sort of) upon exception (if requested)
    - able to join() then restart later to accommodate successive steps

    The executor *is* the queue: tasks are stored as `(callable, kwargs)` tuples
    and consumed by `nb_workers` daemon threads created by `start()`.
    Note that `join()` is overridden: it waits for the worker threads, not for
    `task_done()` accounting like `queue.Queue.join()` does.

    See: https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/thread.py
    """

    def __init__(self, queue_size: int = 10, nb_workers: int = 1, prefix: str = "T-"):
        """Prepare a (not yet started) executor

        queue_size: max number of pending tasks before `submit()` blocks
        nb_workers: number of worker threads launched by `start()`
        prefix: prefix of worker thread names (also used in log messages)"""
        super().__init__(queue_size)
        self.prefix = prefix
        self._shutdown_lock = threading.Lock()
        self.nb_workers = nb_workers
        self.exceptions = []
        # Run-state is defined from the very beginning so that alive, submit(),
        # join() and shutdown() behave sanely on a never-started executor
        # instead of failing with AttributeError.
        self._workers = set()
        # not alive until start() is called: there is no worker to run tasks
        self._shutdown = True
        self.no_more = False

    @property
    def exception(self):
        """Exception raised in any thread, if any (first one recorded, else None)"""
        try:
            return self.exceptions[0]
        except IndexError:
            return None

    @property
    def alive(self):
        """whether it should continue running"""
        return not self._shutdown

    def submit(self, task: Callable, **kwargs):
        """Submit a callable and its kwargs for execution in one of the workers

        Blocks while the queue is full. Special kwargs consumed by the worker
        (not forwarded to `task`):
        - raises: record the exception and shutdown executor if task fails
        - callback: callable invoked (without argument) once task completed
        - dont_release: don't call task_done() for this task (caller will)

        Raises RuntimeError if executor is not alive or interpreter is
        shutting down."""
        with self._shutdown_lock, _global_shutdown_lock:
            if not self.alive:
                raise RuntimeError("cannot submit task to dead executor")
            if _shutdown:
                raise RuntimeError("cannot submit task after interpreter shutdown")

        while True:
            try:
                self.put((task, kwargs), block=True, timeout=3.0)
            except queue.Full:
                # Stop retrying (task is dropped) once executor is joining or
                # has been shut down: nobody would ever make room in the queue.
                if self.no_more or not self.alive:
                    break
            else:
                break

    def start(self):
        """Enable executor, starting requested amount of workers

        Pending tasks and previously recorded exceptions are discarded.
        Workers are started always, not provisioned dynamically"""
        self.drain()
        self.release_halt()
        self._workers = set()
        with self._shutdown_lock:
            self._shutdown = False
        self.exceptions[:] = []

        for n in range(self.nb_workers):
            t = threading.Thread(
                target=self.worker, name=f"{self.prefix}{n}", daemon=True
            )
            t.start()
            self._workers.add(t)

    def worker(self):
        """Thread target: fetch tasks from the queue and run them

        Runs as long as executor is alive, or until the queue is exhausted
        once `no_more` has been set by `join()`."""
        while self.alive or self.no_more:
            try:
                func, kwargs = self.get(block=True, timeout=2.0)
            except queue.Empty:
                if self.no_more:
                    break
                continue
            except TypeError:
                # received None from the queue. most likely shutting down
                return

            raises = kwargs.pop("raises", False)
            callback = kwargs.pop("callback", None)
            dont_release = kwargs.pop("dont_release", False)

            try:
                func(**kwargs)
            except Exception as exc:
                logger.error(f"Error processing {func} with {kwargs=}")
                logger.exception(exc)
                if raises:
                    self.exceptions.append(exc)
                    self.shutdown()
            finally:
                # user will manually release the queue for this task.
                # most likely in a libzim-written callback
                if not dont_release:
                    self.task_done()
                if callback:
                    callback()

    def drain(self):
        """Empty the queue without processing the tasks (tasks will be lost)"""
        while True:
            try:
                self.get_nowait()
            except queue.Empty:
                break

    def join(self):
        """Await completion of workers, requesting them to stop taking new task

        Each worker is given `thread_deadline_sec` seconds to finish.
        When called from one of the executor's own workers (a task submitted
        with `raises` failed and triggered `shutdown()`), only the `no_more`
        flag is raised: a thread cannot join itself (RuntimeError) and workers
        waiting on each other would only stall until the deadline."""
        logger.debug(f"joining all threads for {self.prefix}")
        self.no_more = True

        caller = threading.current_thread()
        if caller in self._workers:
            logger.debug(f"join requested from worker {caller.name}. Not waiting")
            return

        for t in self._workers:
            deadline = datetime.datetime.now() + datetime.timedelta(
                seconds=thread_deadline_sec
            )
            # use actual thread name: iteration order of a set is arbitrary
            logger.debug(f"Giving {t.name} {thread_deadline_sec}s to join")
            e = threading.Event()
            while t.is_alive() and datetime.datetime.now() < deadline:
                t.join(1)
                # pause between attempts only if thread is still running
                if t.is_alive():
                    e.wait(timeout=2)
            if t.is_alive():
                logger.debug(f"Thread {t.name} is not joining. Skipping…")
            else:
                logger.debug(f"Thread {t.name} joined")
        logger.debug(f"all threads joined for {self.prefix}")

    def release_halt(self):
        """release the `no_more` flag preventing workers from taking up tasks"""
        self.no_more = False

    def shutdown(self, wait=True):
        """stop the executor, either somewhat immediately or awaiting completion

        wait: if True, join() workers (queued tasks get processed);
        otherwise pending tasks are dropped from the queue."""
        logger.debug(f"shutting down executor {self.prefix} with {wait=}")
        with self._shutdown_lock:
            self._shutdown = True

        if wait:
            self.join()
        else:
            # Drain all work items from the queue
            self.drain()

0 commit comments

Comments
 (0)