Skip to content

Commit fce17db

Browse files
bourbonkk and allen-k1m authored
asyncio: fix duplicate instrumentation (#3408)
* #3383 fix duplicate instrument * #3383 fix duplicate instrument * feedback * feat(asyncio): add weakref-based tracking for instrumented objects * Use WeakKeyDictionary to safely track instrumented objects * feedback * feedback * feedback --------- Co-authored-by: allen <[email protected]>
1 parent 9c969f3 commit fce17db

File tree

4 files changed

+164
-1
lines changed

4 files changed

+164
-1
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
15+
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])
16+
1417
### Added
1518

1619
### Fixed
@@ -39,9 +42,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3942
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
4043
- `opentelemetry-instrumentation-grpc` Fix error when using gRPC versions <= 1.50.0 with unix sockets.
4144
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
45+
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
46+
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])
4247
- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error.
4348
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)])
4449

50+
4551
## Version 1.31.0/0.52b0 (2025-03-12)
4652

4753
### Added

instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def func():
9393

9494
from wrapt import wrap_function_wrapper as _wrap
9595

96+
from opentelemetry.instrumentation.asyncio.instrumentation_state import (
97+
_is_instrumented,
98+
)
9699
from opentelemetry.instrumentation.asyncio.package import _instruments
97100
from opentelemetry.instrumentation.asyncio.utils import (
98101
get_coros_to_trace,
@@ -237,7 +240,12 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
237240
)
238241

239242
def trace_to_thread(self, func: callable):
240-
"""Trace a function."""
243+
"""
244+
Trace a function, but if already instrumented, skip double-wrapping.
245+
"""
246+
if _is_instrumented(func):
247+
return func
248+
241249
start = default_timer()
242250
func_name = getattr(func, "__name__", None)
243251
if func_name is None and isinstance(func, functools.partial):
@@ -270,6 +278,13 @@ def trace_item(self, coro_or_future):
270278
return coro_or_future
271279

272280
async def trace_coroutine(self, coro):
281+
"""
282+
Wrap a coroutine so that we measure its duration, metrics, etc.
283+
If already instrumented, simply 'await coro' to preserve call behavior.
284+
"""
285+
if _is_instrumented(coro):
286+
return await coro
287+
273288
if not hasattr(coro, "__name__"):
274289
return await coro
275290
start = default_timer()
@@ -303,6 +318,12 @@ async def trace_coroutine(self, coro):
303318
self.record_process(start, attr, span, exception)
304319

305320
def trace_future(self, future):
321+
"""
322+
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
323+
"""
324+
if _is_instrumented(future):
325+
return future
326+
306327
start = default_timer()
307328
span = (
308329
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Instrumentation State Tracker
17+
18+
This module provides helper functions to safely track whether a coroutine,
19+
Future, or function has already been instrumented by the OpenTelemetry
20+
asyncio instrumentation layer.
21+
22+
Some Python objects (like coroutines or functions) may not support setting
23+
custom attributes or weak references. To avoid memory leaks and runtime
24+
errors, this module uses a WeakSet to safely track instrumented
25+
objects.
26+
27+
If an object cannot be weak-referenced, it is silently skipped.
28+
29+
Usage:
30+
if not _is_instrumented(obj):
31+
# obj was marked by the _is_instrumented() call above; no separate marking step
32+
# instrument the object...
33+
"""
34+
35+
import weakref
36+
from typing import Any
37+
38+
# A global WeakSet to track instrumented objects.
39+
# Entries are automatically removed when the objects are garbage collected.
# Despite the "tasks" in its name, the set holds whatever trackable object is
# passed to _is_instrumented (coroutines, Futures, or functions).
40+
_instrumented_tasks = weakref.WeakSet()
41+
42+
43+
def _is_instrumented(obj: Any) -> bool:
44+
"""
45+
Check whether the object has already been instrumented.
46+
If not, mark it as instrumented (only if weakref is supported).
This is a check-and-mark operation, not a pure predicate: the first call
for a trackable object records it in `_instrumented_tasks`, so a later
call with the same object returns True.
47+
48+
Args:
49+
obj: A coroutine, function, or Future.
50+
51+
Returns:
52+
True if the object was already instrumented.
53+
False if the object is not trackable (no weakref support), or just marked now.
54+
55+
Note:
56+
In Python 3.12+, some internal types like `async_generator_asend`
57+
raise TypeError when weakref is attempted.
58+
"""
59+
try:
60+
# Seen on an earlier call: report it as already instrumented.
if obj in _instrumented_tasks:
61+
return True
62+
# First sighting: record it so that later calls return True.
_instrumented_tasks.add(obj)
63+
return False
64+
except TypeError:
65+
# Object doesn't support weak references → can't track instrumentation
# Untrackable objects therefore report False on every call.
66+
return False
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
A general test verifying that when the same Future objects (or coroutines) are
17+
repeatedly instrumented (for example, via `trace_future`), callback references
18+
do not leak. In this example, we mimic a typical scenario where a small set of
19+
Futures might be reused throughout an application's lifecycle.
20+
"""
21+
22+
import asyncio
23+
24+
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
25+
from opentelemetry.test.test_base import TestBase
26+
27+
28+
class TestAsyncioDuplicateInstrument(TestBase):
29+
"""
30+
Tests whether repeated instrumentation of the same Futures leads to
31+
unbounded callback growth (potential memory leak).
32+
"""
33+
34+
def setUp(self):
35+
super().setUp()
36+
# Create a fresh event loop for this test and install it as the current loop.
self.loop = asyncio.new_event_loop()
37+
asyncio.set_event_loop(self.loop)
38+
39+
# Enable the asyncio instrumentation; tearDown reverses it.
self.instrumentor = AsyncioInstrumentor()
40+
self.instrumentor.instrument()
41+
42+
def tearDown(self):
43+
# Undo what setUp did: remove instrumentation, then close and unset the loop.
self.instrumentor.uninstrument()
44+
self.loop.close()
45+
asyncio.set_event_loop(None)
46+
super().tearDown()
47+
48+
def test_duplicate_instrumentation_of_futures(self):
49+
"""
50+
If instrumentor.trace_future is called multiple times on the same Future,
51+
we should NOT see an unbounded accumulation of callbacks.
52+
"""
53+
fut1 = asyncio.Future()
54+
fut2 = asyncio.Future()
55+
56+
# Call trace_future on the same two futures over and over.
num_iterations = 10
57+
for _ in range(num_iterations):
58+
self.instrumentor.trace_future(fut1)
59+
self.instrumentor.trace_future(fut2)
60+
61+
# `_callbacks` is a private asyncio.Future attribute, read here only to
# count registered callbacks; at most one is expected per future no
# matter how many times it was traced.
self.assertLessEqual(
62+
len(fut1._callbacks),
63+
1,
64+
f"fut1 has {len(fut1._callbacks)} callbacks. Potential leak!",
65+
)
66+
self.assertLessEqual(
67+
len(fut2._callbacks),
68+
1,
69+
f"fut2 has {len(fut2._callbacks)} callbacks. Potential leak!",
70+
)

0 commit comments

Comments
 (0)