Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APM: Only do stats obfuscation when we know it's safe to do so #3155

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error)
// Returns:
//
// An error indicating that stats are not supported.
func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload) error {
func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload, int) error {
// Stats are not supported by CI Visibility agentless / EVP proxy.
return nil
}
Expand Down
17 changes: 11 additions & 6 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ type agentFeatures struct {

// metaStructAvailable reports whether the trace-agent can receive spans with the `meta_struct` field.
metaStructAvailable bool

// obfuscationVersion reports the trace-agent's version of obfuscation logic. A value of 0 means this field wasn't present.
obfuscationVersion int
}

// HasFlag reports whether the agent has set the feat feature flag.
Expand All @@ -755,12 +758,13 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
}
defer resp.Body.Close()
type infoResponse struct {
Endpoints []string `json:"endpoints"`
ClientDropP0s bool `json:"client_drop_p0s"`
FeatureFlags []string `json:"feature_flags"`
PeerTags []string `json:"peer_tags"`
SpanMetaStruct bool `json:"span_meta_structs"`
Config struct {
Endpoints []string `json:"endpoints"`
ClientDropP0s bool `json:"client_drop_p0s"`
FeatureFlags []string `json:"feature_flags"`
PeerTags []string `json:"peer_tags"`
SpanMetaStruct bool `json:"span_meta_structs"`
ObfuscationVersion int `json:"obfuscation_version"`
Config struct {
StatsdPort int `json:"statsd_port"`
} `json:"config"`
}
Expand All @@ -775,6 +779,7 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
features.StatsdPort = info.Config.StatsdPort
features.metaStructAvailable = info.SpanMetaStruct
features.peerTags = info.PeerTags
features.obfuscationVersion = info.ObfuscationVersion
for _, endpoint := range info.Endpoints {
switch endpoint {
case "/v0.6/stats":
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestLoadAgentFeatures(t *testing.T) {

t.Run("OK", func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`))
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"obfuscation_version":2,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`))
}))
defer srv.Close()
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2))
Expand All @@ -238,6 +238,7 @@ func TestLoadAgentFeatures(t *testing.T) {
assert.True(t, cfg.agent.HasFlag("a"))
assert.True(t, cfg.agent.HasFlag("b"))
assert.EqualValues(t, cfg.agent.peerTags, []string{"peer.hostname"})
assert.Equal(t, 2, cfg.agent.obfuscationVersion)
})

t.Run("discovery", func(t *testing.T) {
Expand Down
24 changes: 22 additions & 2 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/DataDog/datadog-go/v5/statsd"
)

// tracerObfuscationVersion indicates which version of the stats obfuscation
// logic this tracer implements. It is compared against the obfuscation version
// reported by the trace-agent to decide whether the tracer obfuscates stats
// itself, and it is the version passed to the transport when it does.
// Tests in this package temporarily override it.
// In the future this can be pulled directly from our obfuscation import.
var tracerObfuscationVersion = 1

// defaultStatsBucketSize specifies the default span of time that will be
// covered in one stats bucket.
var defaultStatsBucketSize = (10 * time.Second).Nanoseconds()
Expand Down Expand Up @@ -157,7 +161,11 @@ func (c *concentrator) runIngester() {
}

func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscator) (*tracerStatSpan, bool) {
statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, obfuscatedResource(obfuscator, s.Type, s.Resource),
resource := s.Resource
if c.shouldObfuscate() {
resource = obfuscatedResource(obfuscator, s.Type, s.Resource)
}
statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, resource,
s.Name, s.Type, s.ParentID, s.Start, s.Duration, s.Error, s.Meta, s.Metrics, c.cfg.agent.peerTags)
if !ok {
return nil, false
Expand All @@ -169,6 +177,11 @@ func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscat
}, true
}

// shouldObfuscate reports whether the tracer should obfuscate stats itself.
// That is only the case when the agent reported an obfuscation version (a value
// of 0 means it did not) and that version is not newer than the one the tracer
// implements.
func (c *concentrator) shouldObfuscate() bool {
	agentVersion := c.cfg.agent.obfuscationVersion
	if agentVersion <= 0 {
		// The agent did not report an obfuscation version.
		return false
	}
	return tracerObfuscationVersion >= agentVersion
}

