|
1 |
| -"""Structured logger utility for creating JSON logs. |
| 1 | +"""Temporary migration compatibility file. |
2 | 2 |
|
3 |
| -The Delphi group uses two ~identical versions of this file. |
4 |
| -Try to keep them in sync with edits, for sanity. |
5 |
| - https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py |
6 |
| - https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py |
7 |
| -""" |
8 |
| - |
9 |
| -import contextlib |
10 |
| -import logging |
11 |
| -import multiprocessing |
12 |
| -import os |
13 |
| -import sys |
14 |
| -import threading |
15 |
| -from traceback import format_exception |
16 |
| - |
17 |
| -import structlog |
18 |
| - |
19 |
| - |
20 |
def handle_exceptions(logger):
    """Install process-wide hooks that route uncaught exceptions to `logger`.

    Replaces ``sys.excepthook`` (main thread) and ``threading.excepthook``
    (worker threads) so any exception that would otherwise terminate the
    process or a thread silently is logged first.

    Parameters
    ----------
    logger: a structlog-style logger providing ``exception`` and ``debug``.
    """

    def exception_handler(scope, etype, value, traceback):
        # `exc_info` carries the full traceback so the renderer can format it.
        logger.exception("Top-level exception occurred",
                         scope=scope, exc_info=(etype, value, traceback))

    def sys_exception_handler(etype, value, traceback):
        exception_handler("sys", etype, value, traceback)

    def threading_exception_handler(args):
        # Both `sys.exit(0)` and a bare `sys.exit()` (exit code None) are
        # considered "successful termination":
        # https://docs.python.org/3/library/sys.html#sys.exit
        # Use `is` for the exact-type comparison against the class object.
        if args.exc_type is SystemExit and args.exc_value.code in (None, 0):
            logger.debug("normal thread exit", thread=args.thread,
                         stack="".join(
                             format_exception(
                                 args.exc_type, args.exc_value, args.exc_traceback)))
        else:
            exception_handler(f"thread: {args.thread}",
                              args.exc_type, args.exc_value, args.exc_traceback)

    sys.excepthook = sys_exception_handler
    threading.excepthook = threading_exception_handler
44 |
| - |
45 |
| - |
46 |
def get_structured_logger(name=__name__,
                          filename=None,
                          log_exceptions=True):
    """Create a new structlog logger that renders log lines as JSON.

    Use the logger returned from this in indicator code using the standard
    wrapper calls, e.g.:

        logger = get_structured_logger(__name__)
        logger.warning("Error", type="Signal too low")

    The output will be rendered as JSON which can easily be consumed by logs
    processors.

    See the structlog documentation for details.

    The log level defaults to INFO; set the LOG_DEBUG environment variable
    (to any value) to get DEBUG-level output instead.

    Parameters
    ----------
    name: Name to use for logger (included in log lines), __name__ from caller
        is a good choice.
    filename: An (optional) file to write log output.
    log_exceptions: If True (the default), also install process-wide hooks
        (via `handle_exceptions`) so uncaught exceptions are logged.
    """
    # Set the underlying logging configuration.
    if "LOG_DEBUG" in os.environ:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    logging.basicConfig(
        format="%(message)s",
        level=log_level,
        handlers=[logging.StreamHandler()])

    def add_pid(_logger, _method_name, event_dict):
        """Add current PID to the event dict."""
        event_dict["pid"] = os.getpid()
        return event_dict

    # Configure structlog. This uses many of the standard suggestions from
    # the structlog documentation.
    structlog.configure(
        processors=[
            # Filter out log levels we are not tracking.
            structlog.stdlib.filter_by_level,
            # Include logger name in output.
            structlog.stdlib.add_logger_name,
            # Include log level in output.
            structlog.stdlib.add_log_level,
            # Include PID in output.
            add_pid,
            # Allow formatting into arguments e.g., logger.info("Hello, %s",
            # name)
            structlog.stdlib.PositionalArgumentsFormatter(),
            # Add timestamps.
            structlog.processors.TimeStamper(fmt="iso"),
            # Match support for exception logging in the standard logger.
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            # Decode unicode characters.
            structlog.processors.UnicodeDecoder(),
            # Render as JSON.
            structlog.processors.JSONRenderer(),
        ],
        # Use a dict class for keeping track of data.
        context_class=dict,
        # Use a standard logger for the actual log call.
        logger_factory=structlog.stdlib.LoggerFactory(),
        # Use a standard wrapper class for utilities like log.warning().
        wrapper_class=structlog.stdlib.BoundLogger,
        # Cache the logger.
        cache_logger_on_first_use=True,
    )

    # Create the underlying python logger and wrap it with structlog.
    system_logger = logging.getLogger(name)
    # Only attach the file handler on first use of this `name`; repeated
    # calls would otherwise stack handlers and duplicate every log line
    # written to `filename`.
    if filename and not system_logger.handlers:
        system_logger.addHandler(logging.FileHandler(filename))
    system_logger.setLevel(log_level)
    logger = structlog.wrap_logger(system_logger)

    if log_exceptions:
        handle_exceptions(logger)

    return logger
| 3 | +Can be removed once this line |
130 | 4 |
|
| 5 | + https://github.com/cmu-delphi/delphi-epidata/blob/69835d1d7795eaf9a710d9f4903fef22a07e8fdf/src/client/delphi_epidata.py#L19 |
131 | 6 |
|
132 |
class LoggerThread():
    """
    A construct to use a logger from multiprocessing workers/jobs.

    The bare structlog loggers are thread-safe but not multiprocessing-safe.
    A `LoggerThread` will spawn a thread that listens to a mp.Queue
    and logs messages from it with the provided logger,
    so other processes can send logging messages to it
    via the logger-like `SubLogger` interface.
    The SubLogger even logs the pid of the caller.

    This is good to use with a set of jobs that are part of a mp.Pool,
    but isn't recommended for general use
    because of overhead from threading and multiprocessing,
    and because it might introduce lag to log messages.

    Somewhat inspired by:
    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
    """

    class SubLogger():
        """MP-safe logger-like interface to convey log messages to a listening LoggerThread."""

        def __init__(self, queue):
            """Create SubLogger with a bound queue."""
            self.queue = queue

        def _log(self, level, *args, **kwargs):
            # Tag each message with the sender's PID so the receiving side can
            # tell which worker produced it; caller-supplied kwargs win on a
            # key collision because update() runs after the dict literal.
            kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
            kwargs_plus.update(kwargs)
            # Queue item format is [level, args, kwargs]; consumed by
            # logger_thread_worker in LoggerThread.__init__.
            self.queue.put([level, args, kwargs_plus])

        def debug(self, *args, **kwargs):
            """Log a DEBUG level message."""
            self._log(logging.DEBUG, *args, **kwargs)

        def info(self, *args, **kwargs):
            """Log an INFO level message."""
            self._log(logging.INFO, *args, **kwargs)

        def warning(self, *args, **kwargs):
            """Log a WARNING level message."""
            self._log(logging.WARNING, *args, **kwargs)

        def error(self, *args, **kwargs):
            """Log an ERROR level message."""
            self._log(logging.ERROR, *args, **kwargs)

        def critical(self, *args, **kwargs):
            """Log a CRITICAL level message."""
            self._log(logging.CRITICAL, *args, **kwargs)

    def get_sublogger(self):
        """Retrieve SubLogger for this LoggerThread."""
        return self.sublogger

    def __init__(self, logger, q=None):
        """Create and start LoggerThread with supplied logger, creating a queue if not provided."""
        self.logger = logger
        if q:
            self.msg_queue = q
        else:
            self.msg_queue = multiprocessing.Queue()

        def logger_thread_worker():
            # Runs in a dedicated thread: drain the queue until the 'STOP'
            # sentinel (or an unrecognized level, treated as fatal) arrives.
            logger.info('thread started')
            while True:
                msg = self.msg_queue.get()
                if msg == 'STOP':
                    logger.debug('received stop signal')
                    break
                level, args, kwargs = msg
                if level in [logging.DEBUG, logging.INFO, logging.WARNING,
                             logging.ERROR, logging.CRITICAL]:
                    logger.log(level, *args, **kwargs)
                else:
                    logger.error('received unknown logging level!  exiting...',
                                 level=level, args_kwargs=(args, kwargs))
                    break
            logger.debug('stopping thread')

        self.thread = threading.Thread(target=logger_thread_worker,
                                       name="LoggerThread__"+logger.name)
        logger.debug('starting thread')
        self.thread.start()

        self.sublogger = LoggerThread.SubLogger(self.msg_queue)
        self.running = True

    def stop(self):
        """Terminate this LoggerThread."""
        # Idempotent: a second stop() only warns instead of blocking on a
        # queue/thread that is already shut down.
        if not self.running:
            self.logger.warning('thread already stopped')
            return
        self.logger.debug('sending stop signal')
        self.msg_queue.put('STOP')
        self.thread.join()
        self.running = False
        self.logger.info('thread stopped')
232 |
| - |
233 |
| - |
234 |
@contextlib.contextmanager
def pool_and_threadedlogger(logger, *poolargs):
    """
    Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.

    Emulates the multiprocessing.Pool() context manager,
    but also provides (via a LoggerThread) a SubLogger proxy to logger
    that can be safely used by pool workers.
    The SubLogger proxy interface supports these methods: debug, info, warning, error,
    and critical.
    Also "cleans up" the pool by waiting for workers to complete
    as it exits the context.
    """
    with multiprocessing.Manager() as manager:
        # A manager-backed queue is picklable, so pool workers can hold the
        # SubLogger end of it; the LoggerThread drains the other end.
        threaded_logger = LoggerThread(logger, manager.Queue())
        try:
            with multiprocessing.Pool(*poolargs) as worker_pool:
                yield worker_pool, threaded_logger.get_sublogger()
                # Wait for all outstanding work before tearing the pool down.
                worker_pool.close()
                worker_pool.join()
        finally:
            # Always shut the listener thread down, even if the body raised.
            threaded_logger.stop()
| 7 | +no longer imports from `delphi_utils.logger` directly. |
| 8 | +""" |
| 9 | +from delphi_logger import get_structured_logger # pylint: disable=unused-import |
0 commit comments