From aa51e23ea13dd5c6df110e4698dd39675617214b Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Fri, 6 Dec 2024 11:44:00 +0100 Subject: [PATCH 1/6] telemetry: submit `orchestrion_usage` counter metric To make it easier to get data about orchestrion usage, start submitting a counter metric in telemetry for applications built with orchestrion. --- ddtrace/tracer/telemetry.go | 14 ++++++++++++++ ddtrace/tracer/telemetry_test.go | 8 ++++++++ internal/telemetry/telemetrytest/telemetrytest.go | 1 - 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index bc9185858f..51a38a13be 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -7,7 +7,9 @@ package tracer import ( "fmt" + "slices" "strings" + "testing" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" ) @@ -108,9 +110,21 @@ func startTelemetry(c *config) { Value: fmt.Sprintf("rate:%f_maxPerSecond:%f", rule.Rate, rule.MaxPerSecond)}) } if c.orchestrionCfg.Enabled { + tags := make([]string, 0, len(c.orchestrionCfg.Metadata)) for k, v := range c.orchestrionCfg.Metadata { telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) + tags = append(tags, k+":"+v) } + if testing.Testing() { + // In tests, ensure tags are consistently ordered... + slices.Sort(tags) + } + telemetry.GlobalClient.Count( + telemetry.NamespaceTracers, + "orchestrion_usage", 1, + tags, + false, // Go-specific + ) } telemetryConfigs = append(telemetryConfigs, additionalConfigs...) telemetry.GlobalClient.ProductChange(telemetry.NamespaceTracers, true, telemetryConfigs) diff --git a/ddtrace/tracer/telemetry_test.go b/ddtrace/tracer/telemetry_test.go index 6dee152e14..83947f29a3 100644 --- a/ddtrace/tracer/telemetry_test.go +++ b/ddtrace/tracer/telemetry_test.go @@ -154,11 +154,19 @@ func TestTelemetryEnabled(t *testing.T) { telemetryClient := new(telemetrytest.MockClient) defer telemetry.MockGlobalClient(telemetryClient)() + telemetryClient.On("Count", + telemetry.NamespaceTracers, + "orchestrion_usage", 1.0, + []string{"k1:v1", "k2:v2"}, + false, + ).Return() + Start(WithOrchestrion(map[string]string{"k1": "v1", "k2": "v2"})) defer Stop() telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true) telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1") telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2") + telemetryClient.AssertExpectations(t) }) } diff --git a/internal/telemetry/telemetrytest/telemetrytest.go b/internal/telemetry/telemetrytest/telemetrytest.go index 0c8b22bd32..f856d79eb3 100644 --- a/internal/telemetry/telemetrytest/telemetrytest.go +++ b/internal/telemetry/telemetrytest/telemetrytest.go @@ -82,7 +82,6 @@ func (c *MockClient) Record(ns telemetry.Namespace, _ telemetry.MetricKind, name // Count counts the value for the given metric func (c *MockClient) Count(ns telemetry.Namespace, name string, val float64, tags []string, common bool) { - c.On("Count", ns, name, val, tags, common).Return() _ = c.Called(ns, name, val, tags, common) } From edf5d5397f7b5b706c9224b41a3a482f7a02e049 Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Fri, 6 Dec 2024 13:25:08 +0100 Subject: [PATCH 2/6] Fix tests --- ddtrace/tracer/telemetry.go | 2 +- ddtrace/tracer/telemetry_test.go | 14 ++++++-------- internal/telemetry/telemetrytest/telemetrytest.go | 4 ++++ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index 51a38a13be..b5b0f83223 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -121,7 +121,7 @@ func startTelemetry(c *config) { } telemetry.GlobalClient.Count( telemetry.NamespaceTracers, - "orchestrion_usage", 1, + "orchestrion.enabled", 1, tags, false, // Go-specific ) diff --git a/ddtrace/tracer/telemetry_test.go b/ddtrace/tracer/telemetry_test.go index 83947f29a3..d1cf1f897d 100644 --- a/ddtrace/tracer/telemetry_test.go +++ b/ddtrace/tracer/telemetry_test.go @@ -154,19 +154,17 @@ func TestTelemetryEnabled(t *testing.T) { telemetryClient := new(telemetrytest.MockClient) defer telemetry.MockGlobalClient(telemetryClient)() - telemetryClient.On("Count", - telemetry.NamespaceTracers, - "orchestrion_usage", 1.0, - []string{"k1:v1", "k2:v2"}, - false, - ).Return() - Start(WithOrchestrion(map[string]string{"k1": "v1", "k2": "v2"})) defer Stop() telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true) telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1") telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2") - telemetryClient.AssertExpectations(t) + telemetryClient.AssertCalled(t, "Count", + telemetry.NamespaceTracers, + "orchestrion.enabled", 1.0, + []string{"k1:v1", "k2:v2"}, + false, + ) }) } diff --git a/internal/telemetry/telemetrytest/telemetrytest.go b/internal/telemetry/telemetrytest/telemetrytest.go index f856d79eb3..f6977645bb 100644 --- a/internal/telemetry/telemetrytest/telemetrytest.go +++ b/internal/telemetry/telemetrytest/telemetrytest.go @@ -75,6 +75,9 @@ func (c *MockClient) Record(ns telemetry.Namespace, _ telemetry.MetricKind, name _ = c.Called(ns, name, val, tags, common) // record the val for tests that assert based on the value if _, ok := c.Metrics[ns]; !ok { + if c.Metrics == nil { + c.Metrics = make(map[telemetry.Namespace]map[string]float64) + } c.Metrics[ns] = map[string]float64{} } c.Metrics[ns][name] = val @@ -82,6 +85,7 @@ func (c *MockClient) Record(ns telemetry.Namespace, _ telemetry.MetricKind, name // Count counts the value for the given metric func (c *MockClient) Count(ns telemetry.Namespace, name string, val float64, tags []string, common bool) { + c.On("Count", ns, name, val, tags, common).Return() _ = c.Called(ns, name, val, tags, common) } From 64f7fe1a65fa4fbfdb1ae981f73b4429b2160fe8 Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Fri, 6 Dec 2024 13:41:24 +0100 Subject: [PATCH 3/6] Switch to gauge metric --- ddtrace/tracer/telemetry.go | 3 ++- ddtrace/tracer/telemetry_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index b5b0f83223..dbaacdaa08 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -119,8 +119,9 @@ func startTelemetry(c *config) { // In tests, ensure tags are consistently ordered... slices.Sort(tags) } - telemetry.GlobalClient.Count( + telemetry.GlobalClient.Record( telemetry.NamespaceTracers, + telemetry.MetricKindGauge, "orchestrion.enabled", 1, tags, false, // Go-specific diff --git a/ddtrace/tracer/telemetry_test.go b/ddtrace/tracer/telemetry_test.go index d1cf1f897d..a4ad105ac4 100644 --- a/ddtrace/tracer/telemetry_test.go +++ b/ddtrace/tracer/telemetry_test.go @@ -160,8 +160,9 @@ func TestTelemetryEnabled(t *testing.T) { telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true) telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1") telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2") - telemetryClient.AssertCalled(t, "Count", + telemetryClient.AssertCalled(t, "Record", telemetry.NamespaceTracers, + telemetry.MetricKindGauge, "orchestrion.enabled", 1.0, []string{"k1:v1", "k2:v2"}, false, From dbdc01ac0efb7479d6af0087bfe259a15b84bd4f Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Mon, 9 Dec 2024 12:06:09 +0100 Subject: [PATCH 4/6] emit the metric on every heartbeat --- ddtrace/tracer/telemetry.go | 70 +++++++++++++++++++------------ internal/telemetry/client.go | 40 +++++++++++++++++- internal/telemetry/client_test.go | 60 ++++++++++++++++++++++++++ internal/telemetry/option.go | 11 +++++ 4 files changed, 152 insertions(+), 29 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index dbaacdaa08..d871adf971 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -33,15 +33,7 @@ func startTelemetry(c *config) { // Do not do extra work populating config data if instrumentation telemetry is disabled. return } - telemetry.GlobalClient.ApplyOps( - telemetry.WithService(c.serviceName), - telemetry.WithEnv(c.env), - telemetry.WithHTTPClient(c.httpClient), - // c.logToStdout is true if serverless is turned on - // c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured - telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()), - telemetry.WithVersion(c.version), - ) + telemetryConfigs := []telemetry.Configuration{ {Name: "trace_debug_enabled", Value: c.debug}, {Name: "agent_feature_drop_p0s", Value: c.agent.DropP0s}, @@ -73,6 +65,38 @@ func startTelemetry(c *config) { c.traceSampleRules.toTelemetry(), telemetry.Sanitize(telemetry.Configuration{Name: "span_sample_rules", Value: c.spanRules}), } + + // Process orchestrion enablement metric emission... + const orchestrionEnabledMetric = "orchestrion.enabled" + var ( + orchestrionEnabledValue float64 + orchestrionEnabledTags []string + ) + if c.orchestrionCfg.Enabled { + orchestrionEnabledValue = 1 + orchestrionEnabledTags = make([]string, 0, len(c.orchestrionCfg.Metadata)) + for k, v := range c.orchestrionCfg.Metadata { + telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) + orchestrionEnabledTags = append(orchestrionEnabledTags, k+":"+v) + } + if testing.Testing() { + // In tests, ensure tags are consistently ordered... Ordering is irrelevant outside of tests. + slices.Sort(orchestrionEnabledTags) + } + } + + // Apply the GlobalClient options... + telemetry.GlobalClient.ApplyOps( + telemetry.WithService(c.serviceName), + telemetry.WithEnv(c.env), + telemetry.WithHTTPClient(c.httpClient), + // c.logToStdout is true if serverless is turned on + // c.ciVisibilityAgentless is true if ci visibility mode is turned on and agentless writer is configured + telemetry.WithURL(c.logToStdout || c.ciVisibilityAgentless, c.agentURL.String()), + telemetry.WithVersion(c.version), + telemetry.WithHeartbeatMetric(telemetry.NamespaceTracers, telemetry.MetricKindGauge, orchestrionEnabledMetric, func() float64 { return orchestrionEnabledValue }, orchestrionEnabledTags, false), + ) + var peerServiceMapping []string for key, value := range c.peerServiceMappings { peerServiceMapping = append(peerServiceMapping, fmt.Sprintf("%s:%s", key, value)) @@ -109,24 +133,16 @@ func startTelemetry(c *config) { telemetry.Configuration{Name: fmt.Sprintf("sr_%s_(%s)_(%s)", rule.ruleType.String(), service, name), Value: fmt.Sprintf("rate:%f_maxPerSecond:%f", rule.Rate, rule.MaxPerSecond)}) } - if c.orchestrionCfg.Enabled { - tags := make([]string, 0, len(c.orchestrionCfg.Metadata)) - for k, v := range c.orchestrionCfg.Metadata { - telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) - tags = append(tags, k+":"+v) - } - if testing.Testing() { - // In tests, ensure tags are consistently ordered... - slices.Sort(tags) - } - telemetry.GlobalClient.Record( - telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - "orchestrion.enabled", 1, - tags, - false, // Go-specific - ) - } + + // Submit the initial metric tick + telemetry.GlobalClient.Record( + telemetry.NamespaceTracers, + telemetry.MetricKindGauge, + orchestrionEnabledMetric, orchestrionEnabledValue, + orchestrionEnabledTags, + false, // Go-specific + ) + telemetryConfigs = append(telemetryConfigs, additionalConfigs...) telemetry.GlobalClient.ProductChange(telemetry.NamespaceTracers, true, telemetryConfigs) } diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 7a94be85cd..9495620c06 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -147,6 +147,19 @@ type client struct { // Globally registered application configuration sent in the app-started request, along with the locally-defined // configuration of the event. globalAppConfig []Configuration + + // heartbeatMetrics is a set of metrics to be emitted each time a heartbeat is sent. + heartbeatMetrics []heartbeatMetric +} + +// heartbeatMetric is a metric that is emitted each time a heartbeat is sent. +type heartbeatMetric struct { + namespace Namespace + kind MetricKind + name string + value func() float64 // Called to determine the current value of the metric. + tags []string + common bool } func log(msg string, args ...interface{}) { @@ -338,14 +351,22 @@ func metricKey(name string, tags []string, kind MetricKind) string { return name + string(kind) + strings.Join(tags, "-") } -// Record sets the value for a gauge or distribution metric type -// with the given name and tags. If the metric is not language-specific, common should be set to true +// Record sets the value for a gauge or distribution metric type with the given +// name and tags. If the metric is not language-specific, common should be set +// to true func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } + c.record(namespace, kind, name, value, tags, common) +} + +// record sets the value for a gauge or distribution metric type with the given +// name and tags. If the metric is not language-soecific, common should be set +// to true. Must be called with c.mu locked. +func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } @@ -606,7 +627,22 @@ func (c *client) backgroundHeartbeat() { if !c.started { return } + + // Emit all the metrics that were registered for heartbeat. + c.emitHeartbeatMetrics() + + // Send the actual app heartbeat. c.scheduleSubmit(c.newRequest(RequestTypeAppHeartbeat)) c.flush(false) c.heartbeatT.Reset(c.heartbeatInterval) } + +// emitHeartbeatMetrics is invoked as part of each heartbeat tick, and is +// responsible for emitting periodic metrics that are expected to be sent +// throughout the lifetime of the service. These are typically gauge metrics. +// Must be called with c.mu locked. +func (c *client) emitHeartbeatMetrics() { + for _, m := range c.heartbeatMetrics { + c.record(m.namespace, m.kind, m.name, m.value(), m.tags, m.common) + } +} diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go index 556157e38f..a75e2ec3a7 100644 --- a/internal/telemetry/client_test.go +++ b/internal/telemetry/client_test.go @@ -52,7 +52,67 @@ func TestClient(t *testing.T) { t.Fatal("Heartbeat took more than 30 seconds. Should have been ~1 second") case <-heartbeat: } +} +func TestHeartbeatMetric(t *testing.T) { + t.Setenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "1") + heartbeat := make(chan struct{}) + metrics := make(chan string) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h := r.Header.Get("DD-Telemetry-Request-Type") + if len(h) == 0 { + t.Fatal("didn't get telemetry request type header") + } + switch RequestType(h) { + case RequestTypeAppHeartbeat: + select { + case heartbeat <- struct{}{}: + default: + } + case RequestTypeGenerateMetrics: + var data struct { + Payload *Metrics + } + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { + t.Fatal(err) + } + for _, s := range data.Payload.Series { + select { + case metrics <- s.Metric: + default: + } + } + } + })) + defer server.Close() + + client := &client{ + URL: server.URL, + } + const metricName = "test.metric" + client.ApplyOps(WithHeartbeatMetric(NamespaceGeneral, MetricKindGauge, metricName, func() float64 { return 1 }, nil, false)) + + client.mu.Lock() + client.start(nil, NamespaceTracers, true) + client.start(nil, NamespaceTracers, true) // test idempotence + client.mu.Unlock() + defer client.Stop() + + timeout := time.After(30 * time.Second) + waitingForHeartbeat := true + waitingForMetric := true + for waitingForHeartbeat || waitingForMetric { + select { + case <-timeout: + t.Fatal("Heartbeat took more than 30 seconds. Should have been ~1 second") + case <-heartbeat: + waitingForHeartbeat = false + case m := <-metrics: + assert.Equal(t, metricName, m) + waitingForMetric = false + } + } } func TestMetrics(t *testing.T) { diff --git a/internal/telemetry/option.go b/internal/telemetry/option.go index 8320d1a5fb..2735958117 100644 --- a/internal/telemetry/option.go +++ b/internal/telemetry/option.go @@ -56,6 +56,17 @@ func WithVersion(version string) Option { } } +// WithHeartbeatMetric register a metric data point to be emitted at each +// heartbeat tick. This is useful to maintain gauge metrics at a specific level. +func WithHeartbeatMetric(namespace Namespace, kind MetricKind, name string, value func() float64, tags []string, common bool) Option { + return func(client *client) { + client.heartbeatMetrics = append( + client.heartbeatMetrics, + heartbeatMetric{namespace, kind, name, value, tags, common}, + ) + } +} + // WithHTTPClient specifies the http client for the telemetry client func WithHTTPClient(httpClient *http.Client) Option { return func(client *client) { From 79e9f936447334701ed38cd89d4a22af7fb7bcb7 Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Mon, 9 Dec 2024 13:27:20 +0100 Subject: [PATCH 5/6] Remove use of testing.Testing() --- ddtrace/tracer/telemetry.go | 6 ------ internal/telemetry/telemetrytest/telemetrytest.go | 11 +++++++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index d871adf971..9fa60de2dc 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -7,9 +7,7 @@ package tracer import ( "fmt" - "slices" "strings" - "testing" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" ) @@ -79,10 +77,6 @@ func startTelemetry(c *config) { telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "orchestrion_" + k, Value: v}) orchestrionEnabledTags = append(orchestrionEnabledTags, k+":"+v) } - if testing.Testing() { - // In tests, ensure tags are consistently ordered... Ordering is irrelevant outside of tests. - slices.Sort(orchestrionEnabledTags) - } } // Apply the GlobalClient options... diff --git a/internal/telemetry/telemetrytest/telemetrytest.go b/internal/telemetry/telemetrytest/telemetrytest.go index f6977645bb..8861c966d6 100644 --- a/internal/telemetry/telemetrytest/telemetrytest.go +++ b/internal/telemetry/telemetrytest/telemetrytest.go @@ -7,6 +7,7 @@ package telemetrytest import ( + "slices" "sync" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" @@ -69,10 +70,12 @@ func (c *MockClient) productChange(namespace telemetry.Namespace, enabled bool) } // Record stores the value for the given metric. It is currently mocked for `Gauge` and `Distribution` metric types. -func (c *MockClient) Record(ns telemetry.Namespace, _ telemetry.MetricKind, name string, val float64, tags []string, common bool) { - c.On("Gauge", ns, name, val, tags, common).Return() - c.On("Record", ns, name, val, tags, common).Return() - _ = c.Called(ns, name, val, tags, common) +func (c *MockClient) Record(ns telemetry.Namespace, kind telemetry.MetricKind, name string, val float64, tags []string, common bool) { + // Ensure consistent ordering through expectations + slices.Sort(tags) + + c.On("Record", ns, kind, name, val, tags, common).Return() + _ = c.Called(ns, kind, name, val, tags, common) // record the val for tests that assert based on the value if _, ok := c.Metrics[ns]; !ok { if c.Metrics == nil { From 78e6f39433ad720dbfee3968ef11be6e2e4ef4cd Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Mon, 9 Dec 2024 14:06:55 +0100 Subject: [PATCH 6/6] Fix telemetry payload schema --- ddtrace/tracer/telemetry.go | 5 +- ddtrace/tracer/telemetry_test.go | 6 +- .../utils/telemetry/telemetry_distribution.go | 32 +++--- internal/telemetry/client.go | 106 +++++++++++++----- internal/telemetry/client_test.go | 12 +- internal/telemetry/telemetry.go | 2 +- .../telemetry/telemetrytest/telemetrytest.go | 30 ++++- 7 files changed, 133 insertions(+), 60 deletions(-) diff --git a/ddtrace/tracer/telemetry.go b/ddtrace/tracer/telemetry.go index 9fa60de2dc..4c3814aab0 100644 --- a/ddtrace/tracer/telemetry.go +++ b/ddtrace/tracer/telemetry.go @@ -129,10 +129,9 @@ func startTelemetry(c *config) { } // Submit the initial metric tick - telemetry.GlobalClient.Record( + telemetry.GlobalClient.Gauge( telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - orchestrionEnabledMetric, orchestrionEnabledValue, + orchestrionEnabledMetric, telemetry.GlobalClient.HeartbeatInterval(), orchestrionEnabledValue, orchestrionEnabledTags, false, // Go-specific ) diff --git a/ddtrace/tracer/telemetry_test.go b/ddtrace/tracer/telemetry_test.go index a4ad105ac4..f3769a248c 100644 --- a/ddtrace/tracer/telemetry_test.go +++ b/ddtrace/tracer/telemetry_test.go @@ -8,6 +8,7 @@ package tracer import ( "fmt" "testing" + "time" "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" @@ -160,10 +161,9 @@ func TestTelemetryEnabled(t *testing.T) { telemetry.Check(t, telemetryClient.Configuration, "orchestrion_enabled", true) telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k1", "v1") telemetry.Check(t, telemetryClient.Configuration, "orchestrion_k2", "v2") - telemetryClient.AssertCalled(t, "Record", + telemetryClient.AssertCalled(t, "Gauge", telemetry.NamespaceTracers, - telemetry.MetricKindGauge, - "orchestrion.enabled", 1.0, + "orchestrion.enabled", time.Second, 1.0, []string{"k1:v1", "k2:v2"}, false, ) diff --git a/internal/civisibility/utils/telemetry/telemetry_distribution.go b/internal/civisibility/utils/telemetry/telemetry_distribution.go index 3b4d8d54fe..bd02707263 100644 --- a/internal/civisibility/utils/telemetry/telemetry_distribution.go +++ b/internal/civisibility/utils/telemetry/telemetry_distribution.go @@ -9,96 +9,96 @@ import "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" // EndpointPayloadBytes records the size in bytes of the serialized payload by CI Visibility. func EndpointPayloadBytes(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.bytes", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointPayloadRequestsMs records the time it takes to send the payload sent to the endpoint in ms by CI Visibility. func EndpointPayloadRequestsMs(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.requests_ms", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointPayloadEventsCount records the number of events in the payload sent to the endpoint by CI Visibility. func EndpointPayloadEventsCount(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_count", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_count", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // EndpointEventsSerializationMs records the time it takes to serialize the events in the payload sent to the endpoint in ms by CI Visibility. func EndpointEventsSerializationMs(endpointType EndpointType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "endpoint_payload.events_serialization_ms", value, removeEmptyStrings([]string{ (string)(endpointType), }), true) } // GitCommandMs records the time it takes to execute a git command in ms by CI Visibility. func GitCommandMs(commandType CommandType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git.command_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git.command_ms", value, removeEmptyStrings([]string{ (string)(commandType), }), true) } // GitRequestsSearchCommitsMs records the time it takes to get the response of the search commit quest in ms by CI Visibility. func GitRequestsSearchCommitsMs(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.search_commits_ms", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // GitRequestsObjectsPackMs records the time it takes to get the response of the objects pack request in ms by CI Visibility. func GitRequestsObjectsPackMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_ms", value, nil, true) } // GitRequestsObjectsPackBytes records the sum of the sizes of the object pack files inside a single payload by CI Visibility func GitRequestsObjectsPackBytes(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_bytes", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_bytes", value, nil, true) } // GitRequestsObjectsPackFiles records the number of files sent in the object pack payload by CI Visibility. func GitRequestsObjectsPackFiles(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.objects_pack_files", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.objects_pack_files", value, nil, true) } // GitRequestsSettingsMs records the time it takes to get the response of the settings endpoint request in ms by CI Visibility. func GitRequestsSettingsMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "git_requests.settings_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "git_requests.settings_ms", value, nil, true) } // ITRSkippableTestsRequestMs records the time it takes to get the response of the itr skippable tests endpoint request in ms by CI Visibility. func ITRSkippableTestsRequestMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.request_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.request_ms", value, nil, true) } // ITRSkippableTestsResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed. func ITRSkippableTestsResponseBytes(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "itr_skippable_tests.response_bytes", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // CodeCoverageFiles records the number of files in the code coverage report by CI Visibility. func CodeCoverageFiles(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "code_coverage.files", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "code_coverage.files", value, nil, true) } // EarlyFlakeDetectionRequestMs records the time it takes to get the response of the early flake detection endpoint request in ms by CI Visibility. func EarlyFlakeDetectionRequestMs(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.request_ms", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.request_ms", value, nil, true) } // EarlyFlakeDetectionResponseBytes records the number of bytes received by the endpoint. Tagged with a boolean flag set to true if response body is compressed. func EarlyFlakeDetectionResponseBytes(responseCompressedType ResponseCompressedType, value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{ + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_bytes", value, removeEmptyStrings([]string{ (string)(responseCompressedType), }), true) } // EarlyFlakeDetectionResponseTests records the number of tests in the response of the early flake detection endpoint by CI Visibility. func EarlyFlakeDetectionResponseTests(value float64) { - telemetry.GlobalClient.Record(telemetry.NamespaceCiVisibility, telemetry.MetricKindDist, "early_flake_detection.response_tests", value, nil, true) + telemetry.GlobalClient.Distribution(telemetry.NamespaceCiVisibility, "early_flake_detection.response_tests", value, nil, true) } diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 9495620c06..b6adae24f2 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -33,10 +33,15 @@ type Client interface { RegisterAppConfig(name string, val interface{}, origin Origin) ProductChange(namespace Namespace, enabled bool, configuration []Configuration) ConfigChange(configuration []Configuration) - Record(namespace Namespace, metric MetricKind, name string, value float64, tags []string, common bool) + Count(namespace Namespace, name string, value float64, tags []string, common bool) + Distribution(namespace Namespace, name string, value float64, tags []string, common bool) + Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) + ApplyOps(opts ...Option) Stop() + + HeartbeatInterval() time.Duration } var ( @@ -273,6 +278,10 @@ func (c *client) start(configuration []Configuration, namespace Namespace, flush c.heartbeatT = time.AfterFunc(c.heartbeatInterval, c.backgroundHeartbeat) } +func (c *client) HeartbeatInterval() time.Duration { + return c.heartbeatInterval +} + func heartbeatInterval() time.Duration { heartbeat := internal.FloatEnv("DD_TELEMETRY_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) if heartbeat <= 0 || heartbeat > 3600 { @@ -326,9 +335,10 @@ var ( ) type metric struct { - name string - kind MetricKind - value float64 + name string + kind MetricKind + value float64 + interval int // Unix timestamp ts float64 tags []string @@ -351,29 +361,54 @@ func metricKey(name string, tags []string, kind MetricKind) string { return name + string(kind) + strings.Join(tags, "-") } -// Record sets the value for a gauge or distribution metric type with the given +// Count adds the value to a count with the given name and tags. If the metric +// is not language-specific, common should be set to true +func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + c.count(namespace, name, value, tags, common) +} + +// count implements Count, must be called with c.mu locked. +func (c *client) count(namespace Namespace, name string, value float64, tags []string, common bool) { + if _, ok := c.metrics[namespace]; !ok { + c.metrics[namespace] = map[string]*metric{} + } + key := metricKey(name, tags, MetricKindCount) + m, ok := c.metrics[namespace][key] + if !ok { + m = newMetric(name, MetricKindCount, tags, common) + c.metrics[namespace][key] = m + } + m.value += value + m.ts = float64(time.Now().Unix()) + c.newMetrics = true +} + +// Distribution sets the value for a distribution metric type with the given // name and tags. If the metric is not language-specific, common should be set -// to true -func (c *client) Record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { +// to true. +func (c *client) Distribution(namespace Namespace, name string, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } - c.record(namespace, kind, name, value, tags, common) + c.distribution(namespace, name, value, tags, common) } -// record sets the value for a gauge or distribution metric type with the given -// name and tags. If the metric is not language-soecific, common should be set -// to true. Must be called with c.mu locked. -func (c *client) record(namespace Namespace, kind MetricKind, name string, value float64, tags []string, common bool) { +// distribution implements Distribution, must be called with c.mu locked. +func (c *client) distribution(namespace Namespace, name string, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } - key := metricKey(name, tags, kind) + key := metricKey(name, tags, MetricKindDist) m, ok := c.metrics[namespace][key] if !ok { - m = newMetric(name, kind, tags, common) + m = newMetric(name, MetricKindDist, tags, common) c.metrics[namespace][key] = m } m.value = value @@ -381,24 +416,30 @@ func (c *client) record(namespace Namespace, kind MetricKind, name string, value c.newMetrics = true } -// Count adds the value to a count with the given name and tags. If the metric -// is not language-specific, common should be set to true -func (c *client) Count(namespace Namespace, name string, value float64, tags []string, common bool) { +// Gauge sets the value for a gauge metric type with the given name and tags. If +// the metric is not language-specific, common should be set to true. +func (c *client) Gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } + c.gauge(namespace, name, interval, value, tags, common) +} + +// gauge implements Gauge, must be called with c.mu locked. +func (c *client) gauge(namespace Namespace, name string, interval time.Duration, value float64, tags []string, common bool) { if _, ok := c.metrics[namespace]; !ok { c.metrics[namespace] = map[string]*metric{} } - key := metricKey(name, tags, MetricKindCount) + key := metricKey(name, tags, MetricKindGauge) m, ok := c.metrics[namespace][key] if !ok { - m = newMetric(name, MetricKindCount, tags, common) + m = newMetric(name, MetricKindGauge, tags, common) + m.interval = int(interval.Seconds()) c.metrics[namespace][key] = m } - m.value += value + m.value = value m.ts = float64(time.Now().Unix()) c.newMetrics = true } @@ -430,20 +471,22 @@ func (c *client) flush(sync bool) { Namespace: namespace, } for _, m := range c.metrics[namespace] { - if m.kind == MetricKindDist { + switch m.kind { + case MetricKindDist: dPayload.Series = append(dPayload.Series, DistributionSeries{ Metric: m.name, Tags: m.tags, Common: m.common, Points: []float64{m.value}, }) - } else { + default: gPayload.Series = append(gPayload.Series, Series{ - Metric: m.name, - Type: string(m.kind), - Tags: m.tags, - Common: m.common, - Points: [][2]float64{{m.ts, m.value}}, + Metric: m.name, + Type: string(m.kind), + Interval: m.interval, + Tags: m.tags, + Common: m.common, + Points: [][2]float64{{m.ts, m.value}}, }) } } @@ -643,6 +686,13 @@ func (c *client) backgroundHeartbeat() { // Must be called with c.mu locked. func (c *client) emitHeartbeatMetrics() { for _, m := range c.heartbeatMetrics { - c.record(m.namespace, m.kind, m.name, m.value(), m.tags, m.common) + switch m.kind { + case MetricKindCount: + c.count(m.namespace, m.name, m.value(), m.tags, m.common) + case MetricKindDist: + c.distribution(m.namespace, m.name, m.value(), m.tags, m.common) + case MetricKindGauge: + c.gauge(m.namespace, m.name, c.HeartbeatInterval(), m.value(), m.tags, m.common) + } } } diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go index a75e2ec3a7..19a6cec268 100644 --- a/internal/telemetry/client_test.go +++ b/internal/telemetry/client_test.go @@ -169,8 +169,8 @@ func TestMetrics(t *testing.T) { client.start(nil, NamespaceTracers, true) // Records should have the most recent value - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 2, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 2, nil, false) // Counts should be aggregated client.Count(NamespaceTracers, "baz", 3, nil, true) client.Count(NamespaceTracers, "baz", 1, nil, true) @@ -244,8 +244,8 @@ func TestDistributionMetrics(t *testing.T) { } client.start(nil, NamespaceTracers, true) // Records should have the most recent value - client.Record(NamespaceTracers, MetricKindDist, "soobar", 1, nil, false) - client.Record(NamespaceTracers, MetricKindDist, "soobar", 3, nil, false) + client.distribution(NamespaceTracers, "soobar", 1, nil, false) + client.distribution(NamespaceTracers, "soobar", 3, nil, false) client.mu.Lock() client.flush(false) client.mu.Unlock() @@ -275,7 +275,7 @@ func TestDisabledClient(t *testing.T) { URL: server.URL, } client.start(nil, NamespaceTracers, true) - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false) client.Stop() } @@ -290,7 +290,7 @@ func TestNonStartedClient(t *testing.T) { client := &client{ URL: server.URL, } - client.Record(NamespaceTracers, MetricKindGauge, "foobar", 1, nil, false) + client.Gauge(NamespaceTracers, "foobar", 1, 1, nil, false) client.Count(NamespaceTracers, "bonk", 4, []string{"org:1"}, false) client.Stop() } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 994dee89a8..cdb4f408df 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -121,6 +121,6 @@ func Time(namespace Namespace, name string, tags []string, common bool) (finish start := time.Now() return func() { elapsed := time.Since(start) - GlobalClient.Record(namespace, MetricKindDist, name, float64(elapsed.Milliseconds()), tags, common) + GlobalClient.Distribution(namespace, name, float64(elapsed.Milliseconds()), tags, common) } } diff --git a/internal/telemetry/telemetrytest/telemetrytest.go b/internal/telemetry/telemetrytest/telemetrytest.go index 8861c966d6..331f619497 100644 --- a/internal/telemetry/telemetrytest/telemetrytest.go +++ b/internal/telemetry/telemetrytest/telemetrytest.go @@ -9,6 +9,7 @@ package telemetrytest import ( "slices" "sync" + "time" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" @@ -44,6 +45,12 @@ func (c *MockClient) ProductChange(namespace telemetry.Namespace, enabled bool, c.productChange(namespace, enabled) } +func (c *MockClient) HeartbeatInterval() time.Duration { + c.On("HeartbeatInterval").Return(time.Second) + _ = c.Called() + return time.Second +} + // ProductStop signals a product has stopped and disables that product in the mock client. // ProductStop is NOOP for the tracer namespace, since the tracer is not considered a product. func (c *MockClient) ProductStop(namespace telemetry.Namespace) { @@ -69,13 +76,30 @@ func (c *MockClient) productChange(namespace telemetry.Namespace, enabled bool) } } +// Distribution stores the value for the given metric. +func (c *MockClient) Distribution(ns telemetry.Namespace, name string, val float64, tags []string, common bool) { + // Ensure consistent ordering through expectations + slices.Sort(tags) + + c.On("Distribution", ns, name, val, tags, common).Return() + _ = c.Called(ns, name, val, tags, common) + // record the val for tests that assert based on the value + if _, ok := c.Metrics[ns]; !ok { + if c.Metrics == nil { + c.Metrics = make(map[telemetry.Namespace]map[string]float64) + } + c.Metrics[ns] = map[string]float64{} + } + c.Metrics[ns][name] = val +} + // Record stores the value for the given metric. It is currently mocked for `Gauge` and `Distribution` metric types. -func (c *MockClient) Record(ns telemetry.Namespace, kind telemetry.MetricKind, name string, val float64, tags []string, common bool) { +func (c *MockClient) Gauge(ns telemetry.Namespace, name string, interval time.Duration, val float64, tags []string, common bool) { // Ensure consistent ordering through expectations slices.Sort(tags) - c.On("Record", ns, kind, name, val, tags, common).Return() - _ = c.Called(ns, kind, name, val, tags, common) + c.On("Gauge", ns, name, interval, val, tags, common).Return() + _ = c.Called(ns, name, interval, val, tags, common) // record the val for tests that assert based on the value if _, ok := c.Metrics[ns]; !ok { if c.Metrics == nil {