Skip to content

Commit

Permalink
Merge branch 'main' into chore/trigger-probe
Browse files Browse the repository at this point in the history
  • Loading branch information
P403n1x87 authored Feb 12, 2025
2 parents 23fd710 + b4711ea commit 57efadc
Show file tree
Hide file tree
Showing 62 changed files with 2,605 additions and 584 deletions.
14 changes: 0 additions & 14 deletions .gitlab/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,11 @@ check-big-regressions:
KUBERNETES_SERVICE_ACCOUNT_OVERWRITE: dd-trace-py

benchmark-serverless:
stage: benchmarks
image: $SLS_CI_IMAGE
tags: ["arch:amd64"]
when: on_success
needs: [ "benchmark-serverless-trigger" ]
allow_failure: true
script:
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.ddbuild.io/DataDog/serverless-tools.git ./serverless-tools && cd ./serverless-tools
- ./ci/check_trigger_status.sh
except:
- main # fails on main - ideally it would succeed

benchmark-serverless-trigger:
stage: benchmarks
trigger:
project: DataDog/serverless-tools
strategy: depend
needs: []
allow_failure: true
variables:
UPSTREAM_PIPELINE_ID: $CI_PIPELINE_ID
UPSTREAM_PROJECT_URL: $CI_PROJECT_URL
Expand Down
27 changes: 0 additions & 27 deletions .riot/requirements/1d4e95e.txt

This file was deleted.

4 changes: 0 additions & 4 deletions ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@
from .internal.compat import PYTHON_VERSION_INFO # noqa: E402
from .internal.utils.deprecations import DDTraceDeprecationWarning # noqa: E402

# TODO(munir): Remove the imports below in v3.0
from ddtrace._trace import pin as _p # noqa: E402, F401
from ddtrace._trace import span as _s # noqa: E402, F401
from ddtrace._trace import tracer as _t # noqa: E402, F401
from ddtrace.vendor import debtcollector
from .version import get_version # noqa: E402

Expand Down
116 changes: 116 additions & 0 deletions ddtrace/_trace/_inferred_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import logging
from typing import Dict
from typing import Union

from ddtrace import config
from ddtrace._trace.span import Span
from ddtrace.constants import SPAN_KIND
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import http
from ddtrace.internal.constants import COMPONENT
from ddtrace.propagation.http import _extract_header_value
from ddtrace.propagation.http import _possible_header


log = logging.getLogger(__name__)

# Checking lower case and upper case versions per WSGI spec following ddtrace/propagation/http.py's
# logic to extract http headers
POSSIBLE_PROXY_HEADER_SYSTEM = _possible_header("x-dd-proxy")
POSSIBLE_PROXY_HEADER_START_TIME_MS = _possible_header("x-dd-proxy-request-time-ms")
POSSIBLE_PROXY_HEADER_PATH = _possible_header("x-dd-proxy-path")
POSSIBLE_PROXY_HEADER_HTTPMETHOD = _possible_header("x-dd-proxy-httpmethod")
POSSIBLE_PROXY_HEADER_DOMAIN = _possible_header("x-dd-proxy-domain-name")
POSSIBLE_PROXY_HEADER_STAGE = _possible_header("x-dd-proxy-stage")

supported_proxies: Dict[str, Dict[str, str]] = {
"aws-apigateway": {"span_name": "aws.apigateway", "component": "aws-apigateway"}
}


def create_inferred_proxy_span_if_headers_exist(ctx, headers, child_of, tracer) -> None:
if not headers:
return None

normalized_headers = normalize_headers(headers)

proxy_context = extract_inferred_proxy_context(normalized_headers)

if not proxy_context:
return None

proxy_span_info = supported_proxies[proxy_context["proxy_system_name"]]

span = tracer.start_span(
proxy_span_info["span_name"],
service=proxy_context.get("domain_name", config._get_service()),
resource=proxy_context["method"] + " " + proxy_context["path"],
span_type=SpanTypes.WEB,
activate=True,
child_of=child_of,
)
span.start_ns = int(proxy_context["request_time"]) * 1000000

set_inferred_proxy_span_tags(span, proxy_context)

# we need a callback to finish the api gateway span, this callback will be added to the child spans finish callbacks
def finish_callback(_):
span.finish()

if span:
ctx.set_item("inferred_proxy_span", span)
ctx.set_item("inferred_proxy_finish_callback", finish_callback)
ctx.set_item("headers", headers)


def set_inferred_proxy_span_tags(span, proxy_context) -> Span:
span.set_tag_str(COMPONENT, supported_proxies[proxy_context["proxy_system_name"]]["component"])
span.set_tag_str(SPAN_KIND, SpanKind.INTERNAL)

