diff --git a/ddtrace/tracer/civisibility_transport.go b/ddtrace/tracer/civisibility_transport.go index 9910ea8fd3..8a8f1007c9 100644 --- a/ddtrace/tracer/civisibility_transport.go +++ b/ddtrace/tracer/civisibility_transport.go @@ -192,7 +192,7 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error) // Returns: // // An error indicating that stats are not supported. -func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload) error { +func (t *ciVisibilityTransport) sendStats(*pb.ClientStatsPayload, int) error { // Stats are not supported by CI Visibility agentless / EVP proxy. return nil } diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index a9fedd5044..38f6f8a68f 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -729,6 +729,9 @@ type agentFeatures struct { // metaStructAvailable reports whether the trace-agent can receive spans with the `meta_struct` field. metaStructAvailable bool + + // obfuscationVersion reports the trace-agent's version of obfuscation logic. A value of 0 means this field wasn't present. + obfuscationVersion int } // HasFlag reports whether the agent has set the feat feature flag. @@ -755,12 +758,13 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C } defer resp.Body.Close() type infoResponse struct { - Endpoints []string `json:"endpoints"` - ClientDropP0s bool `json:"client_drop_p0s"` - FeatureFlags []string `json:"feature_flags"` - PeerTags []string `json:"peer_tags"` - SpanMetaStruct bool `json:"span_meta_structs"` - Config struct { + Endpoints []string `json:"endpoints"` + ClientDropP0s bool `json:"client_drop_p0s"` + FeatureFlags []string `json:"feature_flags"` + PeerTags []string `json:"peer_tags"` + SpanMetaStruct bool `json:"span_meta_structs"` + ObfuscationVersion int `json:"obfuscation_version"` + Config struct { StatsdPort int `json:"statsd_port"` } `json:"config"` } @@ -775,6 +779,7 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C features.StatsdPort = info.Config.StatsdPort features.metaStructAvailable = info.SpanMetaStruct features.peerTags = info.PeerTags + features.obfuscationVersion = info.ObfuscationVersion for _, endpoint := range info.Endpoints { switch endpoint { case "/v0.6/stats": diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index 9045400905..ddf670253f 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -224,7 +224,7 @@ func TestLoadAgentFeatures(t *testing.T) { t.Run("OK", func(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`)) + w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"obfuscation_version":2,"peer_tags":["peer.hostname"],"config": {"statsd_port":8999}}`)) })) defer srv.Close() cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2)) @@ -238,6 +238,7 @@ func TestLoadAgentFeatures(t *testing.T) { assert.True(t, cfg.agent.HasFlag("a")) assert.True(t, cfg.agent.HasFlag("b")) assert.EqualValues(t, cfg.agent.peerTags, []string{"peer.hostname"}) + assert.Equal(t, 2, cfg.agent.obfuscationVersion) }) t.Run("discovery", func(t *testing.T) { diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 687059dcc3..3ce10d68ca 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -20,6 +20,10 @@ import ( "github.com/DataDog/datadog-go/v5/statsd" ) +// tracerObfuscationVersion indicates which version of stats obfuscation logic we implement +// In the future this can be pulled directly from our obfuscation import. +var tracerObfuscationVersion = 1 + // defaultStatsBucketSize specifies the default span of time that will be // covered in one stats bucket. var defaultStatsBucketSize = (10 * time.Second).Nanoseconds() @@ -157,7 +161,11 @@ func (c *concentrator) runIngester() { } func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscator) (*tracerStatSpan, bool) { - statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, obfuscatedResource(obfuscator, s.Type, s.Resource), + resource := s.Resource + if c.shouldObfuscate() { + resource = obfuscatedResource(obfuscator, s.Type, s.Resource) + } + statSpan, ok := c.spanConcentrator.NewStatSpan(s.Service, resource, s.Name, s.Type, s.ParentID, s.Start, s.Duration, s.Error, s.Meta, s.Metrics, c.cfg.agent.peerTags) if !ok { return nil, false @@ -169,6 +177,11 @@ func (c *concentrator) newTracerStatSpan(s *span, obfuscator *obfuscate.Obfuscat }, true } +func (c *concentrator) shouldObfuscate() bool { + // Obfuscate if agent reports an obfuscation version AND our version is at least as new + return c.cfg.agent.obfuscationVersion > 0 && c.cfg.agent.obfuscationVersion <= tracerObfuscationVersion +} + // add s into the concentrator's internal stats buckets. func (c *concentrator) add(s *tracerStatSpan) { c.spanConcentrator.AddSpan(s.statSpan, c.aggregationKey, "", nil, s.origin) @@ -204,6 +217,13 @@ const ( func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) { csps := c.spanConcentrator.Flush(timenow.UnixNano(), includeCurrent) + obfVersion := 0 + if c.shouldObfuscate() { + obfVersion = tracerObfuscationVersion + } else { + log.Debug("Stats Obfuscation was skipped, agent will obfuscate (tracer %d, agent %d)", tracerObfuscationVersion, c.cfg.agent.obfuscationVersion) + } + if len(csps) == 0 { // nothing to flush return @@ -214,7 +234,7 @@ func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) { // compatible in case this ever changes we can just iterate through all of them. for _, csp := range csps { flushedBuckets += len(csp.Stats) - if err := c.cfg.transport.sendStats(csp); err != nil { + if err := c.cfg.transport.sendStats(csp, obfVersion); err != nil { c.statsd().Incr("datadog.tracer.stats.flush_errors", nil, 1) log.Error("Error sending stats payload: %v", err) } diff --git a/ddtrace/tracer/stats_test.go b/ddtrace/tracer/stats_test.go index 503552adcf..1121009b2e 100644 --- a/ddtrace/tracer/stats_test.go +++ b/ddtrace/tracer/stats_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/DataDog/datadog-agent/pkg/obfuscate" "github.com/DataDog/datadog-go/v5/statsd" "gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/constants" "gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils" @@ -119,7 +120,7 @@ func TestConcentrator(t *testing.T) { // stats should be sent if the concentrator is stopped t.Run("stop", func(t *testing.T) { transport := newDummyTransport() - c := newConcentrator(&config{transport: transport}, 500000, &statsd.NoOpClientDirect{}) + c := newConcentrator(&config{transport: transport}, 500_000, &statsd.NoOpClientDirect{}) assert.Len(t, transport.Stats(), 0) ss1, ok := c.newTracerStatSpan(&s1, nil) assert.True(t, ok) @@ -130,3 +131,55 @@ func TestConcentrator(t *testing.T) { }) }) } + +func TestShouldObfuscate(t *testing.T) { + bucketSize := int64(500_000) + tsp := newDummyTransport() + for _, params := range []struct { + name string + tracerVersion int + agentVersion int + expectedShouldObfuscate bool + }{ + {name: "version equal", tracerVersion: 2, agentVersion: 2, expectedShouldObfuscate: true}, + {name: "agent version missing", tracerVersion: 2, agentVersion: 0, expectedShouldObfuscate: false}, + {name: "agent version older", tracerVersion: 2, agentVersion: 1, expectedShouldObfuscate: true}, + {name: "agent version newer", tracerVersion: 2, agentVersion: 3, expectedShouldObfuscate: false}, + } { + t.Run(params.name, func(t *testing.T) { + c := newConcentrator(&config{transport: tsp, env: "someEnv", agent: agentFeatures{obfuscationVersion: params.agentVersion}}, bucketSize, &statsd.NoOpClientDirect{}) + defer func(oldVersion int) { tracerObfuscationVersion = oldVersion }(tracerObfuscationVersion) + tracerObfuscationVersion = params.tracerVersion + assert.Equal(t, params.expectedShouldObfuscate, c.shouldObfuscate()) + }) + } +} + +func TestObfuscation(t *testing.T) { + bucketSize := int64(500_000) + s1 := span{ + Name: "redis-query", + Start: time.Now().UnixNano() + 3*bucketSize, + Duration: 1, + Metrics: map[string]float64{keyMeasured: 1}, + Type: "redis", + Resource: "GET somekey", + } + tsp := newDummyTransport() + c := newConcentrator(&config{transport: tsp, env: "someEnv", agent: agentFeatures{obfuscationVersion: 2}}, bucketSize, &statsd.NoOpClientDirect{}) + defer func(oldVersion int) { tracerObfuscationVersion = oldVersion }(tracerObfuscationVersion) + tracerObfuscationVersion = 2 + + assert.Len(t, tsp.Stats(), 0) + ss1, ok := c.newTracerStatSpan(&s1, obfuscate.NewObfuscator(obfuscate.Config{})) + assert.True(t, ok) + c.Start() + c.In <- ss1 + c.Stop() + actualStats := tsp.Stats() + assert.Len(t, actualStats, 1) + assert.Len(t, actualStats[0].Stats, 1) + assert.Len(t, actualStats[0].Stats[0].Stats, 1) + assert.Equal(t, 2, tsp.obfVersion) + assert.Equal(t, "GET", actualStats[0].Stats[0].Stats[0].Resource) +} diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index dd1820621e..ad0b75009f 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -2189,12 +2189,13 @@ func startTestTracer(t testing.TB, opts ...StartOption) (trc *tracer, transport // Mock Transport with a real Encoder type dummyTransport struct { sync.RWMutex - traces spanLists - stats []*pb.ClientStatsPayload + traces spanLists + stats []*pb.ClientStatsPayload + obfVersion int } func newDummyTransport() *dummyTransport { - return &dummyTransport{traces: spanLists{}} + return &dummyTransport{traces: spanLists{}, obfVersion: -1} } func (t *dummyTransport) Len() int { @@ -2203,9 +2204,10 @@ func (t *dummyTransport) Len() int { return len(t.traces) } -func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload) error { +func (t *dummyTransport) sendStats(p *pb.ClientStatsPayload, obfVersion int) error { t.Lock() t.stats = append(t.stats, p) + t.obfVersion = obfVersion t.Unlock() return nil } @@ -2216,6 +2218,12 @@ func (t *dummyTransport) Stats() []*pb.ClientStatsPayload { return t.stats } +func (t *dummyTransport) ObfuscationVersion() int { + t.RLock() + defer t.RUnlock() + return t.obfVersion +} + func (t *dummyTransport) send(p *payload) (io.ReadCloser, error) { traces, err := decode(p) if err != nil { diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 1cf84254ad..6c0432a329 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -18,7 +18,6 @@ import ( "time" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" - traceinternal "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/version" @@ -56,12 +55,13 @@ func defaultHTTPClient(timeout time.Duration) *http.Client { } const ( - defaultHostname = "localhost" - defaultPort = "8126" - defaultAddress = defaultHostname + ":" + defaultPort - defaultURL = "http://" + defaultAddress - defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process - traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload + defaultHostname = "localhost" + defaultPort = "8126" + defaultAddress = defaultHostname + ":" + defaultPort + defaultURL = "http://" + defaultAddress + defaultHTTPTimeout = 10 * time.Second // defines the current timeout before giving up with the send process + traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload + obfuscationVersionHeader = "Datadog-Obfuscation-Version" // header containing the version of obfuscation used, if any ) // transport is an interface for communicating data to the agent. @@ -70,7 +70,8 @@ type transport interface { // It returns a non-nil response body when no error occurred. send(p *payload) (body io.ReadCloser, err error) // sendStats sends the given stats payload to the agent. - sendStats(s *pb.ClientStatsPayload) error + // tracerObfuscationVersion is the version of obfuscation applied (0 if none was applied) + sendStats(s *pb.ClientStatsPayload, tracerObfuscationVersion int) error // endpoint returns the URL to which the transport will send traces. endpoint() string } @@ -115,7 +116,7 @@ func newHTTPTransport(url string, client *http.Client) *httpTransport { } } -func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error { +func (t *httpTransport) sendStats(p *pb.ClientStatsPayload, tracerObfuscationVersion int) error { var buf bytes.Buffer if err := msgp.Encode(&buf, p); err != nil { return err @@ -124,6 +125,9 @@ func (t *httpTransport) sendStats(p *pb.ClientStatsPayload) error { if err != nil { return err } + if tracerObfuscationVersion > 0 { + req.Header.Set(obfuscationVersionHeader, strconv.Itoa(tracerObfuscationVersion)) + } resp, err := t.client.Do(req) if err != nil { return err