Skip to content

chore: introduce APM_TRACING RC product #11980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions ddtrace/_trace/product.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import enum
import os
import typing as t

from envier import En

from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.settings.http import HttpConfig


requires = ["remote-configuration"]
Expand Down Expand Up @@ -63,3 +66,54 @@ def at_exit(join=False):

if tracer.enabled:
tracer._atexit()


class APMCapabilities(enum.IntFlag):
APM_TRACING_SAMPLE_RATE = 1 << 12
APM_TRACING_LOGS_INJECTION = 1 << 13
APM_TRACING_HTTP_HEADER_TAGS = 1 << 14
APM_TRACING_CUSTOM_TAGS = 1 << 15
APM_TRACING_ENABLED = 1 << 19
APM_TRACING_SAMPLE_RULES = 1 << 29


def apm_tracing_rc(lib_config):
from ddtrace import config

base_rc_config: t.Dict[str, t.Any] = {n: None for n in config._config}

if "tracing_sampling_rules" in lib_config or "tracing_sampling_rate" in lib_config:
global_sampling_rate = lib_config.get("tracing_sampling_rate")
trace_sampling_rules = lib_config.get("tracing_sampling_rules") or []
# returns None if no rules
trace_sampling_rules = config._convert_rc_trace_sampling_rules(trace_sampling_rules, global_sampling_rate)
if trace_sampling_rules:
base_rc_config["_trace_sampling_rules"] = trace_sampling_rules

if "log_injection_enabled" in lib_config:
base_rc_config["_logs_injection"] = lib_config["log_injection_enabled"]

if "tracing_tags" in lib_config:
tags = lib_config["tracing_tags"]
if tags:
tags = config._format_tags(lib_config["tracing_tags"])
base_rc_config["tags"] = tags

if "tracing_enabled" in lib_config and lib_config["tracing_enabled"] is not None:
base_rc_config["_tracing_enabled"] = asbool(lib_config["tracing_enabled"])

if "tracing_header_tags" in lib_config:
tags = lib_config["tracing_header_tags"]
if tags:
tags = config._format_tags(lib_config["tracing_header_tags"])
base_rc_config["_trace_http_header_tags"] = tags

config._set_config_items([(k, v, "remote_config") for k, v in base_rc_config.items()])

# unconditionally handle the case where header tags have been unset
header_tags_conf = config._config["_trace_http_header_tags"]
env_headers = header_tags_conf._env_value or {}
code_headers = header_tags_conf._code_value or {}
non_rc_header_tags = {**code_headers, **env_headers}
selected_header_tags = base_rc_config.get("_trace_http_header_tags") or non_rc_header_tags
config._http = HttpConfig(header_tags=selected_header_tags)
4 changes: 3 additions & 1 deletion ddtrace/internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ gets extended to add support for additional features.
| Attribute | Description |
|-----------|-------------|
| `requires: list[str]` | A list of other product names that the product depends on |
| `config: DDConfig` | A configuration object; when an instance of `DDConfig`, configuration telemetry is automatically reported |
| `config: DDConfig` | A configuration object; when an instance of `DDConfig`, configuration telemetry is automatically reported |
| `APMCapabilities: Type[enum.IntFlag]` | A set of capabilities that the product provides |
| `apm_tracing_rc: (dict) -> None` | Product-specific remote configuration handler (e.g. remote enablement) |
Empty file.
82 changes: 82 additions & 0 deletions ddtrace/internal/remoteconfig/products/apm_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import typing as t

from ddtrace import config
from ddtrace.internal.core.event_hub import dispatch
from ddtrace.internal.core.event_hub import on
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig import Payload
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
from ddtrace.internal.remoteconfig._pubsub import PubSub
from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber


requires = ["remote-configuration"]


log = get_logger(__name__)


def _rc_callback(payloads: t.Sequence[Payload]) -> None:
for payload in payloads:
if payload.metadata is None or (content := payload.content) is None:
continue

if (service_target := t.cast(t.Optional[dict], content.get("service_target"))) is not None:
if (service := t.cast(str, service_target.get("service"))) is not None and service != config.service:
continue

if (env := t.cast(str, service_target.get("env"))) is not None and env != config.env:
continue

if (lib_config := t.cast(dict, content.get("lib_config"))) is not None:
dispatch("apm-tracing.rc", (lib_config,))


class APMTracingAdapter(PubSub):
__publisher_class__ = RemoteConfigPublisher
__subscriber_class__ = RemoteConfigSubscriber
__shared_data__ = PublisherSubscriberConnector()

def __init__(self):
self._publisher = self.__publisher_class__(self.__shared_data__)
self._subscriber = self.__subscriber_class__(self.__shared_data__, _rc_callback, "APM_TRACING")


def post_preload():
pass


def start():
if config._remote_config_enabled:
from ddtrace.internal.products import manager
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

remoteconfig_poller.register(
"APM_TRACING",
APMTracingAdapter(),
restart_on_fork=True,
capabilities=[
cap for product in manager.__products__.values() for cap in getattr(product, "APMCapabilities", [])
],
)

