Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-end-to-end.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ jobs:
run: ./run.sh DEBUGGER_SYMDB
- name: Run DEBUGGER_INPRODUCT_ENABLEMENT scenario
if: always() && steps.build.outcome == 'success' && contains(inputs.scenarios, '"DEBUGGER_INPRODUCT_ENABLEMENT"')
run: ./run.sh DEBUGGER_INPRODUCT_ENABLEMENT
run: ./run.sh DEBUGGER_INPRODUCT_ENABLEMENT --force-dd-trace-debug
- name: Run DEBUGGER_TELEMETRY scenario
if: always() && steps.build.outcome == 'success' && contains(inputs.scenarios, '"DEBUGGER_TELEMETRY"')
run: ./run.sh DEBUGGER_TELEMETRY
Expand Down
71 changes: 69 additions & 2 deletions tests/debugger/test_debugger_inproduct_enablement.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from utils import features, scenarios, context, logger, bug, missing_feature
import json
import time
import time as time_module

TIMEOUT = 5

Expand Down Expand Up @@ -173,13 +174,79 @@ class Test_Debugger_InProduct_Enablement_Code_Origin(debugger.BaseDebuggerTest):
########### code origin ############
def _check_code_origin(self):
    """Send a weblog request and check whether code-origin spans are present.

    Returns True if a span with code-origin metadata was found within TIMEOUT.

    The trace-file threshold is calculated right before sending the request to
    minimize the race-condition window: if the threshold were computed earlier, a
    trace file could be written and ingested before wait_for is set up; a file
    without code origin would then be skipped as existing data, and wait_for
    would wait for new data that never arrives (the file is already ingested).
    """
    threshold_start = time_module.time()
    threshold = self._get_max_trace_file_number()
    threshold_elapsed = time_module.time() - threshold_start
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _check_code_origin: calculated threshold={threshold} in {threshold_elapsed:.3f}s"
    )

    request_start = time_module.time()
    self.send_weblog_request("/")
    request_elapsed = time_module.time() - request_start
    logger.debug(f"[CODE_ORIGIN_DEBUG] _check_code_origin: sent request in {request_elapsed:.3f}s")

    wait_start = time_module.time()
    # Pass the threshold so only traces generated after this request are considered.
    result = self.wait_for_code_origin_span(TIMEOUT, threshold=threshold)
    wait_elapsed = time_module.time() - wait_start
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _check_code_origin: wait_for_code_origin_span returned {result} after {wait_elapsed:.3f}s"
    )
    return result

def _set_code_origin_and_check(self, *, enabled: bool | None):
    """Set code origin via remote config and check if spans are present.

    Args:
        enabled: desired code-origin state sent through remote config
            (True/False/None).

    Returns True if a code-origin span was observed within TIMEOUT.

    Flakiness note: the remote config may be acknowledged but not yet fully
    applied to the tracer when the weblog request is sent, especially in
    high-latency CI environments; a trace generated in that window lacks code
    origin. Mitigation: compute the trace-file threshold as late as possible —
    after send_rc_apm_tracing returns and right before the request — so only
    traces generated after the config change are checked.
    """
    rc_start = time_module.time()
    self.send_rc_apm_tracing(code_origin_enabled=enabled)
    rc_elapsed = time_module.time() - rc_start
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _set_code_origin_and_check: send_rc_apm_tracing(enabled={enabled}) completed in {rc_elapsed:.3f}s"
    )

    # Calculate threshold right before sending request to minimize race condition window.
    # This ensures we only check traces generated after the config change.
    threshold_start = time_module.time()
    threshold = self._get_max_trace_file_number()
    threshold_elapsed = time_module.time() - threshold_start
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _set_code_origin_and_check: calculated threshold={threshold} in {threshold_elapsed:.3f}s"
    )

    request_start = time_module.time()
    self.send_weblog_request("/")
    request_elapsed = time_module.time() - request_start
    logger.debug(f"[CODE_ORIGIN_DEBUG] _set_code_origin_and_check: sent request in {request_elapsed:.3f}s")

    wait_start = time_module.time()
    result = self.wait_for_code_origin_span(TIMEOUT, threshold=threshold)
    wait_elapsed = time_module.time() - wait_start
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _set_code_origin_and_check: wait_for_code_origin_span returned {result} after {wait_elapsed:.3f}s"
    )
    return result

def setup_inproduct_enablement_code_origin(self):
self.initialize_weblog_remote_config()
Expand Down
102 changes: 96 additions & 6 deletions tests/debugger/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import os.path
from pathlib import Path
import time
import uuid
from urllib.parse import parse_qs
from typing import TypedDict, Literal, Any
Expand Down Expand Up @@ -471,48 +472,137 @@ def _wait_for_no_capture_reason_span(self, data: dict):
logger.debug(f"No capture reason span found: {self._no_capture_reason_span_found}")
return self._no_capture_reason_span_found

