Commit ca79351
fix(sampling): ensure agent based sampling is not reset after forking and on tracer.configure (#13560)
Builds on 7fbdc9f

Fix:
- Avoids reinitializing the SpanAggregator on `tracer.configure(...)` and when an application forks. Instead, `SpanAggregator.reset()` is called. This operation ensures global configurations are re-applied, the trace buffer can be reset, and the trace writer is recreated. It also ensures agent-based sampling rules are not reset.

Clean up:
- Removes the `writer` parameter from `SpanAggregator.__init__(...)`. With this change, initializing the global writer is an implementation detail of the SpanAggregator; there is no longer a need to supply the `SpanAggregator` with a writer when initializing the global tracer.
- Moves all implementation details of resetting the `SpanAggregator` from `Tracer.configure(...)` and `Tracer._recreate(...)` to `SpanAggregator.reset(...)`.
- Removes the initialization of the SpanAggregator from `_default_span_processors_factory`. With this change, the global tracer's SpanAggregator is never re-created; it is only modified when `tracer.configure(...)` is used.
- Renames the `DatadogSampler._by_service_samplers` property to `DatadogSampler._agent_based_samplers` to improve clarity. These sampling rules are no longer supplied via environment variables or a programmatic API; they can only be set by the Datadog Agent.
- Splits `SpanAggregator.trace_processors` into two properties: `SpanAggregator.dd_processors` and `SpanAggregator.user_processors`. `SpanAggregator.user_processors` is set after application startup via `Tracer.configure(...)`, while `SpanAggregator.dd_processors` is internal to the ddtrace library and should only be set by ddtrace components. This separation avoids recreating all trace processors when `tracer.configure()` is called.
- Adds a more descriptive release note to an unreleased fix.

## Checklist
- [x] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Brett Langdon <[email protected]>
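To make the fix concrete, here is a minimal sketch of the behavior this change preserves. It is not part of the commit; it uses internal attributes visible in the diffs below (`_span_aggregator`, `sampling_processor`, `_agent_based_samplers`), the service/env keys and rate payload are illustrative, and it assumes (per the `configure` hunk below) that a non-None `trace_processors` argument triggers `_recreate(...)`:

```python
from ddtrace.trace import tracer

# Internal attributes shown in the diffs below; the rates payload is illustrative.
sampler = tracer._span_aggregator.sampling_processor.sampler

# Simulate the Datadog Agent reporting service-based sample rates.
sampler.update_rate_by_service_sample_rates({"service:web,env:prod": 0.25})

# Reconfiguring the tracer used to rebuild the SpanAggregator and drop these
# rates; with this fix, SpanAggregator.reset() carries them over.
tracer.configure(trace_processors=[])
assert (
    "service:web,env:prod"
    in tracer._span_aggregator.sampling_processor.sampler._agent_based_samplers
)
```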
1 parent d49390c commit ca79351

File tree

13 files changed: +300 −100 lines

ddtrace/_trace/processor/__init__.py

Lines changed: 81 additions & 14 deletions
```diff
@@ -4,7 +4,6 @@
 from os import environ
 from threading import RLock
 from typing import Dict
-from typing import Iterable
 from typing import List
 from typing import Optional

@@ -129,7 +128,13 @@ class TraceSamplingProcessor(TraceProcessor):
     Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
     """

-    def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool):
+    def __init__(
+        self,
+        compute_stats_enabled: bool,
+        single_span_rules: List[SpanSamplingRule],
+        apm_opt_out: bool,
+        agent_based_samplers: Optional[dict] = None,
+    ):
         super(TraceSamplingProcessor, self).__init__()
         self._compute_stats_enabled = compute_stats_enabled
         self.single_span_rules = single_span_rules
@@ -138,9 +143,14 @@ def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamp
             # If ASM is enabled but tracing is disabled,
             # we need to set the rate limiting to 1 trace per minute
             # for the backend to consider the service as alive.
-            self.sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True)
+            self.sampler = DatadogSampler(
+                rate_limit=1,
+                rate_limit_window=60e9,
+                rate_limit_always_on=True,
+                agent_based_samplers=agent_based_samplers,
+            )
         else:
-            self.sampler = DatadogSampler()
+            self.sampler = DatadogSampler(agent_based_samplers=agent_based_samplers)

     def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         if trace:
@@ -261,8 +271,8 @@ def __init__(
         self,
         partial_flush_enabled: bool,
         partial_flush_min_spans: int,
-        trace_processors: Iterable[TraceProcessor],
-        writer: Optional[TraceWriter] = None,
+        dd_processors: Optional[List[TraceProcessor]] = None,
+        user_processors: Optional[List[TraceProcessor]] = None,
     ):
         # Set partial flushing
         self.partial_flush_enabled = partial_flush_enabled
@@ -272,12 +282,10 @@ def __init__(
             config._trace_compute_stats, get_span_sampling_rules(), asm_config._apm_opt_out
         )
         self.tags_processor = TraceTagsProcessor()
-        self.trace_processors = trace_processors
-        # Initialize writer
-        if writer is not None:
-            self.writer: TraceWriter = writer
-        elif SpanAggregator._use_log_writer():
-            self.writer = LogWriter()
+        self.dd_processors = dd_processors or []
+        self.user_processors = user_processors or []
+        if SpanAggregator._use_log_writer():
+            self.writer: TraceWriter = LogWriter()
         else:
             verify_url(agent_config.trace_agent_url)
             self.writer = AgentWriter(
@@ -308,7 +316,9 @@ def __repr__(self) -> str:
             f"{self.partial_flush_min_spans}, "
             f"{self.sampling_processor},"
             f"{self.tags_processor},"
-            f"{self.trace_processors}, "
+            f"{self.dd_processors}, "
+            f"{self.user_processors}, "
+            f"{self._span_metrics}, "
             f"{self.writer})"
         )

@@ -373,7 +383,9 @@ def on_span_finish(self, span: Span) -> None:
             finished[0].set_metric("_dd.py.partial_flush", num_finished)

         spans: Optional[List[Span]] = finished
-        for tp in chain(self.trace_processors, [self.sampling_processor, self.tags_processor]):
+        for tp in chain(
+            self.dd_processors, self.user_processors, [self.sampling_processor, self.tags_processor]
+        ):
             try:
                 if spans is None:
                     return
@@ -484,3 +496,58 @@ def _queue_span_count_metrics(self, metric_name: str, tag_name: str, min_count:
                    TELEMETRY_NAMESPACE.TRACERS, metric_name, count, tags=((tag_name, tag_value),)
                )
        self._span_metrics[metric_name] = defaultdict(int)
+
+    def reset(
+        self,
+        user_processors: Optional[List[TraceProcessor]] = None,
+        compute_stats: Optional[bool] = None,
+        apm_opt_out: Optional[bool] = None,
+        appsec_enabled: Optional[bool] = None,
+        reset_buffer: bool = True,
+    ) -> None:
+        """
+        Resets the internal state of the SpanAggregator, including the writer, sampling processor,
+        user-defined processors, and optionally the trace buffer and span metrics.
+
+        This method is typically used after a process fork or during runtime reconfiguration.
+        Arguments that are None will not override existing values.
+        """
+        try:
+            # Stop the writer to ensure it is not running while we reconfigure it.
+            self.writer.stop()
+        except ServiceStatusError:
+            # Writers like AgentWriter may not start until the first trace is encoded.
+            # Stopping them before that will raise a ServiceStatusError.
+            pass
+
+        if isinstance(self.writer, AgentWriter) and appsec_enabled:
+            # Ensure AppSec metadata is encoded by setting the API version to v0.4.
+            self.writer._api_version = "v0.4"
+        # Re-create the writer to ensure it is consistent with updated configurations (ex: api_version)
+        self.writer = self.writer.recreate()
+
+        # Recreate the sampling processor using new or existing config values.
+        # If an argument is None, the current value is preserved.
+        if compute_stats is None:
+            compute_stats = self.sampling_processor._compute_stats_enabled
+        if apm_opt_out is None:
+            apm_opt_out = self.sampling_processor.apm_opt_out
+        self.sampling_processor = TraceSamplingProcessor(
+            compute_stats,
+            get_span_sampling_rules(),
+            apm_opt_out,
+            self.sampling_processor.sampler._agent_based_samplers,
+        )
+
+        # Update user processors if provided.
+        if user_processors is not None:
+            self.user_processors = user_processors
+
+        # Reset the trace buffer and span metrics.
+        # Useful when forking to prevent sending duplicate spans from parent and child processes.
+        if reset_buffer:
+            self._traces = defaultdict(lambda: _Trace())
+            self._span_metrics = {
+                "spans_created": defaultdict(int),
+                "spans_finished": defaultdict(int),
+            }
```
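The `reset()` contract above — a `None` argument means "keep the current value" — can be isolated in a small standalone sketch. This is a hypothetical stand-in class, not the ddtrace one:

```python
from typing import Optional


class Resettable:
    """Minimal stand-in for SpanAggregator's reset() contract (hypothetical)."""

    def __init__(self, compute_stats: bool = False, apm_opt_out: bool = False):
        self.compute_stats = compute_stats
        self.apm_opt_out = apm_opt_out

    def reset(self, compute_stats: Optional[bool] = None, apm_opt_out: Optional[bool] = None) -> None:
        # None means "preserve the existing value", mirroring SpanAggregator.reset().
        if compute_stats is not None:
            self.compute_stats = compute_stats
        if apm_opt_out is not None:
            self.apm_opt_out = apm_opt_out


r = Resettable(compute_stats=True)
r.reset()  # no arguments: all state is preserved
assert r.compute_stats is True
r.reset(apm_opt_out=True)  # only the named value changes
assert r.compute_stats is True and r.apm_opt_out is True
```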

ddtrace/_trace/sampler.py

Lines changed: 8 additions & 7 deletions
```diff
@@ -75,7 +75,7 @@ class DatadogSampler:
         "limiter",
         "rules",
         "_rate_limit_always_on",
-        "_by_service_samplers",
+        "_agent_based_samplers",
     )
     _default_key = "service:,env:"

@@ -85,6 +85,7 @@ def __init__(
         rate_limit: Optional[int] = None,
         rate_limit_window: float = 1e9,
         rate_limit_always_on: bool = False,
+        agent_based_samplers: Optional[Dict[str, RateSampler]] = None,
     ):
         """
         Constructor for DatadogSampler sampler
@@ -101,7 +102,7 @@ def __init__(
         else:
             self.rules: List[SamplingRule] = rules or []
         # Set Agent based samplers
-        self._by_service_samplers: Dict[str, RateSampler] = {}
+        self._agent_based_samplers = agent_based_samplers or {}
         # Set rate limiter
         self._rate_limit_always_on: bool = rate_limit_always_on
         if rate_limit is None:
@@ -119,10 +120,10 @@ def update_rate_by_service_sample_rates(self, rate_by_service: Dict[str, float])
         samplers: Dict[str, RateSampler] = {}
         for key, sample_rate in rate_by_service.items():
             samplers[key] = RateSampler(sample_rate)
-        self._by_service_samplers = samplers
+        self._agent_based_samplers = samplers

     def __str__(self):
-        rates = {key: sampler.sample_rate for key, sampler in self._by_service_samplers.items()}
+        rates = {key: sampler.sample_rate for key, sampler in self._agent_based_samplers.items()}
         return "{}(agent_rates={!r}, limiter={!r}, rules={!r}), rate_limit_always_on={!r}".format(
             self.__class__.__name__,
             rates,
@@ -181,11 +182,11 @@ def sample(self, span: Span) -> bool:
             sample_rate = matched_rule.sample_rate
         else:
             key = self._key(span.service, span.get_tag(ENV_KEY))
-            if key in self._by_service_samplers:
+            if key in self._agent_based_samplers:
                 # Agent service based sampling
                 agent_service_based = True
-                sampled = self._by_service_samplers[key].sample(span)
-                sample_rate = self._by_service_samplers[key].sample_rate
+                sampled = self._agent_based_samplers[key].sample(span)
+                sample_rate = self._agent_based_samplers[key].sample_rate

         if matched_rule or self._rate_limit_always_on:
             # Avoid rate limiting when trace sample rules and/or sample rates are NOT provided
```
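A hedged sketch of the renamed attribute in action. The import path mirrors this file and the method names come from the hunks above; the rate payload simulates what the Datadog Agent returns and is illustrative only:

```python
from ddtrace._trace.sampler import DatadogSampler

sampler = DatadogSampler()
assert sampler._agent_based_samplers == {}  # nothing until the Agent responds

# The Agent responds to trace payloads with per-service/env rates keyed
# "service:<name>,env:<env>"; the writer feeds them back through this method.
sampler.update_rate_by_service_sample_rates({"service:checkout,env:prod": 0.1})
print(sampler._agent_based_samplers["service:checkout,env:prod"].sample_rate)  # 0.1
```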

ddtrace/_trace/tracer.py

Lines changed: 30 additions & 56 deletions
```diff
@@ -46,12 +46,10 @@
 from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
 from ddtrace.internal.runtime import get_runtime_id
 from ddtrace.internal.schema.processor import BaseServiceProcessor
-from ddtrace.internal.service import ServiceStatusError
 from ddtrace.internal.utils import _get_metas_to_propagate
 from ddtrace.internal.utils.formats import format_trace_id
 from ddtrace.internal.writer import AgentWriter
 from ddtrace.internal.writer import HTTPWriter
-from ddtrace.internal.writer import TraceWriter
 from ddtrace.settings._config import config
 from ddtrace.settings.asm import config as asm_config
 from ddtrace.settings.peer_service import _ps_config
@@ -89,20 +87,9 @@ def _start_appsec_processor() -> Optional["AppSecSpanProcessor"]:


 def _default_span_processors_factory(
-    trace_filters: List[TraceProcessor],
-    writer: Optional[TraceWriter],
-    partial_flush_enabled: bool,
-    partial_flush_min_spans: int,
     profiling_span_processor: EndpointCallCounterProcessor,
-) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"], SpanAggregator]:
+) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"]]:
     """Construct the default list of span processors to use."""
-    trace_processors: List[TraceProcessor] = []
-    trace_processors += [
-        PeerServiceProcessor(_ps_config),
-        BaseServiceProcessor(),
-    ]
-    trace_processors += trace_filters
-
     span_processors: List[SpanProcessor] = []
     span_processors += [TopLevelSpanProcessor()]

@@ -140,14 +127,7 @@ def _default_span_processors_factory(

     span_processors.append(profiling_span_processor)

-    # These need to run after all the other processors
-    span_aggregagtor = SpanAggregator(
-        partial_flush_enabled=partial_flush_enabled,
-        partial_flush_min_spans=partial_flush_min_spans,
-        trace_processors=trace_processors,
-        writer=writer,
-    )
-    return span_processors, appsec_processor, span_aggregagtor
+    return span_processors, appsec_processor


 class Tracer(object):
@@ -181,8 +161,6 @@ def __init__(self) -> None:
             "Initializing multiple Tracer instances is not supported. Use ``ddtrace.trace.tracer`` instead.",
         )

-        self._user_trace_processors: List[TraceProcessor] = []
-
         # globally set tags
         self._tags = config.tags.copy()

@@ -199,12 +177,13 @@ def __init__(self) -> None:
            config._trace_compute_stats = False
        # Direct link to the appsec processor
        self._endpoint_call_counter_span_processor = EndpointCallCounterProcessor()
-        self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
-            self._user_trace_processors,
-            None,
-            config._partial_flush_enabled,
-            config._partial_flush_min_spans,
-            self._endpoint_call_counter_span_processor,
+        self._span_processors, self._appsec_processor = _default_span_processors_factory(
+            self._endpoint_call_counter_span_processor
+        )
+        self._span_aggregator = SpanAggregator(
+            partial_flush_enabled=config._partial_flush_enabled,
+            partial_flush_min_spans=config._partial_flush_min_spans,
+            dd_processors=[PeerServiceProcessor(_ps_config), BaseServiceProcessor()],
         )
         if config._data_streams_enabled:
             # Inline the import to avoid pulling in ddsketch or protobuf
@@ -389,13 +368,6 @@ def configure(
         if compute_stats_enabled is not None:
             config._trace_compute_stats = compute_stats_enabled

-        if isinstance(self._span_aggregator.writer, AgentWriter):
-            if appsec_enabled:
-                self._span_aggregator.writer._api_version = "v0.4"
-
-        if trace_processors:
-            self._user_trace_processors = trace_processors
-
         if any(
             x is not None
             for x in [
@@ -405,7 +377,9 @@ def configure(
                 iast_enabled,
             ]
         ):
-            self._recreate()
+            self._recreate(
+                trace_processors, compute_stats_enabled, asm_config._apm_opt_out, appsec_enabled, reset_buffer=False
+            )

         if context_provider is not None:
             self.context_provider = context_provider
@@ -433,31 +407,31 @@ def _generate_diagnostic_logs(self):

     def _child_after_fork(self):
         self._pid = getpid()
-        self._recreate()
+        self._recreate(reset_buffer=True)
         self._new_process = True

-    def _recreate(self):
-        """Re-initialize the tracer's processors and trace writer. This method should only be used in tests."""
+    def _recreate(
+        self,
+        trace_processors: Optional[List[TraceProcessor]] = None,
+        compute_stats_enabled: Optional[bool] = None,
+        apm_opt_out: Optional[bool] = None,
+        appsec_enabled: Optional[bool] = None,
+        reset_buffer: bool = True,
+    ) -> None:
+        """Re-initialize the tracer's processors and trace writer"""
         # Stop the writer.
         # This will stop the periodic thread in HTTPWriters, preventing memory leaks and unnecessary I/O.
-        try:
-            self._span_aggregator.writer.stop()
-        except ServiceStatusError:
-            # Some writers (ex: AgentWriter), start when the first trace chunk is encoded. Stopping
-            # the writer before that point will raise a ServiceStatusError.
-            pass
-        # Re-create the background writer thread
-        rules = self._span_aggregator.sampling_processor.sampler._by_service_samplers
-        self._span_aggregator.writer = self._span_aggregator.writer.recreate()
         self.enabled = config._tracing_enabled
-        self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
-            self._user_trace_processors,
-            self._span_aggregator.writer,
-            self._span_aggregator.partial_flush_enabled,
-            self._span_aggregator.partial_flush_min_spans,
+        self._span_aggregator.reset(
+            user_processors=trace_processors,
+            compute_stats=compute_stats_enabled,
+            apm_opt_out=apm_opt_out,
+            appsec_enabled=appsec_enabled,
+            reset_buffer=reset_buffer,
+        )
+        self._span_processors, self._appsec_processor = _default_span_processors_factory(
             self._endpoint_call_counter_span_processor,
         )
-        self._span_aggregator.sampling_processor.sampler._by_service_samplers = rules.copy()

     def _start_span_after_shutdown(
         self,
```
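End to end, the fork path now looks roughly like this. A POSIX-only sketch, not part of the commit; it assumes ddtrace's fork hook invokes `_child_after_fork()` in the child, which the hunk above wires to `_recreate(reset_buffer=True)`:

```python
import os

from ddtrace.trace import tracer

# Illustrative agent rates, set via the sampler internals shown above.
sampler = tracer._span_aggregator.sampling_processor.sampler
sampler.update_rate_by_service_sample_rates({"service:worker,env:prod": 0.5})

pid = os.fork()
if pid == 0:
    # In the child, ddtrace's fork hook runs _child_after_fork(), which now
    # calls SpanAggregator.reset(reset_buffer=True): the buffer is cleared so
    # the child does not re-send the parent's spans, but agent rates survive.
    child_sampler = tracer._span_aggregator.sampling_processor.sampler
    assert "service:worker,env:prod" in child_sampler._agent_based_samplers
    os._exit(0)
os.waitpid(pid, 0)
```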

ddtrace/internal/ci_visibility/recorder.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -634,7 +634,7 @@ def disable(cls) -> None:
         log.debug("%s disabled", cls.__name__)

     def _start_service(self) -> None:
-        tracer_filters = self.tracer._user_trace_processors
+        tracer_filters = self.tracer._span_aggregator.user_processors
         if not any(isinstance(tracer_filter, TraceCiVisibilityFilter) for tracer_filter in tracer_filters):
             tracer_filters += [TraceCiVisibilityFilter(self._tags, self._service)]  # type: ignore[arg-type]
             self.tracer.configure(trace_processors=tracer_filters)
```

ddtrace/internal/writer/writer.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -553,6 +553,7 @@ def recreate(self) -> HTTPWriter:
             api_version=self._api_version,
             headers=self._headers,
             report_metrics=self._report_metrics,
+            response_callback=self._response_cb,
         )
         return new_instance
```
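Why this one-line addition matters: any attribute omitted from `recreate()` silently resets whenever the writer is rebuilt on fork or reconfigure, and `_response_cb` is plausibly the channel through which agent sample rates flow back to the sampler. A hypothetical minimal illustration of this bug class, not the ddtrace code itself:

```python
from typing import Callable, Optional


class Writer:
    """Hypothetical stand-in for HTTPWriter's recreate() pattern."""

    def __init__(self, response_callback: Optional[Callable] = None):
        self.response_callback = response_callback

    def recreate(self) -> "Writer":
        # Before this commit, the ddtrace equivalent omitted the callback, so a
        # recreated writer dropped it and stopped delivering agent rate updates
        # after a fork or tracer.configure().
        return Writer(response_callback=self.response_callback)


w = Writer(response_callback=lambda resp: None).recreate()
assert w.response_callback is not None  # preserved across recreation
```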

ddtrace/opentracer/tracer.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -104,7 +104,7 @@ def __init__(
         trace_processors = None
         if isinstance(self._config.get(keys.SETTINGS), dict) and self._config[keys.SETTINGS].get("FILTERS"):  # type: ignore[union-attr]
             trace_processors = self._config[keys.SETTINGS]["FILTERS"]  # type: ignore[index]
-        self._dd_tracer._user_trace_processors = trace_processors
+        self._dd_tracer._span_aggregator.user_processors = trace_processors

         if self._config[keys.ENABLED]:
             self._dd_tracer.enabled = self._config[keys.ENABLED]
```
Lines changed: 4 additions & 0 deletions (new release note)
```diff
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    tracing: Resolves a sampling issue where agent-based sampling rates were not correctly applied after a process forked or the tracer was reconfigured.
```