# Register remote config handlers
for name, product in manager.__products__.items():
if (rc_handler := getattr(product, "apm_tracing_rc", None)) is not None:
on("apm-tracing.rc", rc_handler, name)


def restart(join=False):
pass


def stop(join=False):
if config._remote_config_enabled:
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

remoteconfig_poller.unregister("APM_TRACING")


def at_exit(join=False):
stop(join=join)
Original file line number Diff line number Diff line change
@@ -1,32 +1,7 @@
import enum

from ddtrace import config
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
from ddtrace.internal.remoteconfig._pubsub import PubSub
from ddtrace.internal.remoteconfig._pubsub import RemoteConfigSubscriber
from ddtrace.internal.remoteconfig.client import config as rc_config


class GlobalConfigPubSub(PubSub):
__publisher_class__ = RemoteConfigPublisher
__subscriber_class__ = RemoteConfigSubscriber
__shared_data__ = PublisherSubscriberConnector()

def __init__(self, callback):
self._publisher = self.__publisher_class__(self.__shared_data__, None)
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "GlobalConfig")


class Capabilities(enum.IntFlag):
APM_TRACING_SAMPLE_RATE = 1 << 12
APM_TRACING_LOGS_INJECTION = 1 << 13
APM_TRACING_HTTP_HEADER_TAGS = 1 << 14
APM_TRACING_CUSTOM_TAGS = 1 << 15
APM_TRACING_ENABLED = 1 << 19
APM_TRACING_SAMPLE_RULES = 1 << 29


# TODO: Modularize better into their own respective components
def _register_rc_products() -> None:
"""Enable fetching configuration from Datadog."""
Expand All @@ -35,10 +10,8 @@ def _register_rc_products() -> None:
from ddtrace.internal.flare.handler import _tracerFlarePubSub
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

remoteconfig_pubsub = GlobalConfigPubSub(config._handle_remoteconfig)
flare = Flare(trace_agent_url=config._trace_agent_url, api_key=config._dd_api_key, ddconfig=config.__dict__)
tracerflare_pubsub = _tracerFlarePubSub()(_handle_tracer_flare, flare)
remoteconfig_poller.register("APM_TRACING", remoteconfig_pubsub, capabilities=Capabilities)
remoteconfig_poller.register("AGENT_CONFIG", tracerflare_pubsub)
remoteconfig_poller.register("AGENT_TASK", tracerflare_pubsub)

Expand Down
65 changes: 1 addition & 64 deletions ddtrace/settings/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,7 @@ def __init__(self):
x_datadog_tags_max_length = _get_config("DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH", 512, int)
if x_datadog_tags_max_length < 0:
log.warning(
(
"Invalid value %r provided for DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH, "
"only non-negative values allowed"
),
("Invalid value %r provided for DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH, only non-negative values allowed"),
x_datadog_tags_max_length,
)
x_datadog_tags_max_length = 0
Expand Down Expand Up @@ -806,66 +803,6 @@ def _get_source(self, item):
# type: (str) -> str
return self._config[item].source()

def _handle_remoteconfig(self, data_list, test_tracer=None):
# data_list is a list of Payload objects
# type: (Any, Any) -> None

if len(data_list) == 0:
log.warning("unexpected number of RC payloads")
return
data = [payload.content for payload in data_list]

# Check if 'lib_config' is a key in the dictionary since other items can be sent in the payload
config = None
for config_item in data:
if isinstance(config_item, Dict):
if "lib_config" in config_item:
config = config_item
break

# If no data is submitted then the RC config has been deleted. Revert the settings.
base_rc_config = {n: None for n in self._config}

if config and "lib_config" in config:
lib_config = config["lib_config"]
if "tracing_sampling_rules" in lib_config or "tracing_sampling_rate" in lib_config:
global_sampling_rate = lib_config.get("tracing_sampling_rate")
trace_sampling_rules = lib_config.get("tracing_sampling_rules") or []
# returns None if no rules
trace_sampling_rules = self._convert_rc_trace_sampling_rules(trace_sampling_rules, global_sampling_rate)
if trace_sampling_rules:
base_rc_config["_trace_sampling_rules"] = trace_sampling_rules # type: ignore[assignment]

if "log_injection_enabled" in lib_config:
base_rc_config["_logs_injection"] = lib_config["log_injection_enabled"]

if "tracing_tags" in lib_config:
tags = lib_config["tracing_tags"]
if tags:
tags = self._format_tags(lib_config["tracing_tags"])
base_rc_config["tags"] = tags

if "tracing_enabled" in lib_config and lib_config["tracing_enabled"] is not None:
base_rc_config["_tracing_enabled"] = asbool(lib_config["tracing_enabled"]) # type: ignore[assignment]

if "tracing_header_tags" in lib_config:
tags = lib_config["tracing_header_tags"]
if tags:
tags = self._format_tags(lib_config["tracing_header_tags"])
base_rc_config["_trace_http_header_tags"] = tags
self._set_config_items([(k, v, "remote_config") for k, v in base_rc_config.items()])
# called unconditionally to handle the case where header tags have been unset
self._handle_remoteconfig_header_tags(base_rc_config)