span.set_tag_str(http.METHOD, proxy_context["method"])
span.set_tag_str(http.URL, f"{proxy_context['domain_name']}{proxy_context['path']}")
span.set_tag_str(http.ROUTE, proxy_context["path"])
span.set_tag_str("stage", proxy_context["stage"])

span.set_tag_str("_dd.inferred_span", "1")
return span


def extract_inferred_proxy_context(headers) -> Union[None, Dict[str, str]]:
proxy_header_system = str(_extract_header_value(POSSIBLE_PROXY_HEADER_SYSTEM, headers))
proxy_header_start_time_ms = str(_extract_header_value(POSSIBLE_PROXY_HEADER_START_TIME_MS, headers))
proxy_header_path = str(_extract_header_value(POSSIBLE_PROXY_HEADER_PATH, headers))
proxy_header_httpmethod = str(_extract_header_value(POSSIBLE_PROXY_HEADER_HTTPMETHOD, headers))
proxy_header_domain = str(_extract_header_value(POSSIBLE_PROXY_HEADER_DOMAIN, headers))
proxy_header_stage = str(_extract_header_value(POSSIBLE_PROXY_HEADER_STAGE, headers))

# Exit if any of the required headers are not present
if (
not proxy_header_system
or not proxy_header_start_time_ms
or not proxy_header_path
or not proxy_header_httpmethod
or not proxy_header_domain
or not proxy_header_stage
):
return None

if not (proxy_header_system and proxy_header_system in supported_proxies):
log.debug(
"Received headers to create inferred proxy span but headers include an unsupported proxy type", headers
)
return None

return {
"request_time": proxy_header_start_time_ms,
"method": proxy_header_httpmethod,
"path": proxy_header_path,
"stage": proxy_header_stage,
"domain_name": proxy_header_domain,
"proxy_system_name": proxy_header_system,
}


