Skip to content

Commit 18653fa

Browse files
fix(profiling): clear finished thread ids from ThreadSpanLinks (#11235)
## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Emmett Butler <[email protected]>
1 parent 3141c39 commit 18653fa

File tree

7 files changed

+141
-6
lines changed

7 files changed

+141
-6
lines changed

ddtrace/internal/datadog/profiling/stack_v2/include/constants.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include "dd_wrapper/include/constants.hpp"
24

35
// Default sampling frequency in microseconds. This will almost certainly be overridden by dynamic sampling.

ddtrace/internal/datadog/profiling/stack_v2/include/thread_span_links.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ struct Span
2121
, span_type(_span_type)
2222
{
2323
}
24+
25+
// for testing
26+
bool operator==(const Span& other) const
27+
{
28+
return span_id == other.span_id && local_root_span_id == other.local_root_span_id &&
29+
span_type == other.span_type;
30+
}
2431
};
2532

2633
class ThreadSpanLinks
@@ -38,6 +45,7 @@ class ThreadSpanLinks
3845

3946
void link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_root_span_id, std::string span_type);
4047
const std::optional<Span> get_active_span_from_thread_id(uint64_t thread_id);
48+
void unlink_span(uint64_t thread_id);
4149
void reset();
4250

4351
static void postfork_child();

ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ stack_v2_thread_unregister(PyObject* self, PyObject* args)
8686
}
8787

8888
Sampler::get().unregister_thread(id);
89+
ThreadSpanLinks::get_instance().unlink_span(id);
8990
Py_RETURN_NONE;
9091
}
9192

ddtrace/internal/datadog/profiling/stack_v2/src/thread_span_links.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ ThreadSpanLinks::link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_
1212
{
1313
std::lock_guard<std::mutex> lock(mtx);
1414

15-
if (thread_id_to_span.find(thread_id) == thread_id_to_span.end()) {
15+
auto it = thread_id_to_span.find(thread_id);
16+
if (it == thread_id_to_span.end()) {
1617
thread_id_to_span[thread_id] = std::make_unique<Span>(span_id, local_root_span_id, span_type);
18+
} else {
19+
it->second->span_id = span_id;
20+
it->second->local_root_span_id = local_root_span_id;
21+
it->second->span_type = span_type;
1722
}
18-
thread_id_to_span[thread_id]->span_id = span_id;
19-
thread_id_to_span[thread_id]->local_root_span_id = local_root_span_id;
20-
thread_id_to_span[thread_id]->span_type = span_type;
2123
}
2224

2325
const std::optional<Span>
@@ -33,6 +35,14 @@ ThreadSpanLinks::get_active_span_from_thread_id(uint64_t thread_id)
3335
return span;
3436
}
3537

38+
void
39+
ThreadSpanLinks::unlink_span(uint64_t thread_id)
40+
{
41+
std::lock_guard<std::mutex> lock(mtx);
42+
43+
thread_id_to_span.erase(thread_id); // This is a no-op if the key is not found
44+
}
45+
3646
void
3747
ThreadSpanLinks::reset()
3848
{

ddtrace/internal/datadog/profiling/stack_v2/test/thread_span_links.cpp

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
#include "thread_span_links.hpp"
2+
3+
#include <gmock/gmock.h>
14
#include <gtest/gtest.h>
25

6+
#include <optional>
7+
#include <random>
38
#include <string>
49
#include <thread>
5-
6-
#include "thread_span_links.hpp"
10+
#include <unordered_set>
711

812
static void
913
get()
@@ -39,6 +43,48 @@ TEST(ThreadSpanLinksConcurrency, GetSetRace)
3943
t2.join();
4044
}
4145

