Skip to content

Commit 141ff78

Browse files
committed
wip impl writer
1 parent 1701c89 commit 141ff78

File tree

13 files changed

+396
-21
lines changed

13 files changed

+396
-21
lines changed

communication/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dependencies {
44
implementation libs.slf4j
55

66
api project(':remote-config:remote-config-api')
7+
implementation project(':components:json')
78
implementation project(':remote-config:remote-config-core')
89
implementation project(':internal-api')
910
implementation project(':utils:container-utils')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package datadog.communication.serialization.json;
2+
3+
import datadog.communication.serialization.EncodingCache;
4+
import datadog.communication.serialization.Mapper;
5+
import datadog.communication.serialization.WritableFormatter;
6+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
7+
import java.nio.ByteBuffer;
8+
import java.util.Map;
9+
10+
public class JSONWritableFormatter implements WritableFormatter {
11+
12+
@Override
13+
public <T> boolean format(T message, Mapper<T> mapper) {
14+
return false;
15+
}
16+
17+
@Override
18+
public void flush() {
19+
20+
}
21+
22+
@Override
23+
public void writeNull() {
24+
25+
}
26+
27+
@Override
28+
public void writeBoolean(boolean value) {
29+
30+
}
31+
32+
@Override
33+
public void writeObject(Object value, EncodingCache encodingCache) {
34+
35+
}
36+
37+
@Override
38+
public void writeObjectString(Object value, EncodingCache encodingCache) {
39+
40+
}
41+
42+
@Override
43+
public void writeMap(Map<? extends CharSequence, ?> map, EncodingCache encodingCache) {
44+
45+
}
46+
47+
@Override
48+
public void writeString(CharSequence s, EncodingCache encodingCache) {
49+
50+
}
51+
52+
@Override
53+
public void writeUTF8(byte[] string, int offset, int length) {
54+
55+
}
56+
57+
@Override
58+
public void writeUTF8(byte[] string) {
59+
60+
}
61+
62+
@Override
63+
public void writeUTF8(UTF8BytesString string) {
64+
65+
}
66+
67+
@Override
68+
public void writeBinary(byte[] binary) {
69+
70+
}
71+
72+
@Override
73+
public void writeBinary(byte[] binary, int offset, int length) {
74+
75+
}
76+
77+
@Override
78+
public void startMap(int elementCount) {
79+
80+
}
81+
82+
@Override
83+
public void startStruct(int elementCount) {
84+
85+
}
86+
87+
@Override
88+
public void startArray(int elementCount) {
89+
90+
}
91+
92+
@Override
93+
public void writeBinary(ByteBuffer buffer) {
94+
95+
}
96+
97+
@Override
98+
public void writeInt(int value) {
99+
100+
}
101+
102+
@Override
103+
public void writeSignedInt(int value) {
104+
105+
}
106+
107+
@Override
108+
public void writeLong(long value) {
109+
110+
}
111+
112+
@Override
113+
public void writeUnsignedLong(long value) {
114+
115+
}
116+
117+
@Override
118+
public void writeSignedLong(long value) {
119+
120+
}
121+
122+
@Override
123+
public void writeFloat(float value) {
124+
125+
}
126+
127+
@Override
128+
public void writeDouble(double value) {
129+
130+
}
131+
}

dd-java-agent/instrumentation/wildfly-9/build.gradle

+5-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ muzzle {
3333
pass {
3434
group = 'org.wildfly'
3535
module = 'wildfly-ee'
36-
versions = '[9.0.0.Final,)'
36+
versions = '[9.0.0.Final,35.0.0.Final]'
3737
excludeDependency 'org.jboss.xnio:*' // not related and causes issues with missing jar in maven repo
3838
}
3939
}
@@ -75,9 +75,10 @@ dependencies {
7575
wildflyLatestPoll group: 'org.wildfly', name: 'wildfly-dist', version: '+'
7676

7777
configurations.wildflyLatestPoll.resolve()
78-
def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
79-
it.name == "wildfly-dist"
80-
}.moduleVersion.id.version
78+
// def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
79+
// it.name == "wildfly-dist"
80+
// }.moduleVersion.id.version
81+
def latestWildflyVersion = "35.0.0.Final"
8182
wildflyLatestDepTest "wildfly:wildfly:$latestWildflyVersion@zip"
8283
latestDepForkedTest {
8384
configure {

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.EnumMap;
1313
import java.util.Map;
1414
import java.util.concurrent.TimeUnit;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517

1618
public class DDIntakeWriter extends RemoteWriter {
1719

@@ -40,6 +42,8 @@ public static class DDIntakeWriterBuilder {
4042

4143
private SingleSpanSampler singleSpanSampler;
4244

45+
private static final Logger log = LoggerFactory.getLogger(DDIntakeWriterBuilder.class);
46+
4347
public DDIntakeWriterBuilder addTrack(final TrackType trackType, final RemoteApi intakeApi) {
4448
tracks.put(trackType, intakeApi);
4549
return this;
@@ -98,6 +102,7 @@ public DDIntakeWriterBuilder singleSpanSampler(SingleSpanSampler singleSpanSampl
98102
}
99103

100104
public DDIntakeWriter build() {
105+
log.debug("DDINTAKEWRITER TRACKS {}", tracks);
101106
if (tracks.isEmpty()) {
102107
throw new IllegalArgumentException("At least one track needs to be configured");
103108
}
@@ -112,6 +117,7 @@ public DDIntakeWriter build() {
112117
.toArray(PayloadDispatcher[]::new);
113118
dispatcher = new CompositePayloadDispatcher(dispatchers);
114119
}
120+
log.debug("DISPATCHER {}", dispatcher);
115121

116122
final TraceProcessingWorker traceProcessingWorker =
117123
new TraceProcessingWorker(

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public static Writer createWriter(
3939
final Sampler sampler,
4040
final SingleSpanSampler singleSpanSampler,
4141
final HealthMetrics healthMetrics) {
42-
return createWriter(
42+
Writer w = createWriter(
4343
config, commObjects, sampler, singleSpanSampler, healthMetrics, config.getWriterType());
44+
return w;
4445
}
4546

4647
public static Writer createWriter(
@@ -51,14 +52,20 @@ public static Writer createWriter(
5152
final HealthMetrics healthMetrics,
5253
String configuredType) {
5354

55+
log.debug("START CREATE WRITER");
56+
5457
if (LOGGING_WRITER_TYPE.equals(configuredType)) {
58+
log.debug("STARTED WRITER LOGGING");
5559
return new LoggingWriter();
5660
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
61+
log.debug("STARTED WRITER PRINTING");
5762
return new PrintingWriter(System.out, true);
5863
} else if (configuredType.startsWith(TRACE_STRUCTURE_WRITER_TYPE)) {
64+
log.debug("STARTED WRITER TRACE STRCT");
5965
return new TraceStructureWriter(
6066
Strings.replace(configuredType, TRACE_STRUCTURE_WRITER_TYPE, ""));
6167
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
68+
log.debug("STARTED WRITER MULTI");
6269
return new MultiWriter(
6370
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
6471
}
@@ -116,6 +123,13 @@ public static Writer createWriter(
116123
builder.addTrack(TrackType.CITESTCOV, coverageApi);
117124
}
118125

126+
log.debug("BEFORE ADDING LLM OBSERVER");
127+
if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled()) {
128+
final RemoteApi llmobsApi = createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS);
129+
builder.addTrack(TrackType.LLMOBS, llmobsApi);
130+
log.debug("ADDED LLM OBSERVER");
131+
}
132+
119133
remoteWriter = builder.build();
120134

121135
} else { // configuredType == DDAgentWriter
@@ -171,26 +185,34 @@ private static RemoteApi createDDIntakeRemoteApi(
171185
SharedCommunicationObjects commObjects,
172186
DDAgentFeaturesDiscovery featuresDiscovery,
173187
TrackType trackType) {
174-
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) {
188+
// TODO make it so that it is agentless for the requested product and not both
189+
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled() && !config.isLlmObsAgentlessEnabled()) {
175190
return DDEvpProxyApi.builder()
176191
.httpClient(commObjects.okHttpClient)
177192
.agentUrl(commObjects.agentUrl)
178193
.evpProxyEndpoint(featuresDiscovery.getEvpProxyEndpoint())
179194
.trackType(trackType)
180195
.compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy())
181196
.build();
182-
183197
} else {
184198
HttpUrl hostUrl = null;
199+
String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl();
200+
log.debug("LLMOBS URL {}", llmObsAgentlessUrl);
201+
185202
if (config.getCiVisibilityAgentlessUrl() != null) {
186203
hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl());
187204
log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl);
205+
} else if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled() && llmObsAgentlessUrl != null && !llmObsAgentlessUrl.isEmpty()) {
206+
hostUrl = HttpUrl.get(llmObsAgentlessUrl);
207+
log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl);
188208
}
189-
return DDIntakeApi.builder()
209+
RemoteApi ddintake = DDIntakeApi.builder()
190210
.hostUrl(hostUrl)
191211
.apiKey(config.getApiKey())
192212
.trackType(trackType)
193213
.build();
214+
log.debug("CREATED DD INTAKE for track {} {}", trackType.name(), ddintake);
215+
return ddintake;
194216
}
195217
}
196218

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,20 @@ private DDIntakeApi(
121121

122122
@Override
123123
public Response sendSerializedTraces(Payload payload) {
124+
log.debug("DDINTAKE SENDING {} for track {}", payload, trackType);
124125
final int sizeInBytes = payload.sizeInBytes();
125126

126-
final Request request =
127-
new Request.Builder()
128-
.url(intakeUrl)
129-
.addHeader(DD_API_KEY_HEADER, apiKey)
130-
.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE)
131-
.post(payload.toRequest())
132-
.tag(OkHttpUtils.CustomListener.class, telemetryListener)
133-
.build();
127+
Request.Builder builder = new Request.Builder()
128+
.url(intakeUrl)
129+
.addHeader(DD_API_KEY_HEADER, apiKey)
130+
.post(payload.toRequest())
131+
.tag(OkHttpUtils.CustomListener.class, telemetryListener);
132+
133+
if (!trackType.equals(TrackType.LLMOBS)) {
134+
builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
135+
}
136+
137+
final Request request = builder.build();
134138
totalTraces += payload.traceCount();
135139
receivedTraces += payload.traceCount();
136140

@@ -143,6 +147,7 @@ public Response sendSerializedTraces(Payload payload) {
143147
InstrumentationBridge.getMetricCollector()
144148
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
145149
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
150+
log.error("FAILED TO SEND FOR TRACK {}", trackType);
146151
return Response.failed(response.code());
147152
}
148153

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.trace.civisibility.writer.ddintake.CiTestCycleMapperV1;
77
import datadog.trace.common.writer.RemoteMapper;
88
import datadog.trace.common.writer.RemoteMapperDiscovery;
9+
import datadog.trace.llmobs.writer.ddintake.LLMObsSpanMapper;
910

1011
/**
1112
* Mapper discovery logic when a DDIntake is used. The mapper is discovered based on a backend
@@ -40,6 +41,8 @@ public void discover() {
4041
mapper = new CiTestCycleMapperV1(wellKnownTags, compressionEnabled);
4142
} else if (TrackType.CITESTCOV.equals(trackType)) {
4243
mapper = new CiTestCovMapperV2(compressionEnabled);
44+
} else if (TrackType.LLMOBS.equals(trackType)) {
45+
mapper = new LLMObsSpanMapper();
4346
} else {
4447
mapper = RemoteMapper.NO_OP;
4548
}

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

+1
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,7 @@ private CoreTracer(
669669
} else {
670670
this.writer = writer;
671671
}
672+
log.debug("STARTED WRITER {}", this.writer);
672673

673674
if (config.isCiVisibilityEnabled()
674675
&& (config.isCiVisibilityAgentlessEnabled()

0 commit comments

Comments
 (0)