// add s into the concentrator's internal stats buckets.
func (c *concentrator) add(s *tracerStatSpan) {
c.spanConcentrator.AddSpan(s.statSpan, c.aggregationKey, "", nil, s.origin)
Expand Down Expand Up @@ -204,6 +217,13 @@ const (
func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
csps := c.spanConcentrator.Flush(timenow.UnixNano(), includeCurrent)

obfVersion := 0
if c.shouldObfuscate() {
obfVersion = tracerObfuscationVersion
} else {
log.Debug("Stats Obfuscation was skipped, agent will obfuscate (tracer %d, agent %d)", tracerObfuscationVersion, c.cfg.agent.obfuscationVersion)
}

if len(csps) == 0 {
// nothing to flush
return
Expand All @@ -214,7 +234,7 @@ func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) {
// compatible in case this ever changes we can just iterate through all of them.
for _, csp := range csps {
flushedBuckets += len(csp.Stats)
if err := c.cfg.transport.sendStats(csp); err != nil {
if err := c.cfg.transport.sendStats(csp, obfVersion); err != nil {
c.statsd().Incr("datadog.tracer.stats.flush_errors", nil, 1)
log.Error("Error sending stats payload: %v", err)
}
Expand Down
55 changes: 54 additions & 1 deletion ddtrace/tracer/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/obfuscate"
"github.com/DataDog/datadog-go/v5/statsd"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/constants"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestConcentrator(t *testing.T) {
// stats should be sent if the concentrator is stopped
t.Run("stop", func(t *testing.T) {
transport := newDummyTransport()
c := newConcentrator(&config{transport: transport}, 500000, &statsd.NoOpClientDirect{})
c := newConcentrator(&config{transport: transport}, 500_000, &statsd.NoOpClientDirect{})
assert.Len(t, transport.Stats(), 0)
ss1, ok := c.newTracerStatSpan(&s1, nil)
assert.True(t, ok)
Expand All @@ -130,3 +131,55 @@ func TestConcentrator(t *testing.T) {
})
})
}

// TestShouldObfuscate verifies the decision made by concentrator.shouldObfuscate
// for each relationship between the tracer's obfuscation version and the version
// reported by the agent (equal, missing, older, newer).
func TestShouldObfuscate(t *testing.T) {
	type testCase struct {
		name                    string
		tracerVersion           int
		agentVersion            int
		expectedShouldObfuscate bool
	}
	cases := []testCase{
		{name: "version equal", tracerVersion: 2, agentVersion: 2, expectedShouldObfuscate: true},
		{name: "agent version missing", tracerVersion: 2, agentVersion: 0, expectedShouldObfuscate: false},
		{name: "agent version older", tracerVersion: 2, agentVersion: 1, expectedShouldObfuscate: true},
		{name: "agent version newer", tracerVersion: 2, agentVersion: 3, expectedShouldObfuscate: false},
	}

	size := int64(500_000)
	transport := newDummyTransport()
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			cfg := &config{
				transport: transport,
				env:       "someEnv",
				agent:     agentFeatures{obfuscationVersion: tc.agentVersion},
			}
			c := newConcentrator(cfg, size, &statsd.NoOpClientDirect{})

			// Override the package-level tracer version for this case only and
			// restore it when the subtest returns.
			previous := tracerObfuscationVersion
			defer func() { tracerObfuscationVersion = previous }()
			tracerObfuscationVersion = tc.tracerVersion

			assert.Equal(t, tc.expectedShouldObfuscate, c.shouldObfuscate())
		})
	}
}

