fix(internal): move logging rate limiter to log filter (#12243)
PGB-61

Fixes #4856

Improve and fix the internal `ddtrace.internal.logger.get_logger`
implementation.

Today we enforce that all internal `ddtrace` loggers use a custom
subclass of `logging.Logger` so that we can apply custom logic, such as
forwarding internal error logs to telemetry and rate limiting the
volume of `ddtrace` logs.

However, the current implementation requires knowledge of
`logging.getLogger` internals and replicates some of their behavior. In
doing so, it mutates a shared resource (the logging manager's
`loggerDict`) without taking the lock that is expected to guard it.
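For illustration, a minimal sketch of the check-then-act race this kind of reimplementation invites. The `unsafe_get_logger` name is hypothetical and heavily simplified (the real code also handled placeholders and parent fixups); the point is only that reading and writing `loggerDict` without the module lock lets two threads race:

```python
import logging
import threading


def unsafe_get_logger(name: str) -> logging.Logger:
    # Simplified, hypothetical version of the old approach: check-then-act
    # on the shared `loggerDict` without holding the logging module's lock.
    manager = logging.Logger.manager
    if name not in manager.loggerDict:  # check ...
        # ... another thread can create and register the same name here ...
        logger = logging.Logger(name)
        manager.loggerDict[name] = logger  # ... then act: last writer wins
    return manager.loggerDict[name]


# Two threads racing on the same name can each construct a distinct Logger,
# and one of them silently replaces the other in the shared dict.
threads = [threading.Thread(target=unsafe_get_logger, args=("ddtrace.example",)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```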


This change applies our logic via a log filter instead of a custom
logger class.

A filter provides the same logic with a much simpler implementation
that is easier to reason about and maintain, and it avoids the
bug/race condition described above.
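As context for the approach: since Python 3.2 the standard library accepts a plain callable as a filter, and `Logger.addFilter` will not register the same filter object twice. A minimal sketch of the mechanism (the `my_rate_limit_filter` name is hypothetical; the real filter is in the diff below):

```python
import logging


def my_rate_limit_filter(record: logging.LogRecord) -> bool:
    # Returning False drops the record; True lets it through to the handlers.
    # (The real logic adds telemetry forwarding and time-bucket rate limiting.)
    return True


logger = logging.getLogger("ddtrace.example")
logger.addFilter(my_rate_limit_filter)  # no-op if this filter is already attached
```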

The downside to this approach is that loggers can have multiple filters,
and the first filter that determines a record should not be logged
prevents the remaining filters from running. This means that if a user
applies their own filter to one of our `ddtrace.*` loggers and it
rejects a record, we may miss collecting and forwarding that internal
error to telemetry. Otherwise, if a user filter prevents a log from
being logged, it does not count towards our rate limit (which is
acceptable/correct).
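The short-circuit behavior described above can be observed with stock `logging`: `Filterer.filter` stops at the first filter that returns a falsy value. A small illustration (filter names are hypothetical):

```python
import logging


def user_filter(record: logging.LogRecord) -> bool:
    return False  # a user filter that drops everything


def ddtrace_filter(record: logging.LogRecord) -> bool:
    print("ddtrace filter ran")  # never reached for records user_filter drops
    return True


logger = logging.getLogger("ddtrace.demo")
logger.addFilter(user_filter)     # registered first, so it runs first
logger.addFilter(ddtrace_filter)
logger.error("boom")  # dropped by user_filter; ddtrace_filter never runs
```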

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [ ] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Nicole Cybul <[email protected]>
Co-authored-by: Nick Ripley <[email protected]>
Co-authored-by: William Conti <[email protected]>
Co-authored-by: Christophe Papazian <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
Co-authored-by: Laplie Anderson <[email protected]>
8 people authored Feb 7, 2025
1 parent a8dfadf commit 122caa6
Showing 3 changed files with 126 additions and 338 deletions.
216 changes: 58 additions & 158 deletions ddtrace/internal/logger.py
@@ -1,180 +1,80 @@
 import collections
 import logging
 import os
-import typing
-from typing import Optional  # noqa:F401
-from typing import cast  # noqa:F401
-
-
-if typing.TYPE_CHECKING:
-    from typing import Any  # noqa:F401
-    from typing import DefaultDict  # noqa:F401
-    from typing import Tuple  # noqa:F401
+from typing import DefaultDict
+from typing import Tuple
 
 
-def get_logger(name):
-    # type: (str) -> DDLogger
+def get_logger(name: str) -> logging.Logger:
     """
-    Retrieve or create a ``DDLogger`` instance.
-
-    This function mirrors the behavior of `logging.getLogger`.
-
-    If no logger with the provided name has been fetched before then
-    a new one is created.
-
-    If a previous logger has been created then it is returned.
-
-    DEV: We do not want to mess with `logging.setLoggerClass()`
-         That will totally mess with the user's loggers, we want
-         just our own, selective loggers to be DDLoggers
-
-    :param name: The name of the logger to fetch or create
-    :type name: str
-    :return: The logger instance
-    :rtype: ``DDLogger``
+    Retrieve or create a ``Logger`` instance with consistent behavior for internal use.
+
+    Configure all loggers with a rate limiter filter to prevent excessive logging.
     """
-    # DEV: `logging.Logger.manager` refers to the single root `logging.Manager` instance
-    #   https://github.com/python/cpython/blob/48769a28ad6ef4183508951fa6a378531ace26a4/Lib/logging/__init__.py#L1824-L1826  # noqa:E501
-    manager = logging.Logger.manager
-
-    # If the logger does not exist yet, create it
-    # DEV: `Manager.loggerDict` is a dict mapping logger name to logger
-    # DEV: This is a simplified version of `logging.Manager.getLogger`
-    #   https://github.com/python/cpython/blob/48769a28ad6ef4183508951fa6a378531ace26a4/Lib/logging/__init__.py#L1221-L1253  # noqa:E501
-    # DEV: _fixupParents could be adding a placeholder, we want to replace it if that's the case
-    if name in manager.loggerDict:
-        logger = manager.loggerDict[name]
-        if isinstance(manager.loggerDict[name], logging.PlaceHolder):
-            placeholder = logger
-            logger = DDLogger(name=name)
-            manager.loggerDict[name] = logger
-            # DEV: `_fixupChildren` and `_fixupParents` have been around for awhile,
-            # DEV: but add the `hasattr` guard... just in case.
-            if hasattr(manager, "_fixupChildren"):
-                manager._fixupChildren(placeholder, logger)
-            if hasattr(manager, "_fixupParents"):
-                manager._fixupParents(logger)
-    else:
-        logger = DDLogger(name=name)
-        manager.loggerDict[name] = logger
-        if hasattr(manager, "_fixupParents"):
-            manager._fixupParents(logger)
-
-    # Return our logger
-    return cast(DDLogger, logger)
-
-
-def hasHandlers(self):
-    # type: (DDLogger) -> bool
-    """
-    See if this logger has any handlers configured.
-
-    Loop through all handlers for this logger and its parents in the
-    logger hierarchy. Return True if a handler was found, else False.
-    Stop searching up the hierarchy whenever a logger with the "propagate"
-    attribute set to zero is found - that will be the last logger which
-    is checked for the existence of handlers.
-
-    https://github.com/python/cpython/blob/8f192d12af82c4dc40730bf59814f6a68f68f950/Lib/logging/__init__.py#L1629
-    """
-    c = self
-    rv = False
-    while c:
-        if c.handlers:
-            rv = True
-            break
-        if not c.propagate:
-            break
-        else:
-            c = c.parent  # type: ignore
-    return rv
-
-
-class DDLogger(logging.Logger):
-    """
-    Custom rate limited logger used by ``ddtrace``
-
-    This logger class is used to rate limit the output of
-    log messages from within the ``ddtrace`` package.
-    """
-
-    # Named tuple used for keeping track of a log lines current time bucket and the number of log lines skipped
-    LoggingBucket = collections.namedtuple("LoggingBucket", ("bucket", "skipped"))
-
-    def __init__(self, *args, **kwargs):
-        # type: (*Any, **Any) -> None
-        """Constructor for ``DDLogger``"""
-        super(DDLogger, self).__init__(*args, **kwargs)
-
-        # Dict to keep track of the current time bucket per name/level/pathname/lineno
-        self.buckets = collections.defaultdict(
-            lambda: DDLogger.LoggingBucket(0, 0)
-        )  # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket]
-
-        # Allow 1 log record per name/level/pathname/lineno every 60 seconds by default
-        # Allow configuring via `DD_TRACE_LOGGING_RATE`
-        # DEV: `DD_TRACE_LOGGING_RATE=0` means to disable all rate limiting
-        rate_limit = os.getenv("DD_TRACE_LOGGING_RATE", default=None)
-
-        if rate_limit is not None:
-            self.rate_limit = int(rate_limit)
-        else:
-            self.rate_limit = 60
-
-    def handle(self, record):
-        # type: (logging.LogRecord) -> None
-        """
-        Function used to call the handlers for a log line.
-
-        This implementation will first determine if this log line should
-        be logged or rate limited, and then call the base ``logging.Logger.handle``
-        function if it should be logged
-
-        DEV: This method has all of it's code inlined to reduce on functions calls
-
-        :param record: The log record being logged
-        :type record: ``logging.LogRecord``
-        """
-        if record.levelno >= logging.ERROR:
-            # avoid circular import
-            from ddtrace.internal import telemetry
-
-            # currently we only have one error code
-            full_file_name = os.path.join(record.pathname, record.filename)
-            telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno)
-
-        # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit
-        # If the logging is in debug, then do not apply any limits to any log
-        if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG:
-            super(DDLogger, self).handle(record)
-            return
-
-        # Allow 1 log record by name/level/pathname/lineno every X seconds
-        # DEV: current unix time / rate (e.g. 300 seconds) = time bucket
-        #      int(1546615098.8404942 / 300) = 515538
-        # DEV: LogRecord `created` is a unix timestamp/float
-        current_bucket = int(record.created / self.rate_limit)
-
-        # Limit based on logger name, record level, filename, and line number
-        #   ('ddtrace.writer', 'DEBUG', '../site-packages/ddtrace/writer.py', 137)
-        # This way each unique log message can get logged at least once per time period
-        # DEV: LogRecord has `levelname` and `levelno`, we want `levelno` e.g. `logging.DEBUG = 10`
-        key = (record.name, record.levelno, record.pathname, record.lineno)
-
-        # Only log this message if the time bucket has changed from the previous time we ran
-        logging_bucket = self.buckets[key]
-        if logging_bucket.bucket != current_bucket:
-            # Append count of skipped messages if we have skipped some since our last logging
-            if logging_bucket.skipped:
-                record.msg = "{}, %s additional messages skipped".format(record.msg)
-                record.args = record.args + (logging_bucket.skipped,)  # type: ignore
-
-            # Reset our bucket
-            self.buckets[key] = DDLogger.LoggingBucket(current_bucket, 0)
-
-            # Call the base handle to actually log this record
-            super(DDLogger, self).handle(record)
-        else:
-            # Increment the count of records we have skipped
-            # DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead
-            self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1)
+    logger = logging.getLogger(name)
+    # addFilter will only add the filter if it is not already present
+    logger.addFilter(log_filter)
+    return logger
+
+
+# Named tuple used for keeping track of a log lines current time bucket and the number of log lines skipped
+LoggingBucket = collections.namedtuple("LoggingBucket", ("bucket", "skipped"))
+# Dict to keep track of the current time bucket per name/level/pathname/lineno
+_buckets: DefaultDict[Tuple[str, int, str, int], LoggingBucket] = collections.defaultdict(lambda: LoggingBucket(0, 0))
+
+# Allow 1 log record per name/level/pathname/lineno every 60 seconds by default
+# Allow configuring via `DD_TRACE_LOGGING_RATE`
+# DEV: `DD_TRACE_LOGGING_RATE=0` means to disable all rate limiting
+_rate_limit = int(os.getenv("DD_TRACE_LOGGING_RATE", default=60))
+
+
+def log_filter(record: logging.LogRecord) -> bool:
+    """
+    Function used to determine if a log record should be outputted or not (True = output, False = skip).
+
+    This function will:
+    - Log all records with a level of ERROR or higher with telemetry
+    - Rate limit log records based on the logger name, record level, filename, and line number
+    """
+    if record.levelno >= logging.ERROR:
+        # avoid circular import
+        from ddtrace.internal import telemetry
+
+        # currently we only have one error code
+        full_file_name = os.path.join(record.pathname, record.filename)
+        telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno)
+
+    logger = logging.getLogger(record.name)
+
+    # If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit
+    # If the logger is set to debug, then do not apply any limits to any log
+    if not _rate_limit or logger.getEffectiveLevel() == logging.DEBUG:
+        return True
+
+    # Allow 1 log record by name/level/pathname/lineno every X seconds
+    # DEV: current unix time / rate (e.g. 300 seconds) = time bucket
+    #      int(1546615098.8404942 / 300) = 515538
+    # DEV: LogRecord `created` is a unix timestamp/float
+    current_bucket = int(record.created / _rate_limit)
+    # Limit based on logger name, record level, filename, and line number
+    #   ('ddtrace.writer', 'DEBUG', '../site-packages/ddtrace/writer.py', 137)
+    # This way each unique log message can get logged at least once per time period
+    # DEV: LogRecord has `levelname` and `levelno`, we want `levelno` e.g. `logging.DEBUG = 10`
+    key = (record.name, record.levelno, record.pathname, record.lineno)
+
+    # Only log this message if the time bucket has changed from the previous time we ran
+    logging_bucket = _buckets[key]
+    if logging_bucket.bucket != current_bucket:
+        # Append count of skipped messages if we have skipped some since our last logging
+        if logging_bucket.skipped:
+            record.msg = "{}, %s additional messages skipped".format(record.msg)
+            record.args = record.args + (logging_bucket.skipped,)  # type: ignore
+        # Reset our bucket
+        _buckets[key] = LoggingBucket(current_bucket, 0)
+        # Actually log this record
+        return True
+
+    # Increment the count of records we have skipped
+    # DEV: `buckets[key]` is a tuple which is immutable so recreate instead
+    _buckets[key] = LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1)
+    # Skip this log message
+    return False
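For a sense of the new filter's behavior, here is an illustrative snippet (assumes the default `DD_TRACE_LOGGING_RATE` of 60 seconds; the logger name is an example):

```python
import logging

from ddtrace.internal.logger import get_logger

logging.basicConfig(level=logging.WARNING)
log = get_logger("ddtrace.example")

# Within one 60-second bucket, only the first record from this exact
# name/level/pathname/lineno key is emitted; the other four are counted
# as skipped in the module-level `_buckets` dict.
for _ in range(5):
    log.warning("connection failed")

# The next record from this same line in a later time bucket is emitted with
# the suffix ", 4 additional messages skipped".
```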
@@ -0,0 +1,4 @@
+---
+fixes:
+- |
+    internal: Fix ``ddtrace`` internal logger initialization mutating an unlocked shared resource.