Skip to content

Commit bf97f39

Browse files
committed
munir: move telemetry logger to a handler
munir: add telemetry log handler
1 parent f613c49 commit bf97f39

File tree

3 files changed

+78
-109
lines changed

3 files changed

+78
-109
lines changed

ddtrace/internal/logger.py

Lines changed: 2 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import collections
22
import logging
33
import os
4-
import traceback
54
import typing
65
from typing import Optional # noqa:F401
76
from typing import cast # noqa:F401
@@ -47,7 +46,7 @@ def get_logger(name):
4746
logger = manager.loggerDict[name]
4847
if isinstance(manager.loggerDict[name], logging.PlaceHolder):
4948
placeholder = logger
50-
logger = _new_logger(name=name)
49+
logger = DDLogger(name=name)
5150
manager.loggerDict[name] = logger
5251
# DEV: `_fixupChildren` and `_fixupParents` have been around for a while,
5352
# DEV: but add the `hasattr` guard... just in case.
@@ -56,7 +55,7 @@ def get_logger(name):
5655
if hasattr(manager, "_fixupParents"):
5756
manager._fixupParents(logger)
5857
else:
59-
logger = _new_logger(name=name)
58+
logger = DDLogger(name=name)
6059
manager.loggerDict[name] = logger
6160
if hasattr(manager, "_fixupParents"):
6261
manager._fixupParents(logger)
@@ -65,13 +64,6 @@ def get_logger(name):
6564
return cast(DDLogger, logger)
6665

6766