// TestObfuscation checks that, when the agent reports an obfuscation version the
// tracer also implements, the resource of a redis span is obfuscated in the
// flushed stats and the tracer's obfuscation version is handed to the transport.
func TestObfuscation(t *testing.T) {
	bucketSize := int64(500_000)
	s1 := span{
		Name:     "redis-query",
		Start:    time.Now().UnixNano() + 3*bucketSize,
		Duration: 1,
		Metrics:  map[string]float64{keyMeasured: 1},
		Type:     "redis",
		Resource: "GET somekey",
	}
	tsp := newDummyTransport()
	c := newConcentrator(&config{transport: tsp, env: "someEnv", agent: agentFeatures{obfuscationVersion: 2}}, bucketSize, &statsd.NoOpClientDirect{})
	// Pin the tracer's obfuscation version for this test and restore it afterwards.
	defer func(oldVersion int) { tracerObfuscationVersion = oldVersion }(tracerObfuscationVersion)
	tracerObfuscationVersion = 2

	assert.Len(t, tsp.Stats(), 0)
	ss1, ok := c.newTracerStatSpan(&s1, obfuscate.NewObfuscator(obfuscate.Config{}))
	assert.True(t, ok)
	c.Start()
	c.In <- ss1
	c.Stop()
	actualStats := tsp.Stats()
	// assert.Len does not abort the test on failure, so bail out explicitly before
	// indexing; otherwise a wrong length turns into an index-out-of-range panic
	// that hides the real assertion failure.
	if !assert.Len(t, actualStats, 1) ||
		!assert.Len(t, actualStats[0].Stats, 1) ||
		!assert.Len(t, actualStats[0].Stats[0].Stats, 1) {
		return
	}
	// Read the version through the locked accessor, like Stats(), instead of
	// touching the mutex-guarded field directly.
	assert.Equal(t, 2, tsp.ObfuscationVersion())
	assert.Equal(t, "GET", actualStats[0].Stats[0].Stats[0].Resource)
}
16 changes: 12 additions & 4 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2189,12 +2189,13 @@ func startTestTracer(t testing.TB, opts ...StartOption) (trc *tracer, transport
// Mock Transport with a real Encoder
type dummyTransport struct {
sync.RWMutex
traces spanLists
stats []*pb.ClientStatsPayload
traces spanLists
stats []*pb.ClientStatsPayload
obfVersion int
}

func newDummyTransport() *dummyTransport {
return &dummyTransport{traces: spanLists{}}
return &dummyTransport{traces: spanLists{}, obfVersion: -1}
}

func (t *dummyTransport) Len() int {
Expand All @@ -2203,9 +2204,10 @@ func (t *dummyTransport) Len() int {
return len(t.traces)
}

func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload) error {
func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload, obfVersion int) error {
t.Lock()
t.stats = append(t.stats, p)
t.obfVersion = obfVersion
t.Unlock()
return nil
}
Expand All @@ -2216,6 +2218,12 @@ func (t *dummyTransport) Stats() []*pb.ClientStatsPayload {
return t.stats
}

// ObfuscationVersion returns the obfuscation version stored by the most recent
// sendStats call, or the value set at construction if sendStats has not been
// called. The read is done under the transport's read lock.
func (t *dummyTransport) ObfuscationVersion() int {
	t.RLock()
	version := t.obfVersion
	t.RUnlock()
	return version
}

func (t *dummyTransport) send(p *payload) (io.ReadCloser, error) {
traces, err := decode(p)
if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"

traceinternal "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
Expand Down Expand Up @@ -56,12 +55,13 @@ func defaultHTTPClient(timeout time.Duration) *http.Client {
}

const (
defaultHostname = "localhost"
defaultPort = "8126"
defaultAddress = defaultHostname + ":" + defaultPort
defaultURL = "http://" + defaultAddress
defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
defaultHostname = "localhost"
defaultPort = "8126"
defaultAddress = defaultHostname + ":" + defaultPort
defaultURL = "http://" + defaultAddress
defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
obfuscationVersionHeader = "Datadog-Obfuscation-Version" // header containing the version of obfuscation used, if any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, why do some Datadog- headers have the X- prefix, but others don't? Should it be added here?

Copy link
Contributor Author

@ajgajg1134 ajgajg1134 Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to look this up, apparently the old MIME headers RFC said this should be done for custom headers. However since at least 2011 that's apparently been deprecated and you can just name headers whatever you want :P https://datatracker.ietf.org/doc/html/draft-saintandre-xdash-00 I noticed some of our more recent headers dropped the X- so it seemed to make sense to do that here as well

Oh, it looks like that draft has since been published as an RFC itself, in 2012 (RFC 6648).

)

// transport is an interface for communicating data to the agent.
Expand All @@ -70,7 +70,8 @@ type transport interface {
// It returns a non-nil response body when no error occurred.
send(p *payload) (body io.ReadCloser, err error)
// sendStats sends the given stats payload to the agent.
sendStats(s *pb.ClientStatsPayload) error
// tracerObfuscationVersion is the version of obfuscation applied (0 if none was applied)
sendStats(s *pb.ClientStatsPayload, tracerObfuscationVersion int) error
// endpoint returns the URL to which the transport will send traces.
endpoint() string
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func newHTTPTransport(url string, client *http.Client) *httpTransport {
}
}

func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error {
func (t *httpTransport) sendStats(p *pb.ClientStatsPayload, tracerObfuscationVersion int) error {
var buf bytes.Buffer
if err := msgp.Encode(&buf, p); err != nil {
return err
Expand All @@ -124,6 +125,9 @@ func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error {
if err != nil {
return err
}
if tracerObfuscationVersion > 0 {
req.Header.Set(obfuscationVersionHeader, strconv.Itoa(tracerObfuscationVersion))
}
resp, err := t.client.Do(req)
if err != nil {
return err
Expand Down
Loading