def _handle_remoteconfig_header_tags(self, base_rc_config):
"""Implements precedence order between remoteconfig header tags from code, env, and RC"""
header_tags_conf = self._config["_trace_http_header_tags"]
env_headers = header_tags_conf._env_value or {}
code_headers = header_tags_conf._code_value or {}
non_rc_header_tags = {**code_headers, **env_headers}
selected_header_tags = base_rc_config.get("_trace_http_header_tags") or non_rc_header_tags
self._http = HttpConfig(header_tags=selected_header_tags)

def _format_tags(self, tags: List[Union[str, Dict]]) -> Dict[str, str]:
if not tags:
return {}
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ ddtrace = "ddtrace.contrib.internal.pytest.plugin"
"ddtrace.pytest_benchmark" = "ddtrace.contrib.internal.pytest_benchmark.plugin"

[project.entry-points.'ddtrace.products']
"apm-tracing-rc" = "ddtrace.internal.remoteconfig.products.apm_tracing"
"code-origin-for-spans" = "ddtrace.debugging._products.code_origin.span"
"dynamic-instrumentation" = "ddtrace.debugging._products.dynamic_instrumentation"
"exception-replay" = "ddtrace.debugging._products.exception_replay"
"live-debugger" = "ddtrace.debugging._products.live_debugger"
"error-tracking" = "ddtrace.errortracking.product"
"remote-configuration" = "ddtrace.internal.remoteconfig.product"
"remote-configuration" = "ddtrace.internal.remoteconfig.products.client"
"symbol-database" = "ddtrace.internal.symbol_db.product"
"appsec" = "ddtrace.internal.appsec.product"
"iast" = "ddtrace.internal.iast.product"
Expand Down
2 changes: 1 addition & 1 deletion riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
name="integration",
# Enabling coverage for integration tests breaks certain tests in CI
# Also, running two separate pytest sessions, the ``civisibility`` one with --no-ddtrace
command="pytest --no-ddtrace --no-cov --ignore-glob='*civisibility*' {cmdargs} tests/integration/",
command="pytest -vv --no-ddtrace --no-cov --ignore-glob='*civisibility*' {cmdargs} tests/integration/",
pkgs={"msgpack": [latest], "coverage": latest, "pytest-randomly": latest},
pys=select_pys(),
venvs=[
Expand Down
15 changes: 9 additions & 6 deletions tests/integration/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,27 +136,28 @@ def test_remoteconfig_sampling_rate_default(test_agent_session, run_python_code_
"""
from ddtrace import config, tracer
from tests.internal.test_settings import _base_rc_config
from tests.internal.test_settings import call_apm_tracing_rc

with tracer.trace("test") as span:
pass
assert span.get_metric("_dd.rule_psr") is None

config._handle_remoteconfig(_base_rc_config({"tracing_sampling_rate": 0.5}))
call_apm_tracing_rc(_base_rc_config({"tracing_sampling_rate": 0.5}))
with tracer.trace("test") as span:
pass
assert span.get_metric("_dd.rule_psr") == 0.5

config._handle_remoteconfig(_base_rc_config({"tracing_sampling_rate": None}))
call_apm_tracing_rc(_base_rc_config({"tracing_sampling_rate": None}))
with tracer.trace("test") as span:
pass
assert span.get_metric("_dd.rule_psr") is None, "Unsetting remote config trace sample rate"

config._handle_remoteconfig(_base_rc_config({"tracing_sampling_rate": 0.8}))
call_apm_tracing_rc(_base_rc_config({"tracing_sampling_rate": 0.8}))
with tracer.trace("test") as span:
pass
assert span.get_metric("_dd.rule_psr") == 0.8

config._handle_remoteconfig(_base_rc_config({"tracing_sampling_rate": None}))
call_apm_tracing_rc(_base_rc_config({"tracing_sampling_rate": None}))
with tracer.trace("test") as span:
pass
assert span.get_metric("_dd.rule_psr") is None, "(second time) unsetting remote config trace sample rate"
Expand All @@ -182,8 +183,9 @@ def test_remoteconfig_sampling_rate_telemetry(test_agent_session, run_python_cod
"""
from ddtrace import config, tracer
from tests.internal.test_settings import _base_rc_config
from tests.internal.test_settings import call_apm_tracing_rc

config._handle_remoteconfig(
call_apm_tracing_rc(
_base_rc_config(
{
"tracing_sampling_rules": [
Expand Down Expand Up @@ -230,8 +232,9 @@ def test_remoteconfig_header_tags_telemetry(test_agent_session, run_python_code_
from ddtrace import config, tracer
from ddtrace.contrib import trace_utils
from tests.internal.test_settings import _base_rc_config
from tests.internal.test_settings import call_apm_tracing_rc

config._handle_remoteconfig(_base_rc_config({
call_apm_tracing_rc(_base_rc_config({
"tracing_header_tags": [
{"header": "used", "tag_name":"header_tag_69"},
{"header": "unused", "tag_name":"header_tag_70"},
Expand Down
Loading
Loading