68-
def _new_logger(name):
69-
if _TelemetryConfig.LOG_COLLECTION_ENABLED:
70-
if name.startswith("ddtrace.contrib."):
71-
return DDTelemetryLogger(name=name)
72-
return DDLogger(name=name)
73-
74-
7567
def hasHandlers(self):
7668
# type: (DDLogger) -> bool
7769
"""
@@ -142,14 +134,6 @@ def handle(self, record):
142134
:param record: The log record being logged
143135
:type record: ``logging.LogRecord``
144136
"""
145-
if record.levelno >= logging.ERROR:
146-
# avoid circular import
147-
from ddtrace.internal import telemetry
148-
149-
# currently we only have one error code
150-
full_file_name = os.path.join(record.pathname, record.filename)
151-
telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno)
152-
153137
# If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit
154138
# If the logging is in debug, then do not apply any limits to any log
155139
if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG:
@@ -186,91 +170,3 @@ def handle(self, record):
186170
# Increment the count of records we have skipped
187171
# DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead
188172
self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1)
189-
190-
191-
class DDTelemetryLogger(DDLogger):
192-
"""
193-
Logger that intercepts and reports exceptions to the telemetry.
194-
"""
195-
196-
def __init__(self, *args, **kwargs):
197-
# type: (*Any, **Any) -> None
198-
"""Constructor for ``DDTelemetryLogger``"""
199-
super(DDTelemetryLogger, self).__init__(*args, **kwargs)
200-
201-
self.telemetry_log_buckets = collections.defaultdict(
202-
lambda: DDLogger.LoggingBucket(0, 0)
203-
) # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket]
204-
205-
def handle(self, record):
206-
# type: (logging.LogRecord) -> None
207-
208-
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
209-
210-
key = (record.name, record.levelno, record.pathname, record.lineno)
211-
current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL)
212-
key_bucket = self.telemetry_log_buckets[key]
213-
if key_bucket.bucket == current_bucket:
214-
self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(key_bucket.bucket, key_bucket.skipped + 1)
215-
else:
216-
self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(current_bucket, 0)
217-
level = (
218-
TELEMETRY_LOG_LEVEL.ERROR
219-
if record.levelno >= logging.ERROR
220-
else TELEMETRY_LOG_LEVEL.WARNING
221-
if record.levelno == logging.WARNING
222-
else TELEMETRY_LOG_LEVEL.DEBUG
223-
)
224-
from ddtrace.internal import telemetry
225-
226-
tags = {
227-
"lib_language": "python",
228-
}
229-
stack_trace = _format_stack_trace(record.exc_info) if record.exc_info is not None else None
230-
if record.levelno >= logging.ERROR or stack_trace is not None:
231-
# Report only an error or an exception with a stack trace
232-
telemetry.telemetry_writer.add_log(
233-
level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1
234-
)
235-
236-
super().handle(record)
237-
238-
239-
def _format_stack_trace(exc_info):
240-
exc_type, exc_value, exc_traceback = exc_info
241-
if exc_traceback:
242-
tb = traceback.extract_tb(exc_traceback)
243-
formatted_tb = ["Traceback (most recent call last):"]
244-
for filename, lineno, funcname, srcline in tb:
245-
if _should_redact(filename):
246-
formatted_tb.append(" <REDACTED>")
247-
else:
248-
relative_filename = _format_file_path(filename)
249-
formatted_line = f' File "{relative_filename}", line {lineno}, in {funcname}\n {srcline}'
250-
formatted_tb.append(formatted_line)
251-
formatted_tb.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
252-
return "\n".join(formatted_tb)
253-
return None
254-
255-
256-
def _should_redact(filename):
257-
return not "ddtrace" in filename
258-
259-
260-
CWD = os.getcwd()
261-
262-
263-
def _format_file_path(filename):
264-
try:
265-
return os.path.relpath(filename, start=CWD)
266-
except ValueError:
267-
return filename
268-
269-
270-
class _TelemetryConfig:
271-
TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1")
272-
LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in (
273-
"true",
274-
"1",
275-
)
276-
TELEMETRY_HEARTBEAT_INTERVAL = int(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60"))

ddtrace/internal/telemetry/logging.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import logging
2+
import os
3+
import traceback
4+
5+
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
6+
7+
8+
class DDTelemetryLogHandler(logging.StreamHandler):
    """Logging handler that forwards log records to a telemetry writer.

    Two kinds of records are reported:

    * every record at ``logging.ERROR`` or above is sent to
      ``telemetry_writer.add_error``;
    * records whose logger name starts with ``ddtrace.contrib`` and that carry
      exception info with a traceback are also sent to
      ``telemetry_writer.add_log`` together with a redacted stack trace.

    Nothing is written to a stream; ``emit`` is fully overridden.
    """

    # Working directory captured when the class is defined; stack trace file
    # paths are reported relative to it.
    CWD = os.getcwd()

    def __init__(self, telemetry_writer):
        """Create the handler.

        :param telemetry_writer: object providing ``add_error`` and ``add_log``
        """
        self.telemetry_writer = telemetry_writer
        super().__init__()

    def emit(self, record):
        # type: (logging.LogRecord) -> None
        """Report ``record`` to the telemetry writer when it qualifies.

        :param record: The log record being handled
        :type record: ``logging.LogRecord``
        """
        is_error = record.levelno >= logging.ERROR
        if is_error:
            # Capture start up errors (currently there is only one error code: 1).
            # `record.pathname` is already the full path of the source file, so it
            # must not be joined with `record.filename` again.
            # `getMessage()` only applies %-formatting when the record has args,
            # so a literal "%" in an argument-less message does not raise.
            self.telemetry_writer.add_error(1, record.getMessage(), record.pathname, record.lineno)

        # Capture errors logged in the ddtrace integrations
        if not record.name.startswith("ddtrace.contrib"):
            return

        # Only collect telemetry for logs with a traceback
        stack_trace = self._format_stack_trace(record.exc_info)
        if stack_trace is None:
            return

        if is_error:
            telemetry_level = TELEMETRY_LOG_LEVEL.ERROR
        elif record.levelno == logging.WARNING:
            telemetry_level = TELEMETRY_LOG_LEVEL.WARNING
        else:
            telemetry_level = TELEMETRY_LOG_LEVEL.DEBUG

        # The raw (unformatted) message is reported here, as in the original code.
        self.telemetry_writer.add_log(
            telemetry_level,
            record.msg,
            # Do we need to set this tag? Should we allow telemetry intake to infer this value?
            tags={"lib_language": "python"},
            stack_trace=stack_trace,
        )

    def _format_stack_trace(self, exc_info):
        """Render ``exc_info`` as a traceback string with redacted frames.

        :param exc_info: ``(type, value, traceback)`` tuple or ``None``
        :returns: the formatted traceback, or ``None`` when there is no
            exception info or no traceback object
        """
        if exc_info is None:
            return None
        exc_type, exc_value, exc_traceback = exc_info
        if not exc_traceback:
            return None

        formatted_tb = ["Traceback (most recent call last):"]
        for filename, lineno, funcname, srcline in traceback.extract_tb(exc_traceback):
            if self._should_redact(filename):
                # Frames whose path does not mention ddtrace are hidden.
                formatted_tb.append(" <REDACTED>")
            else:
                relative_filename = self._format_file_path(filename)
                formatted_line = f' File "{relative_filename}", line {lineno}, in {funcname}\n {srcline}'
                formatted_tb.append(formatted_line)
        formatted_tb.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
        return "\n".join(formatted_tb)

    def _should_redact(self, filename):
        """Return ``True`` when ``filename`` does not contain ``"ddtrace"``."""
        return "ddtrace" not in filename

    def _format_file_path(self, filename):
        """Return ``filename`` relative to ``CWD``, or unchanged if that fails.

        ``os.path.relpath`` raises ``ValueError`` when no relative path can be
        computed; the path is then returned unchanged.
        """
        try:
            return os.path.relpath(filename, start=self.CWD)
        except ValueError:
            return filename

ddtrace/internal/telemetry/writer.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from .data import get_host_info
4040
from .data import get_python_config_vars
4141
from .data import update_imported_dependencies
42+
from .logging import DDTelemetryLogHandler
4243
from .metrics import CountMetric
4344
from .metrics import DistributionMetric
4445
from .metrics import GaugeMetric
@@ -68,6 +69,10 @@ class _TelemetryConfig:
6869
INSTALL_TYPE = os.environ.get("DD_INSTRUMENTATION_INSTALL_TYPE", None)
6970
INSTALL_TIME = os.environ.get("DD_INSTRUMENTATION_INSTALL_TIME", None)
7071
FORCE_START = asbool(os.environ.get("_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED", "false"))
72+
LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in (
73+
"true",
74+
"1",
75+
)
7176

7277

7378
class LogData(dict):
@@ -222,6 +227,8 @@ def __init__(self, is_periodic=True, agentless=None):
222227
# Force app started for unit tests
223228
if _TelemetryConfig.FORCE_START:
224229
self._app_started()
230+
if _TelemetryConfig.LOG_COLLECTION_ENABLED:
231+
getLogger("ddtrace").addHandler(DDTelemetryLogHandler(self))
225232

226233
def enable(self):
227234
# type: () -> bool
@@ -484,7 +491,7 @@ def add_configurations(self, configuration_list):
484491
"value": value,
485492
}
486493

487-
def add_log(self, level, message, stack_trace="", tags=None, count=1):
494+
def add_log(self, level, message, stack_trace="", tags=None):
488495
"""
489496
Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake.
490497
This will make support cycles easier and ensure we know about potentially silent issues in libraries.
@@ -504,8 +511,7 @@ def add_log(self, level, message, stack_trace="", tags=None, count=1):
504511
data["tags"] = ",".join(["%s:%s" % (k, str(v).lower()) for k, v in tags.items()])
505512
if stack_trace:
506513
data["stack_trace"] = stack_trace
507-
if count > 1:
508-
data["count"] = count
514+
# Logs are hashed using the message, level, tags, and stack_trace. This should prevent duplication.
509515
self._logs.add(data)
510516

511517
def add_gauge_metric(self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: MetricTagType = None):

0 commit comments

Comments
 (0)