def wait_for_code_origin_span(self, timeout: int = 5, threshold: int | None = None) -> bool:
    """Wait up to `timeout` seconds for a web span carrying code-origin metadata.

    Args:
        timeout: seconds to wait on the agent interface.
        threshold: only trace files numbered strictly above this value are
            inspected; when None, it is computed from data already ingested.

    Returns True if a span with `_dd.code_origin.type == "entry"` was found.

    A trace file can be written and ingested between the caller's threshold
    calculation and this wait_for setup. The threshold is used as-is: any file
    numbered above it was written after the request, regardless of when it was
    ingested. Callers mitigate the race by computing the threshold right before
    sending the request.
    """
    self._span_found = False
    start_time = time.time()

    if threshold is None:
        threshold = self._get_max_trace_file_number()
        logger.debug(
            f"[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: threshold was None, calculated new threshold={threshold}"
        )
    else:
        logger.debug(f"[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: using provided threshold={threshold}")

    # Diagnostics: detect files written between threshold calculation and this call.
    current_max_before_wait = self._get_max_trace_file_number()
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: threshold={threshold}, current_max_before_wait={current_max_before_wait}, diff={current_max_before_wait - threshold}"
    )

    # Diagnostics: list already-ingested files that are above the threshold.
    existing_files_above_threshold = []
    for data in interfaces.agent.get_data(_TRACES_PATH):
        log_filename_found = re.search(r"/(\d+)__", data["log_filename"])
        if log_filename_found:
            file_number = int(log_filename_found.group(1))
            if file_number > threshold:
                existing_files_above_threshold.append((file_number, data["log_filename"]))

    if existing_files_above_threshold:
        logger.debug(
            f"[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: Found {len(existing_files_above_threshold)} existing files above threshold: {existing_files_above_threshold}"
        )
    else:
        logger.debug("[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: No existing files above threshold found")

    # Closure captures start_time so the per-payload check can log elapsed time.
    def wait_function(data: dict) -> bool:
        return self._wait_for_code_origin_span(data, threshold=threshold, start_time=start_time)

    interfaces.agent.wait_for(wait_function, timeout=timeout)

    elapsed = time.time() - start_time
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: finished after {elapsed:.3f}s, span_found={self._span_found}"
    )
    return self._span_found

def _get_max_trace_file_number(self) -> int:
    """Get the maximum trace file number currently in the agent interface."""
    # Collect every file number seen so the debug log can show the tail of the list.
    numbers_seen: list[int] = []
    highest = 0
    for item in interfaces.agent.get_data(_TRACES_PATH):
        match = re.search(r"/(\d+)__", item["log_filename"])
        if match is None:
            continue
        number = int(match.group(1))
        numbers_seen.append(number)
        if number > highest:
            highest = number
    logger.debug(
        f"[CODE_ORIGIN_DEBUG] _get_max_trace_file_number: max={highest}, all_files={sorted(numbers_seen)[-10:] if len(numbers_seen) > 10 else sorted(numbers_seen)}"
    )
    return highest

def _wait_for_code_origin_span(self, data: dict, *, threshold: int, start_time: float | None = None) -> bool:
    """Check one agent payload for a GET web span tagged with code origin "entry".

    Args:
        data: one ingested agent-interface payload.
        threshold: only trace files numbered strictly above this are inspected;
            older files are skipped so pre-request traces cannot satisfy the wait.
        start_time: when provided, elapsed time since it is included in logs.

    Sets self._span_found and returns True when a matching span is found.
    """
    if data["path"] == _TRACES_PATH:
        log_filename_found = re.search(r"/(\d+)__", data["log_filename"])
        if not log_filename_found:
            return False

        log_number = int(log_filename_found.group(1))
        # Compare against None, not truthiness: start_time/elapsed may be exactly 0.0.
        elapsed = (time.time() - start_time) if start_time is not None else None
        logger.debug(
            f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: checking file {log_number}, threshold={threshold}, elapsed={elapsed:.3f}s"
            if elapsed is not None
            else f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: checking file {log_number}, threshold={threshold}"
        )

        if log_number > threshold:
            logger.debug(
                f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number} > threshold {threshold}, checking content"
            )
            content = data["request"]["content"]
            if content:
                web_spans_found = 0
                code_origin_found_count = 0
                for payload in content["tracerPayloads"]:
                    for chunk in payload["chunks"]:
                        for span in chunk["spans"]:
                            resource, resource_type = span.get("resource", ""), span.get("type")

                            # Only GET web spans are expected to carry the code-origin tag.
                            if resource.startswith("GET") and resource_type == "web":
                                web_spans_found += 1
                                code_origin_type = span["meta"].get("_dd.code_origin.type", "")
                                logger.debug(
                                    f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number}, span resource={resource}, code_origin_type={code_origin_type}"
                                )

                                if code_origin_type == "entry":
                                    code_origin_found_count += 1
                                    logger.debug(
                                        f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number}, FOUND code origin 'entry' in span {resource}"
                                    )
                                    self._span_found = True
                                    return True

                logger.debug(
                    f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number}, checked {web_spans_found} web spans, found {code_origin_found_count} with code origin"
                )
            else:
                logger.debug(
                    f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number}, no content in request"
                )
        else:
            logger.debug(
                f"[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file {log_number} <= threshold {threshold}, skipping"
            )

    return False

def wait_for_telemetry(self, telemetry_type: str, timeout: int = 5) -> dict | None:
Expand Down
Loading