Skip to content

Commit

Permalink
feat: add valkey instrumentation support (#12060)
Browse files Browse the repository at this point in the history
**This PR is the work of [AhmadMasry](https://github.com/AhmadMasry)**

Moving from #12003 since we
have issues running certain tests on forked repos.

The Valkey instrumentation is based on the current implementation of the
Redis instrumentation, but keeping into consideration that the two
projects may deviate and lose compatibility, the Valkey instrumentation
is created as a separated module.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: AhmadMasry <[email protected]>
Co-authored-by: Ahmad Al-Masry <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
  • Loading branch information
4 people committed Feb 6, 2025
1 parent 3dc623d commit 278d98b
Show file tree
Hide file tree
Showing 63 changed files with 3,750 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/COMMIT_TEMPLATE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ feat/fix/docs/refactor/ci(xxx): commit title here
# mysqlpython, openai, opentelemetry, opentracer, profile, psycopg, pylibmc, pymemcache,
# pymongo, pymysql, pynamodb, pyodbc, pyramid, pytest, redis, rediscluster, requests, rq,
# sanic, snowflake, sourcecode, sqlalchemy, starlette, stdlib, structlog, subprocess,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, vendor, vertica, wsgi,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, valkey, vendor, vertica, wsgi,
# yaaredis
6 changes: 6 additions & 0 deletions .gitlab/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
redis:
name: registry.ddbuild.io/redis:7.0.7
alias: redis
valkey:
name: registry.ddbuild.io/images/mirror/valkey:8.0-alpine
alias: valkey
kafka:
name: registry.ddbuild.io/images/mirror/apache/kafka:3.8.0
alias: kafka
Expand All @@ -54,6 +57,9 @@
rediscluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: rediscluster
valkeycluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: valkeycluster
elasticsearch:
name: registry.ddbuild.io/images/mirror/library/elasticsearch:7.17.23
alias: elasticsearch
Expand Down
26 changes: 26 additions & 0 deletions .riot/requirements/11ac941.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/11ac941.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.20.2
26 changes: 26 additions & 0 deletions .riot/requirements/1e98e9b.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1e98e9b.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.21.0
22 changes: 22 additions & 0 deletions .riot/requirements/4aa2a2a.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/4aa2a2a.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/7219cf4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/7219cf4.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/b96b665.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/b96b665.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
24 changes: 24 additions & 0 deletions .riot/requirements/dd68acc.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/dd68acc.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
"unittest": True,
"coverage": False,
"selenium": True,
"valkey": True,
}


Expand Down
8 changes: 8 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,11 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
if rowcount is not None:
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_test_visibility_enable(config) -> None:
from ddtrace.internal.ci_visibility import CIVisibility

Expand Down Expand Up @@ -797,6 +802,8 @@ def listen():
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
core.on("redis.async_command.post", _on_redis_command_post)
core.on("redis.command.post", _on_redis_command_post)
core.on("valkey.async_command.post", _on_valkey_command_post)
core.on("valkey.command.post", _on_valkey_command_post)
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

Expand Down Expand Up @@ -838,6 +845,7 @@ def listen():
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"valkey.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
Expand Down
96 changes: 96 additions & 0 deletions ddtrace/_trace/utils_valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Some utils used by the dogtrace valkey integration
"""

from contextlib import contextmanager
from typing import List
from typing import Optional

from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.valkey_utils import _extract_conn_tags
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import valkey as valkeyx
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_cache_operation
from ddtrace.internal.utils.formats import stringify_cache_args


format_command_args = stringify_cache_args


def _set_span_tags(
span, pin, config_integration, args: Optional[List], instance, query: Optional[List], is_cluster: bool = False
):
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, config_integration.integration_name)
span.set_tag_str(db.SYSTEM, valkeyx.APP)
span.set_tag(_SPAN_MEASURED_KEY)
if query is not None:
span_name = schematize_cache_operation(valkeyx.RAWCMD, cache_provider=valkeyx.APP) # type: ignore[operator]
span.set_tag_str(span_name, query)
if pin.tags:
span.set_tags(pin.tags)
# some valkey clients do not have a connection_pool attribute (ex. aiovalkey v1.3)
if not is_cluster and hasattr(instance, "connection_pool"):
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
if args is not None:
span.set_metric(valkeyx.ARGS_LEN, len(args))
else:
for attr in ("command_stack", "_command_stack"):
if hasattr(instance, attr):
span.set_metric(valkeyx.PIPELINE_LEN, len(getattr(instance, attr)))
# set analytics sample rate if enabled
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())


@contextmanager
def _instrument_valkey_cmd(pin, config_integration, instance, args):
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
with core.context_with_data(
"valkey.command",
span_name=schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
pin=pin,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
) as ctx, ctx.span as span:
_set_span_tags(span, pin, config_integration, args, instance, query)
yield ctx


@contextmanager
def _instrument_valkey_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span


@contextmanager
def _instrument_valkey_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span
36 changes: 36 additions & 0 deletions ddtrace/contrib/internal/valkey/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ddtrace import config
from ddtrace._trace.utils_valkey import _instrument_valkey_cmd
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_async_cluster_pipeline
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_pipeline
from ddtrace.contrib.internal.valkey_utils import _run_valkey_command_async
from ddtrace.internal.utils.formats import stringify_cache_args
from ddtrace.trace import Pin


async def instrumented_async_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _instrument_valkey_cmd(pin, config.valkey, instance, args) as ctx:
return await _run_valkey_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)


async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c, cmd_max_len=config.valkey.cmd_max_length) for c, _ in instance.command_stack]
with _instrument_valkey_execute_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)


async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.valkey.cmd_max_length) for c in instance._command_stack]
with _instrument_valkey_execute_async_cluster_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)
Loading

0 comments on commit 278d98b

Please sign in to comment.