Skip to content

Commit b9ad8e5

Browse files
authored
Introduce more local counters for tracer health metrics (#6424)
* Introduce more local counters for tracer health metrics This will help us report these counts to tracer-flare in the near future. * Remove unused metric
1 parent 9030071 commit b9ad8e5

File tree

9 files changed

+128
-82
lines changed

9 files changed

+128
-82
lines changed

dd-java-agent/instrumentation/opentelemetry/opentelemetry-0.3/src/test/groovy/OpenTelemetryTest.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ class OpenTelemetryTest extends AgentTestRunner {
215215

216216
then:
217217
tracer.currentSpan.delegate == secondScope.delegate.span()
218-
1 * STATS_D_CLIENT.incrementCounter("scope.close.error")
219-
1 * STATS_D_CLIENT.incrementCounter("scope.user.close.error")
220218
_ * TEST_CHECKPOINTER._
221219
0 * _
222220

dd-java-agent/instrumentation/opentracing/api-0.31/src/test/groovy/OpenTracing31Test.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,6 @@ class OpenTracing31Test extends AgentTestRunner {
255255

256256
then:
257257
tracer.scopeManager().active().delegate == secondScope.delegate
258-
1 * STATS_D_CLIENT.incrementCounter("scope.close.error")
259-
1 * STATS_D_CLIENT.incrementCounter("scope.user.close.error")
260258
_ * TEST_CHECKPOINTER._
261259
0 * _
262260

dd-java-agent/instrumentation/opentracing/api-0.32/src/test/groovy/OpenTracing32Test.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,6 @@ class OpenTracing32Test extends AgentTestRunner {
270270

271271
then:
272272
tracer.scopeManager().active().delegate == secondScope.delegate
273-
1 * STATS_D_CLIENT.incrementCounter("scope.close.error")
274-
1 * STATS_D_CLIENT.incrementCounter("scope.user.close.error")
275273
_ * TEST_CHECKPOINTER._
276274
0 * _
277275

dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
2828
httpStatus -> new String[] {"status:" + httpStatus};
2929

3030
private static final String[] NO_TAGS = new String[0];
31+
private static final String[] STATUS_OK_TAGS = STATUS_TAGS.apply(200);
3132
private final RadixTreeCache<String[]> statusTagsCache =
3233
new RadixTreeCache<>(16, 32, STATUS_TAGS, 200, 400);
3334

@@ -62,6 +63,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
6263

6364
private final FixedSizeStripedLongCounter enqueuedSpans =
6465
CountersFactory.createFixedSizeStripedCounter(8);
66+
private final FixedSizeStripedLongCounter enqueuedBytes =
67+
CountersFactory.createFixedSizeStripedCounter(8);
6568
private final FixedSizeStripedLongCounter singleSpanSampled =
6669
CountersFactory.createFixedSizeStripedCounter(8);
6770
private final FixedSizeStripedLongCounter singleSpanUnsampled =
@@ -84,8 +87,6 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
8487
CountersFactory.createFixedSizeStripedCounter(8);
8588
private final FixedSizeStripedLongCounter unsetPriorityFailedPublishSpanCount =
8689
CountersFactory.createFixedSizeStripedCounter(8);
87-
private final FixedSizeStripedLongCounter sampledSpans =
88-
CountersFactory.createFixedSizeStripedCounter(8);
8990
private final FixedSizeStripedLongCounter manualTraces =
9091
CountersFactory.createFixedSizeStripedCounter(8);
9192
private final FixedSizeStripedLongCounter capturedContinuations =
@@ -102,6 +103,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
102103
CountersFactory.createFixedSizeStripedCounter(8);
103104
private final FixedSizeStripedLongCounter partialTraces =
104105
CountersFactory.createFixedSizeStripedCounter(8);
106+
private final FixedSizeStripedLongCounter partialBytes =
107+
CountersFactory.createFixedSizeStripedCounter(8);
105108
private final FixedSizeStripedLongCounter clientSpansWithoutContext =
106109
CountersFactory.createFixedSizeStripedCounter(8);
107110
private final FixedSizeStripedLongCounter longRunningTracesWrite =
@@ -111,6 +114,21 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
111114
private final FixedSizeStripedLongCounter longRunningTracesExpired =
112115
CountersFactory.createFixedSizeStripedCounter(8);
113116

117+
private final FixedSizeStripedLongCounter apiRequests =
118+
CountersFactory.createFixedSizeStripedCounter(8);
119+
private final FixedSizeStripedLongCounter apiResponsesOK =
120+
CountersFactory.createFixedSizeStripedCounter(8);
121+
private final FixedSizeStripedLongCounter apiErrors =
122+
CountersFactory.createFixedSizeStripedCounter(8);
123+
private final FixedSizeStripedLongCounter flushedTraces =
124+
CountersFactory.createFixedSizeStripedCounter(8);
125+
private final FixedSizeStripedLongCounter flushedBytes =
126+
CountersFactory.createFixedSizeStripedCounter(8);
127+
private final FixedSizeStripedLongCounter scopeCloseErrors =
128+
CountersFactory.createFixedSizeStripedCounter(8);
129+
private final FixedSizeStripedLongCounter userScopeCloseErrors =
130+
CountersFactory.createFixedSizeStripedCounter(8);
131+
114132
private final StatsDClient statsd;
115133
private final long interval;
116134
private final TimeUnit units;
@@ -216,7 +234,7 @@ public void onFlush(final boolean early) {}
216234

217235
@Override
218236
public void onPartialFlush(final int sizeInBytes) {
219-
statsd.count("span.flushed.partial", sizeInBytes, NO_TAGS);
237+
partialBytes.inc(sizeInBytes);
220238
}
221239

222240
@Override
@@ -233,7 +251,7 @@ public void onSingleSpanUnsampled() {
233251
public void onSerialize(final int serializedSizeInBytes) {
234252
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
235253
// map precisely
236-
statsd.count("queue.enqueued.bytes", serializedSizeInBytes, NO_TAGS);
254+
enqueuedBytes.inc(serializedSizeInBytes);
237255
}
238256

239257
@Override
@@ -266,9 +284,9 @@ public void onCreateManualTrace() {
266284

267285
@Override
268286
public void onScopeCloseError(int scopeSource) {
269-
statsd.incrementCounter("scope.close.error", NO_TAGS);
287+
scopeCloseErrors.inc();
270288
if (scopeSource == ScopeSource.MANUAL.id()) {
271-
statsd.incrementCounter("scope.user.close.error", NO_TAGS);
289+
userScopeCloseErrors.inc();
272290
}
273291
}
274292

@@ -323,19 +341,24 @@ public void onLongRunningUpdate(final int dropped, final int write, final int ex
323341

324342
private void onSendAttempt(
325343
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {
326-
statsd.incrementCounter("api.requests.total", NO_TAGS);
327-
statsd.count("flush.traces.total", traceCount, NO_TAGS);
344+
apiRequests.inc();
345+
flushedTraces.inc(traceCount);
328346
// TODO: missing queue.spans (# of spans being sent)
329-
statsd.count("flush.bytes.total", sizeInBytes, NO_TAGS);
347+
flushedBytes.inc(sizeInBytes);
330348

331349
if (response.exception() != null) {
332350
// covers communication errors -- both not receiving a response or
333351
// receiving malformed response (even when otherwise successful)
334-
statsd.incrementCounter("api.errors.total", NO_TAGS);
352+
apiErrors.inc();
335353
}
336354

337-
if (response.status() != null) {
338-
statsd.incrementCounter("api.responses.total", statusTagsCache.get(response.status()));
355+
Integer status = response.status();
356+
if (status != null) {
357+
if (200 == status) {
358+
apiResponsesOK.inc();
359+
} else {
360+
statsd.incrementCounter("api.responses.total", statusTagsCache.get(status));
361+
}
339362
}
340363
}
341364

@@ -415,6 +438,7 @@ public void run(TracerHealthMetrics target) {
415438
target.userDropFailedPublishSpanCount,
416439
USER_DROP_TAG);
417440
reportIfChanged(target.statsd, "queue.enqueued.spans", target.enqueuedSpans, NO_TAGS);
441+
reportIfChanged(target.statsd, "queue.enqueued.bytes", target.enqueuedBytes, NO_TAGS);
418442
reportIfChanged(target.statsd, "trace.pending.created", target.createdTraces, NO_TAGS);
419443
reportIfChanged(target.statsd, "span.pending.created", target.createdSpans, NO_TAGS);
420444
reportIfChanged(target.statsd, "span.pending.finished", target.finishedSpans, NO_TAGS);
@@ -424,6 +448,7 @@ public void run(TracerHealthMetrics target) {
424448
reportIfChanged(
425449
target.statsd, "span.continuations.finished", target.finishedContinuations, NO_TAGS);
426450
reportIfChanged(target.statsd, "queue.partial.traces", target.partialTraces, NO_TAGS);
451+
reportIfChanged(target.statsd, "span.flushed.partial", target.partialBytes, NO_TAGS);
427452
reportIfChanged(
428453
target.statsd, "span.client.no-context", target.clientSpansWithoutContext, NO_TAGS);
429454
reportIfChanged(
@@ -442,6 +467,16 @@ public void run(TracerHealthMetrics target) {
442467
target.statsd, "long-running.dropped", target.longRunningTracesDropped, NO_TAGS);
443468
reportIfChanged(
444469
target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS);
470+
471+
reportIfChanged(target.statsd, "api.requests.total", target.apiRequests, NO_TAGS);
472+
reportIfChanged(target.statsd, "api.errors.total", target.apiErrors, NO_TAGS);
473+
// non-OK responses are reported immediately in onSendAttempt with different status tags
474+
reportIfChanged(target.statsd, "api.responses.total", target.apiResponsesOK, STATUS_OK_TAGS);
475+
reportIfChanged(target.statsd, "flush.traces.total", target.flushedTraces, NO_TAGS);
476+
reportIfChanged(target.statsd, "flush.bytes.total", target.flushedBytes, NO_TAGS);
477+
reportIfChanged(target.statsd, "scope.close.error", target.scopeCloseErrors, NO_TAGS);
478+
reportIfChanged(
479+
target.statsd, "scope.user.close.error", target.userScopeCloseErrors, NO_TAGS);
445480
}
446481

447482
private void reportIfChanged(

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import spock.lang.Timeout
2929
import spock.util.concurrent.PollingConditions
3030

3131
import java.nio.ByteBuffer
32+
import java.util.concurrent.CountDownLatch
3233
import java.util.concurrent.Phaser
3334
import java.util.concurrent.Semaphore
3435
import java.util.concurrent.TimeUnit
@@ -692,45 +693,39 @@ class DDAgentWriterCombinedTest extends DDCoreSpecification {
692693
}
693694

694695
def "statsd comm failure"() {
695-
def numRequests = new AtomicInteger(0)
696-
def numResponses = new AtomicInteger(0)
697-
def numErrors = new AtomicInteger(0)
698-
699696
setup:
700697
def minimalTrace = createMinimalTrace()
701698

702699
def api = apiWithVersion(agentVersion)
703700
api.sendSerializedTraces(_) >> RemoteApi.Response.failed(new IOException("comm error"))
704701

705-
def statsd = Stub(StatsDClient)
706-
statsd.incrementCounter("api.requests.total") >> { stat ->
707-
numRequests.incrementAndGet()
708-
}
709-
statsd.incrementCounter("api.responses.total", _) >> { stat, tags ->
710-
numResponses.incrementAndGet()
711-
}
712-
statsd.incrementCounter("api.errors.total", _) >> { stat ->
713-
numErrors.incrementAndGet()
714-
}
715-
716-
def healthMetrics = new TracerHealthMetrics(statsd)
702+
def latch = new CountDownLatch(2)
703+
def statsd = Mock(StatsDClient)
704+
def healthMetrics = new TracerHealthMetrics(statsd, 100, TimeUnit.MILLISECONDS)
717705
def writer = DDAgentWriter.builder()
718706
.traceAgentV05Enabled(true)
719707
.agentApi(api).monitoring(monitoring)
720708
.healthMetrics(healthMetrics).build()
709+
healthMetrics.start()
721710
writer.start()
722711

723712
when:
724713
writer.write(minimalTrace)
725714
writer.flush()
715+
latch.await(10, TimeUnit.SECONDS)
726716

727717
then:
728-
numRequests.get() == 1
729-
numResponses.get() == 0
730-
numErrors.get() == 1
718+
1 * statsd.count("api.requests.total", 1, _) >> {
719+
latch.countDown()
720+
}
721+
0 * statsd.incrementCounter("api.responses.total", _)
722+
1 * statsd.count("api.errors.total", 1, _) >> {
723+
latch.countDown()
724+
}
731725

732726
cleanup:
733727
writer.close()
728+
healthMetrics.close()
734729

735730
where:
736731
agentVersion << ["v0.3/traces", "v0.4/traces", "v0.5/traces"]

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDIntakeWriterCombinedTest.groovy

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import spock.lang.Timeout
2929
import spock.util.concurrent.PollingConditions
3030

3131
import java.nio.ByteBuffer
32+
import java.util.concurrent.CountDownLatch
3233
import java.util.concurrent.Phaser
3334
import java.util.concurrent.Semaphore
3435
import java.util.concurrent.TimeUnit
@@ -669,47 +670,41 @@ class DDIntakeWriterCombinedTest extends DDCoreSpecification {
669670
}
670671

671672
def "statsd comm failure"() {
672-
def numRequests = new AtomicInteger(0)
673-
def numResponses = new AtomicInteger(0)
674-
def numErrors = new AtomicInteger(0)
675-
676673
setup:
677674
def minimalTrace = createMinimalTrace()
678675

679676
def api = Mock(DDIntakeApi)
680677
api.sendSerializedTraces(_) >> RemoteApi.Response.failed(new IOException("comm error"))
681678

682-
def statsd = Stub(StatsDClient)
683-
statsd.incrementCounter("api.requests.total") >> { stat ->
684-
numRequests.incrementAndGet()
685-
}
686-
statsd.incrementCounter("api.responses.total", _) >> { stat, tags ->
687-
numResponses.incrementAndGet()
688-
}
689-
statsd.incrementCounter("api.errors.total", _) >> { stat ->
690-
numErrors.incrementAndGet()
691-
}
692-
693-
def healthMetrics = new TracerHealthMetrics(statsd)
679+
def latch = new CountDownLatch(2)
680+
def statsd = Mock(StatsDClient)
681+
def healthMetrics = new TracerHealthMetrics(statsd, 100, TimeUnit.MILLISECONDS)
694682
def writer = DDIntakeWriter.builder()
695683
.addTrack(trackType, api)
696684
.monitoring(monitoring)
697685
.healthMetrics(healthMetrics)
698686
.alwaysFlush(false)
699687
.build()
688+
healthMetrics.start()
700689
writer.start()
701690

702691
when:
703692
writer.write(minimalTrace)
704693
writer.flush()
694+
latch.await(10, TimeUnit.SECONDS)
705695

706696
then:
707-
numRequests.get() == 1
708-
numResponses.get() == 0
709-
numErrors.get() == 1
697+
1 * statsd.count("api.requests.total", 1, _) >> {
698+
latch.countDown()
699+
}
700+
0 * statsd.incrementCounter("api.responses.total", _)
701+
1 * statsd.count("api.errors.total", 1, _) >> {
702+
latch.countDown()
703+
}
710704

711705
cleanup:
712706
writer.close()
707+
healthMetrics.close()
713708

714709
where:
715710
trackType | apiVersion

0 commit comments

Comments
 (0)