|
7 | 7 | import os |
8 | 8 | import os.path |
9 | 9 | from pathlib import Path |
| 10 | +import time |
10 | 11 | import uuid |
11 | 12 | from urllib.parse import parse_qs |
12 | 13 | from typing import TypedDict, Literal, Any |
@@ -473,47 +474,135 @@ def _wait_for_no_capture_reason_span(self, data: dict): |
473 | 474 |
|
def wait_for_code_origin_span(self, timeout: int = 5, threshold: int | None = None) -> bool:
    """Wait up to `timeout` seconds for a web span tagged `_dd.code_origin.type == "entry"`.

    Only trace files numbered strictly above `threshold` are considered.
    `threshold` should be the max trace file number captured right before the
    triggering request (it defaults to the current max when omitted): any file
    numbered above it was written after the request regardless of when it was
    ingested, which closes the race between a file being ingested and
    `wait_for` being set up.

    Returns True if a matching span was seen before the timeout, else False.
    """
    self._span_found = False
    start_time = time.time()

    if threshold is None:
        threshold = self._get_max_trace_file_number()
        logger.debug(
            "[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: threshold was None, calculated new threshold=%s",
            threshold,
        )
    else:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: using provided threshold=%s", threshold
        )

    # Debug visibility only: report files written between threshold
    # calculation and this call, to help diagnose flaky timeouts.
    current_max_before_wait = self._get_max_trace_file_number()
    logger.debug(
        "[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: threshold=%s, current_max_before_wait=%s, diff=%s",
        threshold,
        current_max_before_wait,
        current_max_before_wait - threshold,
    )

    existing_files_above_threshold = []
    for data in interfaces.agent.get_data(_TRACES_PATH):
        log_filename_found = re.search(r"/(\d+)__", data["log_filename"])
        if log_filename_found:
            file_number = int(log_filename_found.group(1))
            if file_number > threshold:
                existing_files_above_threshold.append((file_number, data["log_filename"]))

    if existing_files_above_threshold:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: Found %s existing files above threshold: %s",
            len(existing_files_above_threshold),
            existing_files_above_threshold,
        )
    else:
        logger.debug("[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: No existing files above threshold found")

    # The lambda closes over both the threshold and the start timestamp so the
    # per-payload checker can log elapsed time.
    interfaces.agent.wait_for(
        lambda data: self._wait_for_code_origin_span(data, threshold=threshold, start_time=start_time),
        timeout=timeout,
    )

    logger.debug(
        "[CODE_ORIGIN_DEBUG] wait_for_code_origin_span: finished after %.3fs, span_found=%s",
        time.time() - start_time,
        self._span_found,
    )
    return self._span_found
484 | 536 |
|
def _get_max_trace_file_number(self) -> int:
    """Return the highest trace file number currently in the agent interface (0 if none)."""
    file_numbers = []
    for data in interfaces.agent.get_data(_TRACES_PATH):
        log_filename_found = re.search(r"/(\d+)__", data["log_filename"])
        if log_filename_found:
            file_numbers.append(int(log_filename_found.group(1)))

    max_number = max(file_numbers, default=0)
    # [-10:] already yields the whole list when it has <= 10 entries, so no
    # length check is needed to cap the debug output to the last ten files.
    logger.debug(
        "[CODE_ORIGIN_DEBUG] _get_max_trace_file_number: max=%s, all_files=%s",
        max_number,
        sorted(file_numbers)[-10:],
    )
    return max_number
494 | 551 |
|
def _wait_for_code_origin_span(self, data: dict, *, threshold: int, start_time: float | None = None) -> bool:
    """Check one ingested agent payload for a code-origin "entry" span.

    Only trace files numbered strictly above `threshold` are inspected.  On
    the first web GET span whose `_dd.code_origin.type` meta tag is "entry",
    sets `self._span_found = True` and returns True; otherwise returns False.

    `start_time` (epoch seconds) is optional and only used to report elapsed
    time in debug logs.
    """
    if data["path"] != _TRACES_PATH:
        return False

    log_filename_found = re.search(r"/(\d+)__", data["log_filename"])
    if not log_filename_found:
        return False

    log_number = int(log_filename_found.group(1))
    # Compare against None, not truthiness: a start_time of exactly 0.0 is a
    # valid timestamp and must not suppress the elapsed report (the original
    # `if elapsed` also mis-branched when elapsed was 0.0).
    if start_time is not None:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: checking file %s, threshold=%s, elapsed=%.3fs",
            log_number,
            threshold,
            time.time() - start_time,
        )
    else:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: checking file %s, threshold=%s",
            log_number,
            threshold,
        )

    if log_number <= threshold:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s <= threshold %s, skipping",
            log_number,
            threshold,
        )
        return False

    logger.debug(
        "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s > threshold %s, checking content",
        log_number,
        threshold,
    )
    content = data["request"]["content"]
    if not content:
        logger.debug(
            "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s, no content in request", log_number
        )
        return False

    web_spans_found = 0
    code_origin_found_count = 0
    for payload in content["tracerPayloads"]:
        for chunk in payload["chunks"]:
            for span in chunk["spans"]:
                resource, resource_type = span.get("resource", ""), span.get("type")
                # Only inbound web GET spans carry the code-origin entry tag.
                if not (resource.startswith("GET") and resource_type == "web"):
                    continue

                web_spans_found += 1
                code_origin_type = span["meta"].get("_dd.code_origin.type", "")
                logger.debug(
                    "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s, span resource=%s, code_origin_type=%s",
                    log_number,
                    resource,
                    code_origin_type,
                )

                if code_origin_type == "entry":
                    code_origin_found_count += 1
                    logger.debug(
                        "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s, FOUND code origin 'entry' in span %s",
                        log_number,
                        resource,
                    )
                    self._span_found = True
                    return True

    logger.debug(
        "[CODE_ORIGIN_DEBUG] _wait_for_code_origin_span: file %s, checked %s web spans, found %s with code origin",
        log_number,
        web_spans_found,
        code_origin_found_count,
    )
    return False
518 | 607 |
|
519 | 608 | def wait_for_telemetry(self, telemetry_type: str, timeout: int = 5) -> dict | None: |
|
0 commit comments