46+
TEST(ThreadSpanLinks, ClearFinished)
47+
{
48+
unsigned int num_thread_ids = 100;
49+
std::unordered_set<uint64_t> thread_ids;
50+
51+
std::random_device rd;
52+
std::mt19937 gen(rd());
53+
std::uniform_int_distribution<uint64_t> dis(0, UINT64_MAX);
54+
55+
// Generate random 100 native thread ids
56+
for (unsigned int i = 0; i < num_thread_ids; i++) {
57+
thread_ids.insert(dis(gen));
58+
}
59+
60+
// Call link_span with the thread ids
61+
for (auto thread_id : thread_ids) {
62+
Datadog::ThreadSpanLinks::get_instance().link_span(thread_id, thread_id, thread_id, "test");
63+
}
64+
65+
std::unordered_set<uint64_t> finished_threads;
66+
std::uniform_real_distribution<double> real_dis(0, 1);
67+
68+
for (auto thread_id : thread_ids) {
69+
if (real_dis(gen) < 0.5) {
70+
finished_threads.insert(thread_id);
71+
Datadog::ThreadSpanLinks::get_instance().unlink_span(thread_id);
72+
}
73+
}
74+
75+
// Check that the unseen ids are removed
76+
for (auto thread_id : thread_ids) {
77+
std::optional<Datadog::Span> span_opt =
78+
Datadog::ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_id);
79+
if (finished_threads.find(thread_id) == finished_threads.end()) {
80+
EXPECT_EQ(span_opt, Datadog::Span(thread_id, thread_id, "test"));
81+
82+
} else {
83+
EXPECT_EQ(span_opt, std::nullopt);
84+
}
85+
}
86+
}
87+
4288
int
4389
main(int argc, char** argv)
4490
{
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
fixes:
3+
- |
4+
profiling: when a Python thread finishes, this change frees memory used for mapping
5+
its thread id to ``Span``. The mapping is populated and used when
6+
`DD_PROFILING_ENDPOINT_COLLECTION_ENABLED`` and
7+
``DD_PROFILING_STACK_V2_ENABLED`` were set to enable grouping of profiles
8+
for endpoints.

tests/profiling_v2/collector/test_stack.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import os
22
import sys
3+
import threading
34
import time
5+
from unittest.mock import patch
46
import uuid
57

68
import pytest
@@ -9,6 +11,7 @@
911
from ddtrace import tracer
1012
from ddtrace.internal.datadog.profiling import ddup
1113
from ddtrace.profiling.collector import stack
14+
from ddtrace.settings.profiling import config
1215
from tests.profiling.collector import pprof_utils
1316

1417

@@ -111,6 +114,63 @@ def test_push_span(stack_v2_enabled, tmp_path):
111114
)
112115

113116

117+
def test_push_span_unregister_thread(tmp_path, monkeypatch):
118+
if sys.version_info[:2] == (3, 7):
119+
pytest.skip("stack_v2 is not supported on Python 3.7")
120+
121+
with patch("ddtrace.internal.datadog.profiling.stack_v2.unregister_thread") as unregister_thread:
122+
monkeypatch.setattr(config.stack, "v2_enabled", True)
123+
tracer._endpoint_call_counter_span_processor.enable()
124+
125+
test_name = "test_push_span_unregister_thread"
126+
pprof_prefix = str(tmp_path / test_name)
127+
output_filename = pprof_prefix + "." + str(os.getpid())
128+
129+
assert ddup.is_available
130+
ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix)
131+
ddup.start()
132+
133+
resource = str(uuid.uuid4())
134+
span_type = ext.SpanTypes.WEB
135+
136+
def target_fun():
137+
for _ in range(5):
138+
time.sleep(0.01)
139+
140+
with stack.StackCollector(
141+
None,
142+
tracer=tracer,
143+
endpoint_collection_enabled=True,
144+
ignore_profiler=True, # this is not necessary, but it's here to trim samples
145+
_stack_collector_v2_enabled=True,
146+
):
147+
with tracer.trace("foobar", resource=resource, span_type=span_type) as span:
148+
span_id = span.span_id
149+
local_root_span_id = span._local_root.span_id
150+
t = threading.Thread(target=target_fun)
151+
t.start()
152+
t.join()
153+
thread_id = t.ident
154+
ddup.upload()
155+
156+
profile = pprof_utils.parse_profile(output_filename)
157+
samples = pprof_utils.get_samples_with_label_key(profile, "span id")
158+
assert len(samples) > 0
159+
for sample in samples:
160+
pprof_utils.assert_stack_event(
161+
profile,
162+
sample,
163+
expected_event=pprof_utils.StackEvent(
164+
span_id=span_id,
165+
local_root_span_id=local_root_span_id,
166+
trace_type=span_type,
167+
trace_endpoint=resource,
168+
),
169+
)
170+
171+
unregister_thread.assert_called_once_with(thread_id)
172+
173+
114174
@pytest.mark.parametrize("stack_v2_enabled", [True, False])
115175
def test_push_non_web_span(stack_v2_enabled, tmp_path):
116176
if sys.version_info[:2] == (3, 7) and stack_v2_enabled:

0 commit comments

Comments
 (0)