Skip to content

Commit ac36ce2

Browse files
committed
Make ConcurrentMultiSpanProcessor fork safe
1 parent 3750c14 commit ac36ce2

File tree

3 files changed

+85
-0
lines changed

3 files changed

+85
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
([#4847](https://github.com/open-telemetry/opentelemetry-python/pull/4847))
2525
- Prevent possible endless recursion from happening in `SimpleLogRecordProcessor.on_emit`,
2626
([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)).
27+
- Make ConcurrentMultiSpanProcessor fork safe
28+
([#4862](https://github.com/open-telemetry/opentelemetry-python/pull/4862))
2729

2830
## Version 1.39.0/0.60b0 (2025-12-03)
2931

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import concurrent.futures
1919
import json
2020
import logging
21+
import os
2122
import threading
2223
import traceback
2324
import typing
25+
import weakref
2426
from os import environ
2527
from time import time_ns
2628
from types import MappingProxyType, TracebackType
@@ -238,6 +240,16 @@ def __init__(self, num_threads: int = 2):
238240
# iterating through it on "on_start" and "on_end".
239241
self._span_processors = () # type: Tuple[SpanProcessor, ...]
240242
self._lock = threading.Lock()
243+
self._init_executor(num_threads)
244+
if hasattr(os, "register_at_fork"):
245+
# Only the main thread is kept in forked processed, the executor
246+
# needs to be re-instantiated to get a fresh pool of threads:
247+
weak_reinit = weakref.WeakMethod(self._init_executor)
248+
os.register_at_fork(
249+
after_in_child=lambda: weak_reinit()(num_threads)
250+
)
251+
252+
def _init_executor(self, num_threads: int) -> None:
241253
self._executor = concurrent.futures.ThreadPoolExecutor(
242254
max_workers=num_threads
243255
)

opentelemetry-sdk/tests/trace/test_span_processor.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
# limitations under the License.
1414

1515
import abc
16+
import gc
17+
import multiprocessing
18+
import os
1619
import time
1720
import typing
1821
import unittest
22+
import weakref
1923
from platform import python_implementation, system
2024
from threading import Event
2125
from typing import Optional
@@ -26,6 +30,10 @@
2630
from opentelemetry import trace as trace_api
2731
from opentelemetry.context import Context
2832
from opentelemetry.sdk import trace
33+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
34+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
35+
InMemorySpanExporter,
36+
)
2937

3038

3139
def span_event_start_fmt(span_processor_name, span_name):
@@ -486,3 +494,66 @@ def test_force_flush_late_by_span_processor(self):
486494
for mock_processor in mocks:
487495
self.assertEqual(1, mock_processor.force_flush.call_count)
488496
multi_processor.shutdown()
497+
498+
def test_processor_gc(self):
499+
multi_processor = trace.ConcurrentMultiSpanProcessor(5)
500+
weak_ref = weakref.ref(multi_processor)
501+
multi_processor.shutdown()
502+
503+
# When the processor is garbage collected
504+
del multi_processor
505+
gc.collect()
506+
507+
# Then the reference to the processor should no longer exist
508+
self.assertIsNone(
509+
weak_ref(),
510+
"The ConcurrentMultiSpanProcessor object created by this test wasn't garbage collected",
511+
)
512+
513+
@unittest.skipUnless(hasattr(os, "fork"), "needs *nix")
514+
def test_batch_span_processor_fork(self):
515+
multiprocessing_context = multiprocessing.get_context("fork")
516+
tracer_provider = trace.TracerProvider()
517+
tracer = tracer_provider.get_tracer(__name__)
518+
exporter = InMemorySpanExporter()
519+
multi_processor = trace.ConcurrentMultiSpanProcessor(2)
520+
multi_processor.add_span_processor(SimpleSpanProcessor(exporter))
521+
tracer_provider.add_span_processor(multi_processor)
522+
523+
# Use the ConcurrentMultiSpanProcessor in the main process.
524+
# This is necessary in this test to start using the underlying ThreadPoolExecutor and avoid false positive:
525+
with tracer.start_as_current_span("main process before fork span"):
526+
pass
527+
assert (
528+
exporter.get_finished_spans()[-1].name
529+
== "main process before fork span"
530+
)
531+
532+
# The forked ConcurrentMultiSpanProcessor is usable in the child process:
533+
def child(conn):
534+
with tracer.start_as_current_span("child process span"):
535+
pass
536+
conn.send(exporter.get_finished_spans()[-1].name)
537+
conn.close()
538+
539+
parent_conn, child_conn = multiprocessing_context.Pipe()
540+
process = multiprocessing_context.Process(
541+
target=child, args=(child_conn,)
542+
)
543+
process.start()
544+
has_response = parent_conn.poll(timeout=5)
545+
if not has_response:
546+
process.kill()
547+
self.fail(
548+
"The child process did not send any message after 5 seconds, it's very probably locked"
549+
)
550+
process.join(timeout=5)
551+
assert parent_conn.recv() == "child process span"
552+
553+
# The ConcurrentMultiSpanProcessor is still usable in the main process after the child process termination:
554+
with tracer.start_as_current_span("main process after fork span"):
555+
pass
556+
assert (
557+
exporter.get_finished_spans()[-1].name
558+
== "main process after fork span"
559+
)

0 commit comments

Comments
 (0)