Skip to content

Commit

Permalink
Merge branch '3.x-staging' into yunkim/fix-langchain-deprecated-cost
Browse files Browse the repository at this point in the history
  • Loading branch information
Yun-Kim authored Feb 5, 2025
2 parents 00649e5 + 33ad7b3 commit 1573404
Show file tree
Hide file tree
Showing 65 changed files with 3,752 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/COMMIT_TEMPLATE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ feat/fix/docs/refactor/ci(xxx): commit title here
# mysqlpython, openai, opentelemetry, opentracer, profile, psycopg, pylibmc, pymemcache,
# pymongo, pymysql, pynamodb, pyodbc, pyramid, pytest, redis, rediscluster, requests, rq,
# sanic, snowflake, sourcecode, sqlalchemy, starlette, stdlib, structlog, subprocess,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, vendor, vertica, wsgi,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, valkey, vendor, vertica, wsgi,
# yaaredis
6 changes: 6 additions & 0 deletions .gitlab/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
redis:
name: registry.ddbuild.io/redis:7.0.7
alias: redis
valkey:
name: registry.ddbuild.io/images/mirror/valkey:8.0-alpine
alias: valkey
kafka:
name: registry.ddbuild.io/images/mirror/apache/kafka:3.8.0
alias: kafka
Expand All @@ -54,6 +57,9 @@
rediscluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: rediscluster
valkeycluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: valkeycluster
elasticsearch:
name: registry.ddbuild.io/images/mirror/library/elasticsearch:7.17.23
alias: elasticsearch
Expand Down
26 changes: 26 additions & 0 deletions .riot/requirements/11ac941.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/11ac941.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.20.2
26 changes: 26 additions & 0 deletions .riot/requirements/1e98e9b.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1e98e9b.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.21.0
22 changes: 22 additions & 0 deletions .riot/requirements/4aa2a2a.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/4aa2a2a.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/7219cf4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/7219cf4.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/b96b665.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/b96b665.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
24 changes: 24 additions & 0 deletions .riot/requirements/dd68acc.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/dd68acc.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"unittest": True,
"coverage": False,
"selenium": True,
"valkey": True,
}


Expand Down
8 changes: 8 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,11 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
if rowcount is not None:
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_test_visibility_enable(config) -> None:
from ddtrace.internal.ci_visibility import CIVisibility

Expand Down Expand Up @@ -797,6 +802,8 @@ def listen():
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
core.on("redis.async_command.post", _on_redis_command_post)
core.on("redis.command.post", _on_redis_command_post)
core.on("valkey.async_command.post", _on_valkey_command_post)
core.on("valkey.command.post", _on_valkey_command_post)
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

Expand Down Expand Up @@ -838,6 +845,7 @@ def listen():
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"valkey.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
Expand Down
2 changes: 0 additions & 2 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
from ddtrace.internal.serverless import in_gcp_function
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.utils import _get_metas_to_propagate
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.internal.utils.http import verify_url
from ddtrace.internal.writer import AgentResponse
Expand All @@ -69,7 +68,6 @@
from ddtrace.settings import Config
from ddtrace.settings.asm import config as asm_config
from ddtrace.settings.peer_service import _ps_config
from ddtrace.vendor.debtcollector import deprecate


log = get_logger(__name__)
Expand Down
96 changes: 96 additions & 0 deletions ddtrace/_trace/utils_valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Some utils used by the dogtrace valkey integration
"""

from contextlib import contextmanager
from typing import List
from typing import Optional

from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.valkey_utils import _extract_conn_tags
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import valkey as valkeyx
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_cache_operation
from ddtrace.internal.utils.formats import stringify_cache_args


format_command_args = stringify_cache_args


def _set_span_tags(
span, pin, config_integration, args: Optional[List], instance, query: Optional[List], is_cluster: bool = False
):
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, config_integration.integration_name)
span.set_tag_str(db.SYSTEM, valkeyx.APP)
span.set_tag(_SPAN_MEASURED_KEY)
if query is not None:
span_name = schematize_cache_operation(valkeyx.RAWCMD, cache_provider=valkeyx.APP) # type: ignore[operator]
span.set_tag_str(span_name, query)
if pin.tags:
span.set_tags(pin.tags)
# some valkey clients do not have a connection_pool attribute (ex. aiovalkey v1.3)
if not is_cluster and hasattr(instance, "connection_pool"):
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
if args is not None:
span.set_metric(valkeyx.ARGS_LEN, len(args))
else:
for attr in ("command_stack", "_command_stack"):
if hasattr(instance, attr):
span.set_metric(valkeyx.PIPELINE_LEN, len(getattr(instance, attr)))
# set analytics sample rate if enabled
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())


@contextmanager
def _instrument_valkey_cmd(pin, config_integration, instance, args):
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
with core.context_with_data(
"valkey.command",
span_name=schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
pin=pin,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
) as ctx, ctx.span as span:
_set_span_tags(span, pin, config_integration, args, instance, query)
yield ctx


@contextmanager
def _instrument_valkey_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span


@contextmanager
def _instrument_valkey_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span
36 changes: 36 additions & 0 deletions ddtrace/contrib/internal/valkey/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ddtrace import config
from ddtrace._trace.utils_valkey import _instrument_valkey_cmd
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_async_cluster_pipeline
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_pipeline
from ddtrace.contrib.internal.valkey_utils import _run_valkey_command_async
from ddtrace.internal.utils.formats import stringify_cache_args
from ddtrace.trace import Pin


async def instrumented_async_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _instrument_valkey_cmd(pin, config.valkey, instance, args) as ctx:
return await _run_valkey_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)


async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c, cmd_max_len=config.valkey.cmd_max_length) for c, _ in instance.command_stack]
with _instrument_valkey_execute_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)


async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.valkey.cmd_max_length) for c in instance._command_stack]
with _instrument_valkey_execute_async_cluster_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)
Loading

0 comments on commit 1573404

Please sign in to comment.