def normalize_headers(headers) -> Dict[str, str]:
return {key.lower(): value for key, value in headers.items()}
5 changes: 4 additions & 1 deletion ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,14 @@ def set_exc_info(

# readable version of type (e.g. exceptions.ZeroDivisionError)
exc_type_str = "%s.%s" % (exc_type.__module__, exc_type.__name__)

self._meta[ERROR_MSG] = str(exc_val)
self._meta[ERROR_TYPE] = exc_type_str
self._meta[ERROR_STACK] = tb

# some web integrations like bottle rely on set_exc_info to get the error tags, so we need to dispatch
# this event such that the additional tags for inferred aws api gateway spans can be appended here.
core.dispatch("web.request.final_tags", (self,))

core.dispatch("span.exception", (self, exc_type, exc_val, exc_tb))

def _pprint(self) -> str:
Expand Down
109 changes: 104 additions & 5 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import wrapt

from ddtrace import config
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace.utils import extract_DD_context_from_messages
from ddtrace._trace.utils_botocore.span_pointers import extract_span_pointers_from_successful_botocore_response
Expand All @@ -19,6 +20,9 @@
from ddtrace._trace.utils_botocore.span_tags import set_botocore_response_metadata_tags
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
from ddtrace.constants import ERROR_TYPE
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.botocore.constants import BOTOCORE_STEPFUNCTIONS_INPUT_KEY
Expand Down Expand Up @@ -121,11 +125,25 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
distributed_context = ctx.get_item("distributed_context")
if distributed_context and not call_trace:
span_kwargs["child_of"] = distributed_context

if config._inferred_proxy_services_enabled:
# dispatch event for checking headers and possibly making an inferred proxy span
core.dispatch("inferred_proxy.start", (ctx, tracer, span_kwargs, call_trace, distributed_headers_config))
# re-get span_kwargs in case an inferred span was created and we have a new span_kwargs.child_of field
span_kwargs = ctx.get_item("span_kwargs", span_kwargs)

span_kwargs.update(kwargs)
span = (tracer.trace if call_trace else tracer.start_span)(ctx["span_name"], **span_kwargs)

for tk, tv in ctx.get_item("tags", dict()).items():
span.set_tag_str(tk, tv)

ctx.span = span

if config._inferred_proxy_services_enabled:
# dispatch event for inferred proxy finish
core.dispatch("inferred_proxy.finish", (ctx,))

return span


Expand All @@ -148,7 +166,7 @@ def _on_web_framework_start_request(ctx, int_config):


def _on_web_framework_finish_request(
span, int_config, method, url, status_code, query, req_headers, res_headers, route, finish
span, int_config, method, url, status_code, query, req_headers, res_headers, route, finish, **kwargs
):
trace_utils.set_http_meta(
span=span,
Expand All @@ -160,11 +178,72 @@ def _on_web_framework_finish_request(
request_headers=req_headers,
response_headers=res_headers,
route=route,
**kwargs,
)
_set_inferred_proxy_tags(span, status_code)

if finish:
span.finish()


def _set_inferred_proxy_tags(span, status_code):
if span._parent and span._parent.name == "aws.apigateway":
inferred_span = span._parent
status_code = status_code if status_code else span.get_tag("http.status_code")
if status_code:
inferred_span.set_tag("http.status_code", status_code)
if span.error == 1:
inferred_span.error = span.error
if ERROR_MSG in span._meta.keys():
inferred_span.set_tag(ERROR_MSG, span.get_tag(ERROR_MSG))
if ERROR_TYPE in span._meta.keys():
inferred_span.set_tag(ERROR_TYPE, span.get_tag(ERROR_TYPE))
if ERROR_STACK in span._meta.keys():
inferred_span.set_tag(ERROR_STACK, span.get_tag(ERROR_STACK))


def _on_inferred_proxy_start(ctx, tracer, span_kwargs, call_trace, distributed_headers_config):
# Skip creating another inferred span if one has already been created for this request
if ctx.get_item("inferred_proxy_span"):
return

# some integrations like Flask / WSGI store headers from environ in 'distributed_headers'
# and normalized headers in 'headers'
headers = ctx.get_item("headers", ctx.get_item("distributed_headers", None))

# Inferred Proxy Spans
if distributed_headers_config and headers is not None:
create_inferred_proxy_span_if_headers_exist(
ctx,
headers=headers,
child_of=tracer.current_trace_context(),
tracer=tracer,
)
inferred_proxy_span = ctx.get_item("inferred_proxy_span")

# use the inferred proxy span as the new parent span
if inferred_proxy_span and not call_trace:
span_kwargs["child_of"] = inferred_proxy_span
ctx.set_item("span_kwargs", span_kwargs)


def _on_inferred_proxy_finish(ctx):
if not config._inferred_proxy_services_enabled:
return

inferred_proxy_span = ctx.get_item("inferred_proxy_span")
inferred_proxy_finish_callback = ctx.get_item("inferred_proxy_finish_callback")

# add callback to finish inferred proxy span when this span finishes
if (
inferred_proxy_span
and inferred_proxy_finish_callback
and ctx.span
and ctx.span.parent_id == inferred_proxy_span.span_id
):
ctx.span._on_finish_callbacks.append(inferred_proxy_finish_callback)


def _on_traced_request_context_started_flask(ctx):
current_span = ctx["pin"].tracer.current_span()
if not ctx["pin"].enabled or not current_span:
Expand Down Expand Up @@ -332,12 +411,17 @@ def _on_start_response_pre(request, ctx, flask_config, status_code, headers):
span.resource = " ".join((request.method, code))

response_cookies = _cookies_from_response_headers(headers)
trace_utils.set_http_meta(
span,
flask_config,
_on_web_framework_finish_request(
span=span,
int_config=flask_config,
method=request.method,
url=None,
status_code=code,
response_headers=headers,
query=None,
req_headers=None,
res_headers=headers,
route=span.get_tag(FLASK_URL_RULE),
finish=False,
response_cookies=response_cookies,
)

Expand Down Expand Up @@ -408,11 +492,20 @@ def _on_traced_get_response_pre(_, ctx: core.ExecutionContext, request, before_r
ctx.span._metrics[_SPAN_MEASURED_KEY] = 1


def _on_web_request_final_tags(span):
# Necessary to add remaining http status codes and
# errors relevant to the aws api gateway spans on close
if span and span.span_type == "web":
_set_inferred_proxy_tags(span, None)


def _on_django_finalize_response_pre(ctx, after_request_tags, request, response):
# DEV: Always set these tags, this is where `span.resource` is set
span = ctx.span
after_request_tags(ctx["pin"], span, request, response)

trace_utils.set_http_meta(span, ctx["distributed_headers_config"], route=span.get_tag("http.route"))
_set_inferred_proxy_tags(span, None)


def _on_django_start_response(
Expand Down Expand Up @@ -810,6 +903,11 @@ def listen():
# web frameworks general handlers
core.on("web.request.start", _on_web_framework_start_request)
core.on("web.request.finish", _on_web_framework_finish_request)
core.on("web.request.final_tags", _on_web_request_final_tags)

# inferred proxy handlers
core.on("inferred_proxy.start", _on_inferred_proxy_start)
core.on("inferred_proxy.finish", _on_inferred_proxy_finish)

core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
Expand All @@ -827,6 +925,7 @@ def listen():
"molten.request",
"pyramid.request",
"sanic.request",
"tornado.request",
"flask.call",
"flask.jsonify",
"flask.render_template",
Expand Down
Loading

0 comments on commit 57efadc

Please sign in to comment.