Skip to content

Commit cdad8b4

Browse files
author
Michele Mancioppi
authored
Natural span context continuation for boto3 sqs receive_message (#152)
1 parent 3a195fa commit cdad8b4

File tree

16 files changed

+345
-15
lines changed

16 files changed

+345
-15
lines changed

.dockerignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.gitignore

.pre-commit-config.yaml

+12-3
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,23 @@ repos:
88
additional_dependencies: ['click==8.0.4']
99

1010
- repo: https://github.com/pre-commit/mirrors-mypy
11-
rev: v0.971
11+
rev: v0.982
1212
hooks:
1313
- id: mypy
14-
entry: venv/bin/mypy
14+
entry: mypy
15+
additional_dependencies:
16+
- types-attrs==19.1.0
17+
- types-boto==2.49.17
18+
- types-PyMySQL==1.0.19
19+
- types-PyYAML==6.0.11
20+
- types-redis==4.3.20
21+
- types-requests==2.28.11.2
22+
- types-setuptools==65.5.0.2
23+
- types-urllib3<1.27
1524
exclude: '(src/test/|noxfile\.py)|(src/lumigo_opentelemetry/external/.*)'
1625
args: ['--explicit-package-bases', # Needed for src/ci
1726
'--namespace-packages', # Needed for opentelemetry package
18-
'--disallow-any-generics', '--disallow-untyped-defs', '--disallow-incomplete-defs', '--check-untyped-defs', '--no-implicit-optional', '--warn-redundant-casts', '--warn-unused-ignores', '--warn-return-any', '--no-implicit-reexport', '--strict-equality', '--ignore-missing-imports']
27+
'--disallow-any-generics', '--disallow-untyped-defs', '--disallow-incomplete-defs', '--check-untyped-defs', '--no-implicit-optional', '--warn-redundant-casts', '--warn-return-any', '--no-implicit-reexport', '--strict-equality', '--ignore-missing-imports']
1928

2029
- repo: https://gitlab.com/pycqa/flake8
2130
rev: 5.0.4

README.md

+17
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,20 @@ tracer_provider.force_flush()
193193

