@@ -7,6 +7,7 @@ import datadog.trace.test.agent.decoder.Decoder
7
7
import datadog.trace.test.agent.decoder.DecodedMessage
8
8
import datadog.trace.test.agent.decoder.DecodedTrace
9
9
import datadog.trace.util.Strings
10
+ import groovy.json.JsonSlurper
10
11
11
12
import java.nio.charset.StandardCharsets
12
13
import java.util.concurrent.CopyOnWriteArrayList
@@ -28,6 +29,12 @@ abstract class AbstractSmokeTest extends ProcessManager {
28
29
@Shared
29
30
protected CopyOnWriteArrayList<DecodedTrace > decodeTraces = new CopyOnWriteArrayList ()
30
31
32
+ @Shared
33
+ protected CopyOnWriteArrayList<Map<String , Object > > telemetryMessages = new CopyOnWriteArrayList ()
34
+
35
+ @Shared
36
+ protected CopyOnWriteArrayList<Map<String , Object > > telemetryFlatMessages = new CopyOnWriteArrayList ()
37
+
31
38
@Shared
32
39
private Closure decode = decodedTracesCallback()
33
40
@@ -37,6 +44,9 @@ abstract class AbstractSmokeTest extends ProcessManager {
37
44
@Shared
38
45
private Throwable traceDecodingFailure = null
39
46
47
+ @Shared
48
+ private Throwable telemetryDecodingFailure = null
49
+
40
50
@Shared
41
51
@AutoCleanup
42
52
protected TestHttpServer server = httpServer {
@@ -111,6 +121,24 @@ abstract class AbstractSmokeTest extends ProcessManager {
111
121
response. status(200 ). send(remoteConfigResponse)
112
122
}
113
123
prefix(" /telemetry/proxy/api/v2/apmtelemetry" ) {
124
+ def body = request. getBody()
125
+ if (body != null ) {
126
+ Map<String , Object > msg = null
127
+ try {
128
+ msg = new JsonSlurper (). parseText(new String (body, StandardCharsets . UTF_8 )) as Map<String , Object >
129
+ } catch (Throwable t) {
130
+ println (" === Failure during telemetry decoding ===" )
131
+ t. printStackTrace(System . out)
132
+ telemetryDecodingFailure = t
133
+ throw t
134
+ }
135
+ telemetryMessages. add(msg)
136
+ if (msg. get(" request_type" ) == " message-batch" ) {
137
+ msg. get(" payload" )?. each { telemetryFlatMessages. add(it as Map<String , Object > ) }
138
+ } else {
139
+ telemetryFlatMessages. add(msg)
140
+ }
141
+ }
114
142
response. status(202 ). send()
115
143
}
116
144
}
@@ -147,7 +175,8 @@ abstract class AbstractSmokeTest extends ProcessManager {
147
175
" -Ddd.profiling.ddprof.alloc.enabled=${ isDdprofSafe()} " ,
148
176
" -Ddatadog.slf4j.simpleLogger.defaultLogLevel=${ logLevel()} " ,
149
177
" -Dorg.slf4j.simpleLogger.defaultLogLevel=${ logLevel()} " ,
150
- " -Ddd.site="
178
+ " -Ddd.site=" ,
179
+ " -Ddd.telemetry.heartbeat.interval=2" ,
151
180
]
152
181
if (inferServiceName()) {
153
182
ret + = " -Ddd.service.name=${ SERVICE_NAME} "
@@ -268,6 +297,31 @@ abstract class AbstractSmokeTest extends ProcessManager {
268
297
decodeTraces
269
298
}
270
299
300
+ void waitForTelemetryCount (final int count ) {
301
+ def conditions = new PollingConditions (timeout : 30 , initialDelay : 0 , delay : 1 , factor : 1 )
302
+ waitForTelemetryCount(conditions, count)
303
+ }
304
+
305
+ void waitForTelemetryCount (final PollingConditions poll , final int count ) {
306
+ poll. eventually {
307
+ telemetryMessages. size() >= count
308
+ }
309
+ }
310
+
311
+ void waitForTelemetryFlat (final Function<Map<String , Object > , Boolean > predicate ) {
312
+ def conditions = new PollingConditions (timeout : 30 , initialDelay : 0 , delay : 1 , factor : 1 )
313
+ waitForTelemetryFlat(conditions, predicate)
314
+ }
315
+
316
+ void waitForTelemetryFlat (final PollingConditions poll , final Function<Map<String , Object > , Boolean > predicate ) {
317
+ poll. eventually {
318
+ if (telemetryDecodingFailure != null ) {
319
+ throw telemetryDecodingFailure
320
+ }
321
+ assert telemetryFlatMessages. find { predicate. apply(it) } != null
322
+ }
323
+ }
324
+
271
325
def logLevel () {
272
326
return " info"
273
327
}
0 commit comments