Skip to content

Commit

Permalink
- Hook into complete instead of complete attempt.
Browse files Browse the repository at this point in the history
- Add custom tracer.
- Improve http tracing.
- Retry failed trace sends.
- Set trace on retries as well.

Signed-off-by: Jakub Martin <[email protected]>
  • Loading branch information
cube2222 authored and peterdeme committed Jul 7, 2024
1 parent 2664189 commit a68016b
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
95 changes: 95 additions & 0 deletions contrib/internal/httputil/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package httputil // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httputil"

//go:generate sh -c "go run make_responsewriter.go | gofmt > trace_gen.go"

import (
"fmt"
"net/http"
"strconv"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TraceConfig defines the configuration for request tracing.
type TraceConfig struct {
ResponseWriter http.ResponseWriter // response writer
Request *http.Request // request that is traced
Service string // service name
Resource string // resource name
QueryParams bool // specifies that request query parameters should be appended to http.url tag
FinishOpts []ddtrace.FinishOption // span finish options to be applied
SpanOpts []ddtrace.StartSpanOption // additional span options to be applied
}

// TraceAndServe will apply tracing to the given http.Handler using the passed tracer under the given service and resource.
func TraceAndServe(h http.Handler, cfg *TraceConfig) {
path := cfg.Request.URL.Path
if cfg.QueryParams {
path += "?" + cfg.Request.URL.RawQuery
}
opts := append([]ddtrace.StartSpanOption{
tracer.SpanType(ext.SpanTypeWeb),
tracer.ServiceName(cfg.Service),
tracer.ResourceName(cfg.Resource),
tracer.Tag(ext.HTTPMethod, cfg.Request.Method),
tracer.Tag(ext.HTTPURL, path),
}, cfg.SpanOpts...)
if cfg.Request.URL.Host != "" {
opts = append([]ddtrace.StartSpanOption{
tracer.Tag("http.host", cfg.Request.URL.Host),
tracer.Tag("http.content-length", cfg.Request.ContentLength),
}, opts...)
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(cfg.Request.Header)); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, ctx := tracer.StartSpanFromContext(cfg.Request.Context(), "http.request", opts...)
defer span.Finish(cfg.FinishOpts...)

cfg.ResponseWriter = wrapResponseWriter(cfg.ResponseWriter, span)

h.ServeHTTP(cfg.ResponseWriter, cfg.Request.WithContext(ctx))
}

// responseWriter is a small wrapper around an http response writer that will
// intercept and store the status of a request.
type responseWriter struct {
http.ResponseWriter
span ddtrace.Span
status int
}

func newResponseWriter(w http.ResponseWriter, span ddtrace.Span) *responseWriter {
return &responseWriter{w, span, 0}
}

// Write writes the data to the connection as part of an HTTP reply.
// We explicitely call WriteHeader with the 200 status code
// in order to get it reported into the span.
func (w *responseWriter) Write(b []byte) (int, error) {
if w.status == 0 {
w.WriteHeader(http.StatusOK)
}
return w.ResponseWriter.Write(b)
}

// WriteHeader sends an HTTP response header with status code.
// It also sets the status code to the span.
func (w *responseWriter) WriteHeader(status int) {
if w.status != 0 {
return
}
w.ResponseWriter.WriteHeader(status)
w.status = status
w.span.SetTag(ext.HTTPCode, strconv.Itoa(status))
if status >= 500 && status < 600 {
w.span.SetTag(ext.Error, fmt.Errorf("%d: %s", status, http.StatusText(status)))
}
}
119 changes: 119 additions & 0 deletions ddtrace/tracer/custom_trace_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.
// Copyright 2021 Spacelift, Inc.
package tracer

import (
"sync"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

type customTraceWriter struct {
spans []*span

// climit limits the number of concurrent outgoing connections
climit chan struct{}

sendSpans func(spans []*CustomSpan) error

// wg waits for all uploads to finish
wg sync.WaitGroup
}

func NewCustomTraceWriter(sendSpans func(spans []*CustomSpan) error) *customTraceWriter {
return &customTraceWriter{
climit: make(chan struct{}, concurrentConnectionLimit),
sendSpans: sendSpans,
}
}

func (h *customTraceWriter) add(trace []*span) {
h.spans = append(h.spans, trace...)
if len(h.spans) > payloadSizeLimit {
h.flush()
}
}

func (h *customTraceWriter) stop() {
h.flush()
h.wg.Wait()
}

// flush will push any currently buffered traces to the server.
func (h *customTraceWriter) flush() {
if len(h.spans) == 0 {
return
}
h.wg.Add(1)
h.climit <- struct{}{}
go func(spans []*span) {
defer func(start time.Time) {
<-h.climit
h.wg.Done()
}(time.Now())

customSpans := make([]*CustomSpan, len(spans))
for i := range spans {
customSpans[i] = &CustomSpan{
Name: spans[i].Name,
Service: spans[i].Service,
Resource: spans[i].Resource,
Type: spans[i].Type,
Start: spans[i].Start,
Duration: spans[i].Duration,
Meta: spans[i].Meta,
Metrics: spans[i].Metrics,
SpanID: spans[i].SpanID,
TraceID: spans[i].TraceID,
ParentID: spans[i].ParentID,
Error: spans[i].Error,
}
}

if err := h.sendSpans(customSpans); err != nil {
log.Error("lost %d spans: %v", len(spans), err)
}
}(h.spans)
h.spans = nil
}

type CustomSpan struct {
Name string `json:"name"` // operation name
Service string `json:"service"` // service name (i.e. "grpc.server", "http.request")
Resource string `json:"resource"` // resource name (i.e. "/user?id=123", "SELECT * FROM users")
Type string `json:"type"` // protocol associated with the span (i.e. "web", "db", "cache")
Start int64 `json:"start"` // span start time expressed in nanoseconds since epoch
Duration int64 `json:"duration"` // duration of the span expressed in nanoseconds
Meta map[string]string `json:"meta"` // arbitrary map of metadata
Metrics map[string]float64 `json:"metrics"` // arbitrary map of numeric metrics
SpanID uint64 `json:"span_id"` // identifier of this span
TraceID uint64 `json:"trace_id"` // identifier of the root span
ParentID uint64 `json:"parent_id"` // identifier of the span's direct parent
Error int32 `json:"error"` // error status of the span; 0 means no errors
}

type multiTraceWriter struct {
ws []traceWriter
}

func (m *multiTraceWriter) add(spans []*span) {
for i := range m.ws {
m.ws[i].add(spans)
}
}

func (m *multiTraceWriter) flush() {
for i := range m.ws {
m.ws[i].flush()
}
}

func (m *multiTraceWriter) stop() {
for i := range m.ws {
m.ws[i].stop()
}
}

0 comments on commit a68016b

Please sign in to comment.