194194
# Now the Python process can terminate, with all the spans closed so far sent to Lumigo
195195
```
196+
197+
### Consuming SQS messages with Boto3 receive_message
198+
199+
Messaging instrumentations that retrieve messages from queues tend to be counter-intuitive for end-users: when retrivieng one of more messages from the queue, one would natutally expect that all calls done _using data from those messages_, e.g., sending their content to a database or another queue, would result in spans that are children of the describing the retrivieng of those messages.
200+
201+
Consider the following scenario, which is supported by the `boto3` SQS `receive_message` instrumentation of the Lumigo OpenTelemetry Distro for Python:
202+
203+
```python
204+
response = client.receive_message(...) # Instrumentation creates a `span_0` span
205+
206+
for message in response.get("Messages", []):
207+
# The SQS.ReceiveMessage span is active in this scope
208+
with trace.start_as_current_span("span_1"): # span_0 is the parent of span_1
209+
do_something()
210+
```
211+
212+
Without the scope provided by the iterator over `response["Messages"]`, `span_1` would be without a parent span, and that would result in a separate invocation and a separate transaction in Lumigo.

noxfile.py

+63-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
from __future__ import annotations
22
import os
3+
import sys
34
import tempfile
4-
from xml.etree import ElementTree
55
import time
66
from typing import List, Union, Optional
7+
from xml.etree import ElementTree
78

89
import nox
910
import requests
1011
import yaml
1112

12-
from src.ci.tested_versions_utils import (
13+
# Ensure nox can load local packages
14+
repo_dir = os.path.dirname(__file__)
15+
if repo_dir not in sys.path:
16+
sys.path.append(repo_dir)
17+
18+
from src.ci.tested_versions_utils import ( # noqa: E402
1319
NonSemanticVersion,
1420
SemanticVersion,
1521
TestedVersions,
@@ -117,6 +123,61 @@ def dependency_versions_to_be_tested(
117123
]
118124

119125

126+
@nox.session(python=python_versions())
127+
@nox.parametrize(
128+
"boto3_version",
129+
dependency_versions_to_be_tested(
130+
directory="boto3",
131+
dependency_name="boto3",
132+
test_untested_versions=should_test_only_untested_versions(),
133+
),
134+
)
135+
def integration_tests_boto3_sqs(
136+
session,
137+
boto3_version,
138+
):
139+
with TestedVersions.save_tests_result("botocore", "boto3", boto3_version):
140+
install_package("boto3", boto3_version, session)
141+
142+
session.install(".")
143+
144+
abs_path = os.path.abspath("src/test/integration/boto3-sqs/")
145+
with tempfile.NamedTemporaryFile(suffix=".txt", prefix=abs_path) as temp_file:
146+
full_path = f"{temp_file}.txt"
147+
148+
with session.chdir("src/test/integration/boto3-sqs"):
149+
session.install("-r", "requirements_others.txt")
150+
151+
try:
152+
session.run(
153+
"sh",
154+
"./scripts/run_app",
155+
env={
156+
"LUMIGO_DEBUG_SPANDUMP": full_path,
157+
},
158+
external=True,
159+
) # One happy day we will have https://github.com/wntrblm/nox/issues/198
160+
161+
# TODO Make this deterministic
162+
# Give time for app to start
163+
time.sleep(8)
164+
165+
session.run(
166+
"pytest",
167+
"--tb",
168+
"native",
169+
"--log-cli-level=INFO",
170+
"--color=yes",
171+
"-v",
172+
"./tests/test_boto3_sqs.py",
173+
env={
174+
"LUMIGO_DEBUG_SPANDUMP": full_path,
175+
},
176+
)
177+
finally:
178+
kill_process_and_clean_outputs(full_path, "run_app", session)
179+
180+
120181
@nox.session(python=python_versions())
121182
@nox.parametrize(
122183
"boto3_version",

requirements.txt

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
pre-commit==2.20.0
22
nox==2022.1.7
3-
black==22.3.0
4-
click==8.0.4
5-
flake8==4.0.1
63
pytest==7.1.1
74
mock==4.0.3
85
pytest-cov==3.0.0
96
capturer==3.0
107
attrs==21.4.0
118
requests==2.27.1
9+
httpretty==1.1.4
1210
moto==3.1.5
1311
itsdangerous==2.0.1
1412
Jinja2==3.0.3
@@ -19,9 +17,11 @@ zipp==3.8.0
1917
psutil==5.9.1
2018
pyyaml==6.0
2119
attrs==21.4.0
22-
mypy==0.971
23-
types-requests==2.28.10
24-
types-PyMySQL==1.0.19
20+
types-attrs==19.1.0
2521
types-boto==2.49.17
26-
types-redis==4.3.20
22+
types-PyMySQL==1.0.19
2723
types-PyYAML==6.0.11
24+
types-redis==4.3.20
25+
types-requests==2.28.11.2
26+
types-setuptools==65.5.0.2
27+
types-urllib3<1.27

src/lumigo_opentelemetry/external/botocore/extensions/sqs.py

+71-2
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
_AwsSdkExtension,
1818
_BotoResultT,
1919
)
20+
from opentelemetry import context as context_api
2021
from opentelemetry.semconv.trace import SpanAttributes
21-
from opentelemetry.trace.span import Span
22+
from opentelemetry.trace import get_current_span
23+
from opentelemetry.trace.propagation import _SPAN_KEY
24+
from opentelemetry.trace.span import NonRecordingSpan, Span
25+
from typing import Generator
2226

2327
_SUPPORTED_OPERATIONS = ["SendMessage", "SendMessageBatch", "ReceiveMessage"]
2428

2529

2630
class _SqsExtension(_AwsSdkExtension):
31+
2732
def extract_attributes(self, attributes: _AttributeMapT):
2833
queue_url = self._call_context.params.get("QueueUrl")
2934
if queue_url:
@@ -52,4 +57,68 @@ def on_success(self, span: Span, result: _BotoResultT):
5257
span.set_attribute(
5358
SpanAttributes.MESSAGING_MESSAGE_ID,
5459
result["Messages"][0]["MessageId"],
55-
)
60+
)
61+
62+
# Replace the 'Messages' list with a list that restores `span` as trace
63+
# context over iterations.
64+
ctx = _SqsExtension.ScopeContext(span)
65+
result["Messages"] = _SqsExtension.ContextableList(result["Messages"], ctx)
66+
67+
class ScopeContext:
68+
def __init__(self, span: Span):
69+
self._span = span
70+
self._scope_depth = 0
71+
self._token = None
72+
73+
def _enter(self):
74+
# NonRecordingSpan is what we get when there is no span in the context
75+
if self._scope_depth == 0 and isinstance(get_current_span(), NonRecordingSpan):
76+
# If there is not current active span (e.g. in case of an overarching entry span),
77+
# we restore as active the "ReceiveMessage" span. We neither record exceptions on it
78+
# nor set statuses, as the "ReceiveMessage" span is already ended when it is used as
79+
# parent by spans created while iterating on the messages.
80+
self._token = context_api.attach(context_api.set_value(_SPAN_KEY, self._span))
81+
self._scope_depth = 1
82+
else:
83+
self._scope_depth = self._scope_depth + 1
84+
85+
def _exit(self):
86+
if not self._scope_depth:
87+
# Something very fishy going on!
88+
return
89+
90+
self._scope_depth = self._scope_depth - 1
91+
92+
if self._scope_depth == 0 and self._token:
93+
# Clear the context outside of the iteration, so that a second iteration would
94+
# revert to the previous context
95+
context_api.detach(self._token)
96+
self._token = None
97+
98+
class ContextableList(list):
99+
"""
100+
Since the classic way to process SQS messages is using a `for` loop, without a well defined scope like a
101+
callback - we are doing something similar to the instrumentation of Kafka-python and instrumenting the
102+
`__iter__` functions and the `__getitem__` functions to set the span context of the "ReceiveMessage" span.
103+
104+
Since the return value from an `SQS.ReceiveMessage` returns a builtin list, we cannot wrap it and change
105+
all of the calls for `list.__iter__` and `list.__getitem__` - therefore we use ContextableList.
106+
107+
This object also implements the `__enter__` and `__exit__` objects for context managers.
108+
"""
109+
def __init__(self, l, scopeContext):
110+
super().__init__(l)
111+
self._scopeContext = scopeContext
112+
113+
# Iterator support
114+
def __iter__(self) -> Generator:
115+
# Try / finally ensures that we detach the context over a `break` issues from inside an iteration
116+
try:
117+
self._scopeContext._enter()
118+
119+
index = 0
120+
while index < len(self):
121+
yield self[index]
122+
index = index + 1
123+
finally:
124+
self._scopeContext._exit()

src/lumigo_opentelemetry/libs/general_utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
try:
2-
from collections import Iterable
2+
from collections import Iterable # type: ignore
33
except ImportError:
44
# collections.Iterables was removed in Python 3.10
55
from collections.abc import Iterable
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
bin
2+
include
3+
lib
4+
lib64
5+
share
6+
pyvenv.cfg
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import boto3
2+
3+
from lumigo_opentelemetry import tracer_provider
4+
from opentelemetry.trace import SpanKind
5+
6+
from moto import mock_sqs
7+
8+
9+
@mock_sqs
10+
def run():
11+
tracer = tracer_provider.get_tracer(__name__)
12+
13+
client = boto3.client("sqs", region_name="eu-central-1")
14+
queue = client.create_queue(QueueName="test")
15+
16+
client.send_message(
17+
QueueUrl=queue["QueueUrl"],
18+
MessageBody="Message_1",
19+
)
20+
client.send_message(
21+
QueueUrl=queue["QueueUrl"],
22+
MessageBody="Message_2",
23+
)
24+
25+
messages_copy = None
26+
27+
response = client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
28+
29+
if response:
30+
if "Messages" in response and response["Messages"]:
31+
for message in response["Messages"]:
32+
# In the iterator, we restore the span for client.receive_message
33+
with tracer.start_as_current_span(
34+
"consuming_message_1", kind=SpanKind.INTERNAL
35+
) as processing_span:
36+
processing_span.set_attribute("message.body", message["Body"])
37+
38+
with tracer.start_as_current_span(
39+
"consuming_message_2", kind=SpanKind.INTERNAL
40+
) as processing_span:
41+
processing_span.set_attribute("foo", "bar")
42+
43+
messages_copy = response["Messages"].copy()
44+
45+
for message in messages_copy:
46+
with tracer.start_as_current_span(
47+
"iterator_on_copy", kind=SpanKind.INTERNAL
48+
) as span:
49+
span.set_attribute("foo", "bar")
50+
51+
response = client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
52+
if response:
53+
for message in response["Messages"]:
54+
# In the iterator, we must restore the span for client.receive_message
55+
break # Here we are supposed to lose the trace context
56+
57+
with tracer.start_as_current_span(
58+
"after_iterator_break", kind=SpanKind.INTERNAL
59+
) as span:
60+
span.set_attribute("foo", "bar")
61+
62+
tracer_provider.force_flush()
63+
64+
65+
if __name__ == "__main__":
66+
run()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import pytest
2+
3+
from test.test_utils.spans_parser import SpansContainer
4+
5+
6+
@pytest.fixture(autouse=True)
7+
def increment_spans_counter():
8+
SpansContainer.increment_spans()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest
2+
psutil
3+
moto
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/sh -e
2+
3+
script_dir=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
4+
5+
6+
python3 $(dirname $script_dir)/app/main.py

0 commit comments

Comments
 (0)