Skip to content

Commit 135ae7c

Browse files
ZStriker19AhmadMasrymabdinur
authored
feat: add valkey instrumentation support (#12060)
**This PR is the work of [AhmadMasry](https://github.com/AhmadMasry)** Moving from #12003 since we have issues running certain tests on forked repos. The Valkey instrumentation is based on the current implementation of the Redis instrumentation, but keeping into consideration that the two projects may deviate and lose compatibility, the Valkey instrumentation is created as a separated module. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: AhmadMasry <[email protected]> Co-authored-by: Ahmad Al-Masry <[email protected]> Co-authored-by: Munir Abdinur <[email protected]>
1 parent 8f8aebe commit 135ae7c

File tree

63 files changed

+3750
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+3750
-1
lines changed

.github/COMMIT_TEMPLATE.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@ feat/fix/docs/refactor/ci(xxx): commit title here
2929
# mysqlpython, openai, opentelemetry, opentracer, profile, psycopg, pylibmc, pymemcache,
3030
# pymongo, pymysql, pynamodb, pyodbc, pyramid, pytest, redis, rediscluster, requests, rq,
3131
# sanic, snowflake, sourcecode, sqlalchemy, starlette, stdlib, structlog, subprocess,
32-
# telemetry, test_logging, tornado, tracer, unittest, urllib3, vendor, vertica, wsgi,
32+
# telemetry, test_logging, tornado, tracer, unittest, urllib3, valkey, vendor, vertica, wsgi,
3333
# yaaredis

.gitlab/services.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
redis:
2929
name: registry.ddbuild.io/redis:7.0.7
3030
alias: redis
31+
valkey:
32+
name: registry.ddbuild.io/images/mirror/valkey:8.0-alpine
33+
alias: valkey
3134
kafka:
3235
name: registry.ddbuild.io/images/mirror/apache/kafka:3.8.0
3336
alias: kafka
@@ -54,6 +57,9 @@
5457
rediscluster:
5558
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
5659
alias: rediscluster
60+
valkeycluster:
61+
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
62+
alias: valkeycluster
5763
elasticsearch:
5864
name: registry.ddbuild.io/images/mirror/library/elasticsearch:7.17.23
5965
alias: elasticsearch

.riot/requirements/11ac941.txt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.8
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/11ac941.in
6+
#
7+
async-timeout==5.0.1
8+
attrs==24.3.0
9+
coverage[toml]==7.6.1
10+
exceptiongroup==1.2.2
11+
hypothesis==6.45.0
12+
importlib-metadata==8.5.0
13+
iniconfig==2.0.0
14+
mock==5.1.0
15+
opentracing==2.4.0
16+
packaging==24.2
17+
pluggy==1.5.0
18+
pytest==8.3.4
19+
pytest-asyncio==0.23.7
20+
pytest-cov==5.0.0
21+
pytest-mock==3.14.0
22+
pytest-randomly==3.15.0
23+
sortedcontainers==2.4.0
24+
tomli==2.2.1
25+
valkey==6.0.2
26+
zipp==3.20.2

.riot/requirements/1e98e9b.txt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.9
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1e98e9b.in
6+
#
7+
async-timeout==5.0.1
8+
attrs==24.3.0
9+
coverage[toml]==7.6.10
10+
exceptiongroup==1.2.2
11+
hypothesis==6.45.0
12+
importlib-metadata==8.5.0
13+
iniconfig==2.0.0
14+
mock==5.1.0
15+
opentracing==2.4.0
16+
packaging==24.2
17+
pluggy==1.5.0
18+
pytest==8.3.4
19+
pytest-asyncio==0.23.7
20+
pytest-cov==6.0.0
21+
pytest-mock==3.14.0
22+
pytest-randomly==3.16.0
23+
sortedcontainers==2.4.0
24+
tomli==2.2.1
25+
valkey==6.0.2
26+
zipp==3.21.0

.riot/requirements/4aa2a2a.txt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.11
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/4aa2a2a.in
6+
#
7+
async-timeout==5.0.1
8+
attrs==24.3.0
9+
coverage[toml]==7.6.10
10+
hypothesis==6.45.0
11+
iniconfig==2.0.0
12+
mock==5.1.0
13+
opentracing==2.4.0
14+
packaging==24.2
15+
pluggy==1.5.0
16+
pytest==8.3.4
17+
pytest-asyncio==0.23.7
18+
pytest-cov==6.0.0
19+
pytest-mock==3.14.0
20+
pytest-randomly==3.16.0
21+
sortedcontainers==2.4.0
22+
valkey==6.0.2

.riot/requirements/7219cf4.txt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.13
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/7219cf4.in
6+
#
7+
attrs==24.3.0
8+
coverage[toml]==7.6.10
9+
hypothesis==6.45.0
10+
iniconfig==2.0.0
11+
mock==5.1.0
12+
opentracing==2.4.0
13+
packaging==24.2
14+
pluggy==1.5.0
15+
pytest==8.3.4
16+
pytest-asyncio==0.23.7
17+
pytest-cov==6.0.0
18+
pytest-mock==3.14.0
19+
pytest-randomly==3.16.0
20+
sortedcontainers==2.4.0
21+
valkey==6.0.2

.riot/requirements/b96b665.txt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.12
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/b96b665.in
6+
#
7+
attrs==24.3.0
8+
coverage[toml]==7.6.10
9+
hypothesis==6.45.0
10+
iniconfig==2.0.0
11+
mock==5.1.0
12+
opentracing==2.4.0
13+
packaging==24.2
14+
pluggy==1.5.0
15+
pytest==8.3.4
16+
pytest-asyncio==0.23.7
17+
pytest-cov==6.0.0
18+
pytest-mock==3.14.0
19+
pytest-randomly==3.16.0
20+
sortedcontainers==2.4.0
21+
valkey==6.0.2

.riot/requirements/dd68acc.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# This file is autogenerated by pip-compile with Python 3.10
3+
# by the following command:
4+
#
5+
# pip-compile --allow-unsafe --no-annotate .riot/requirements/dd68acc.in
6+
#
7+
async-timeout==5.0.1
8+
attrs==24.3.0
9+
coverage[toml]==7.6.10
10+
exceptiongroup==1.2.2
11+
hypothesis==6.45.0
12+
iniconfig==2.0.0
13+
mock==5.1.0
14+
opentracing==2.4.0
15+
packaging==24.2
16+
pluggy==1.5.0
17+
pytest==8.3.4
18+
pytest-asyncio==0.23.7
19+
pytest-cov==6.0.0
20+
pytest-mock==3.14.0
21+
pytest-randomly==3.16.0
22+
sortedcontainers==2.4.0
23+
tomli==2.2.1
24+
valkey==6.0.2

ddtrace/_monkey.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"unittest": True,
106106
"coverage": False,
107107
"selenium": True,
108+
"valkey": True,
108109
}
109110

110111

ddtrace/_trace/trace_handlers.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,11 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
688688
ctx.span.set_metric(db.ROWCOUNT, rowcount)
689689

690690

691+
def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
692+
if rowcount is not None:
693+
ctx.span.set_metric(db.ROWCOUNT, rowcount)
694+
695+
691696
def _on_test_visibility_enable(config) -> None:
692697
from ddtrace.internal.ci_visibility import CIVisibility
693698

@@ -797,6 +802,8 @@ def listen():
797802
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
798803
core.on("redis.async_command.post", _on_redis_command_post)
799804
core.on("redis.command.post", _on_redis_command_post)
805+
core.on("valkey.async_command.post", _on_valkey_command_post)
806+
core.on("valkey.command.post", _on_valkey_command_post)
800807
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
801808
core.on("azure.functions.start_response", _on_azure_functions_start_response)
802809

@@ -838,6 +845,7 @@ def listen():
838845
"botocore.patched_stepfunctions_api_call",
839846
"botocore.patched_bedrock_api_call",
840847
"redis.command",
848+
"valkey.command",
841849
"rq.queue.enqueue_job",
842850
"rq.traced_queue_fetch_job",
843851
"rq.worker.perform_job",

ddtrace/_trace/utils_valkey.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""
2+
Some utils used by the dogtrace valkey integration
3+
"""
4+
5+
from contextlib import contextmanager
6+
from typing import List
7+
from typing import Optional
8+
9+
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
10+
from ddtrace.constants import _SPAN_MEASURED_KEY
11+
from ddtrace.constants import SPAN_KIND
12+
from ddtrace.contrib import trace_utils
13+
from ddtrace.contrib.internal.valkey_utils import _extract_conn_tags
14+
from ddtrace.ext import SpanKind
15+
from ddtrace.ext import SpanTypes
16+
from ddtrace.ext import db
17+
from ddtrace.ext import valkey as valkeyx
18+
from ddtrace.internal import core
19+
from ddtrace.internal.constants import COMPONENT
20+
from ddtrace.internal.schema import schematize_cache_operation
21+
from ddtrace.internal.utils.formats import stringify_cache_args
22+
23+
24+
format_command_args = stringify_cache_args
25+
26+
27+
def _set_span_tags(
28+
span, pin, config_integration, args: Optional[List], instance, query: Optional[List], is_cluster: bool = False
29+
):
30+
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
31+
span.set_tag_str(COMPONENT, config_integration.integration_name)
32+
span.set_tag_str(db.SYSTEM, valkeyx.APP)
33+
span.set_tag(_SPAN_MEASURED_KEY)
34+
if query is not None:
35+
span_name = schematize_cache_operation(valkeyx.RAWCMD, cache_provider=valkeyx.APP) # type: ignore[operator]
36+
span.set_tag_str(span_name, query)
37+
if pin.tags:
38+
span.set_tags(pin.tags)
39+
# some valkey clients do not have a connection_pool attribute (ex. aiovalkey v1.3)
40+
if not is_cluster and hasattr(instance, "connection_pool"):
41+
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
42+
if args is not None:
43+
span.set_metric(valkeyx.ARGS_LEN, len(args))
44+
else:
45+
for attr in ("command_stack", "_command_stack"):
46+
if hasattr(instance, attr):
47+
span.set_metric(valkeyx.PIPELINE_LEN, len(getattr(instance, attr)))
48+
# set analytics sample rate if enabled
49+
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())
50+
51+
52+
@contextmanager
53+
def _instrument_valkey_cmd(pin, config_integration, instance, args):
54+
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
55+
with core.context_with_data(
56+
"valkey.command",
57+
span_name=schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
58+
pin=pin,
59+
service=trace_utils.ext_service(pin, config_integration),
60+
span_type=SpanTypes.VALKEY,
61+
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
62+
) as ctx, ctx.span as span:
63+
_set_span_tags(span, pin, config_integration, args, instance, query)
64+
yield ctx
65+
66+
67+
@contextmanager
68+
def _instrument_valkey_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
69+
cmd_string = resource = "\n".join(cmds)
70+
if config_integration.resource_only_command:
71+
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])
72+
73+
with pin.tracer.trace(
74+
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
75+
resource=resource,
76+
service=trace_utils.ext_service(pin, config_integration),
77+
span_type=SpanTypes.VALKEY,
78+
) as span:
79+
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
80+
yield span
81+
82+
83+
@contextmanager
84+
def _instrument_valkey_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
85+
cmd_string = resource = "\n".join(cmds)
86+
if config_integration.resource_only_command:
87+
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])
88+
89+
with pin.tracer.trace(
90+
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
91+
resource=resource,
92+
service=trace_utils.ext_service(pin, config_integration),
93+
span_type=SpanTypes.VALKEY,
94+
) as span:
95+
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
96+
yield span
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from ddtrace import config
2+
from ddtrace._trace.utils_valkey import _instrument_valkey_cmd
3+
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_async_cluster_pipeline
4+
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_pipeline
5+
from ddtrace.contrib.internal.valkey_utils import _run_valkey_command_async
6+
from ddtrace.internal.utils.formats import stringify_cache_args
7+
from ddtrace.trace import Pin
8+
9+
10+
async def instrumented_async_execute_command(func, instance, args, kwargs):
11+
pin = Pin.get_from(instance)
12+
if not pin or not pin.enabled():
13+
return await func(*args, **kwargs)
14+
15+
with _instrument_valkey_cmd(pin, config.valkey, instance, args) as ctx:
16+
return await _run_valkey_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)
17+
18+
19+
async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
20+
pin = Pin.get_from(instance)
21+
if not pin or not pin.enabled():
22+
return await func(*args, **kwargs)
23+
24+
cmds = [stringify_cache_args(c, cmd_max_len=config.valkey.cmd_max_length) for c, _ in instance.command_stack]
25+
with _instrument_valkey_execute_pipeline(pin, config.valkey, cmds, instance):
26+
return await func(*args, **kwargs)
27+
28+
29+
async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
30+
pin = Pin.get_from(instance)
31+
if not pin or not pin.enabled():
32+
return await func(*args, **kwargs)
33+
34+
cmds = [stringify_cache_args(c.args, cmd_max_len=config.valkey.cmd_max_length) for c in instance._command_stack]
35+
with _instrument_valkey_execute_async_cluster_pipeline(pin, config.valkey, cmds, instance):
36+
return await func(*args, **kwargs)

0 commit comments

Comments
 (0)