Skip to content

Commit 287f89e

Browse files
authored
Add Netty client payload and headers data capture (#231)
* Add netty client HTTP payload and headers data capture Signed-off-by: Pavol Loffay <[email protected]> * Some cleanup Signed-off-by: Pavol Loffay <[email protected]> * Netty 4.1 Signed-off-by: Pavol Loffay <[email protected]> * Fix readme Signed-off-by: Pavol Loffay <[email protected]> * Add vertx Signed-off-by: Pavol Loffay <[email protected]> * rename vertx server class Signed-off-by: Pavol Loffay <[email protected]> * Foo bar Signed-off-by: Pavol Loffay <[email protected]> * micronaut Signed-off-by: Pavol Loffay <[email protected]> * remove junk Signed-off-by: Pavol Loffay <[email protected]> * Remove more junk Signed-off-by: Pavol Loffay <[email protected]>
1 parent d65f5ba commit 287f89e

File tree

34 files changed

+1307
-48
lines changed

34 files changed

+1307
-48
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ List of supported frameworks with additional capabilities:
1818
| [Apache HttpClient](https://hc.apache.org/index.html) | 4.0+ |
1919
| [gRPC](https://github.com/grpc/grpc-java) | 1.5+ |
2020
| [JAX-RS Client](https://javaee.github.io/javaee-spec/javadocs/javax/ws/rs/client/package-summary.html) | 2.0+ |
21-
| [Micronaut](https://micronaut.io/) (via Netty and only server) | 1.0+ |
22-
| [Netty](https://github.com/netty/netty) (only server) | 4.0+ |
21+
| [Micronaut](https://micronaut.io/) (basic support via Netty) | 1.0+ |
22+
| [Netty](https://github.com/netty/netty) | 4.0+ |
2323
| [OkHttp](https://github.com/square/okhttp/) | 3.0+ |
2424
| [Servlet](https://javaee.github.io/javaee-spec/javadocs/javax/servlet/package-summary.html) | 2.3+ |
2525
| [Spark Web Framework](https://github.com/perwendel/spark) | 2.3+ |
26-
| [Spring Webflux](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/package-summary.html) (only server) | 5.0+ |
27-
| [Vert.x](https://vertx.io) (only server) | 3.0+ |
26+
| [Spring Webflux](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/package-summary.html) | 5.0+ |
27+
| [Vert.x](https://vertx.io) | 3.0+ |
2828

2929
### Adding custom filter implementation
3030

instrumentation/micronaut-1.0/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies {
1717
testImplementation("io.micronaut:micronaut-http-server-netty:${micronautVersion}")
1818
testImplementation("io.micronaut:micronaut-runtime:${micronautVersion}")
1919
testImplementation("io.micronaut:micronaut-inject:${micronautVersion}")
20+
testImplementation("io.micronaut:micronaut-http-client:${micronautVersion}")
2021
testAnnotationProcessor("io.micronaut:micronaut-inject-java:${micronautVersion}")
2122
}
2223

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.micronaut;
18+
19+
import io.micronaut.http.HttpRequest;
20+
import io.micronaut.http.HttpResponse;
21+
import io.micronaut.http.client.HttpClient;
22+
import io.micronaut.http.client.annotation.Client;
23+
import io.micronaut.test.annotation.MicronautTest;
24+
import io.opentelemetry.sdk.trace.data.SpanData;
25+
import java.util.List;
26+
import java.util.concurrent.TimeoutException;
27+
import javax.inject.Inject;
28+
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
29+
import org.hypertrace.agent.testing.AbstractInstrumenterTest;
30+
import org.hypertrace.agent.testing.TestHttpServer;
31+
import org.hypertrace.agent.testing.TestHttpServer.GetJsonHandler;
32+
import org.junit.jupiter.api.AfterAll;
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.Test;
36+
37+
@MicronautTest
38+
public class MicronautClientInstrumentationTest extends AbstractInstrumenterTest {
39+
40+
private static final String REQUEST_BODY = "hello_foo_bar";
41+
private static final String REQUEST_HEADER_NAME = "reqheadername";
42+
private static final String REQUEST_HEADER_VALUE = "reqheadervalue";
43+
44+
private static final TestHttpServer testHttpServer = new TestHttpServer();
45+
46+
@BeforeAll
47+
public static void startServer() throws Exception {
48+
testHttpServer.start();
49+
}
50+
51+
@AfterAll
52+
public static void closeServer() throws Exception {
53+
testHttpServer.close();
54+
}
55+
56+
@Inject
57+
@Client("/")
58+
private HttpClient client;
59+
60+
@Test
61+
public void getJson() throws InterruptedException, TimeoutException {
62+
String retrieve =
63+
client
64+
.toBlocking()
65+
.retrieve(
66+
HttpRequest.GET(
67+
String.format("http://localhost:%d/get_json", testHttpServer.port()))
68+
.header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE));
69+
Assertions.assertEquals(GetJsonHandler.RESPONSE_BODY, retrieve);
70+
71+
TEST_WRITER.waitForTraces(1);
72+
List<List<SpanData>> traces = TEST_WRITER.getTraces();
73+
Assertions.assertEquals(1, traces.size());
74+
Assertions.assertEquals(1, traces.get(0).size());
75+
SpanData clientSpan = traces.get(0).get(0);
76+
Assertions.assertEquals(
77+
REQUEST_HEADER_VALUE,
78+
clientSpan
79+
.getAttributes()
80+
.get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME)));
81+
Assertions.assertEquals(
82+
TestHttpServer.RESPONSE_HEADER_VALUE,
83+
clientSpan
84+
.getAttributes()
85+
.get(
86+
HypertraceSemanticAttributes.httpResponseHeader(
87+
TestHttpServer.RESPONSE_HEADER_NAME)));
88+
Assertions.assertNull(
89+
clientSpan.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY));
90+
Assertions.assertEquals(
91+
GetJsonHandler.RESPONSE_BODY,
92+
clientSpan.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));
93+
}
94+
95+
@Test
96+
public void post() throws InterruptedException, TimeoutException {
97+
HttpResponse<Object> response =
98+
client
99+
.toBlocking()
100+
.exchange(
101+
HttpRequest.POST(
102+
String.format("http://localhost:%d/post", testHttpServer.port()),
103+
REQUEST_BODY)
104+
.header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE)
105+
.header("Content-Type", "application/json"));
106+
Assertions.assertEquals(204, response.getStatus().getCode());
107+
108+
TEST_WRITER.waitForTraces(1);
109+
List<List<SpanData>> traces = TEST_WRITER.getTraces();
110+
Assertions.assertEquals(1, traces.size());
111+
Assertions.assertEquals(1, traces.get(0).size());
112+
SpanData clientSpan = traces.get(0).get(0);
113+
Assertions.assertEquals(
114+
REQUEST_HEADER_VALUE,
115+
clientSpan
116+
.getAttributes()
117+
.get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME)));
118+
Assertions.assertEquals(
119+
TestHttpServer.RESPONSE_HEADER_VALUE,
120+
clientSpan
121+
.getAttributes()
122+
.get(
123+
HypertraceSemanticAttributes.httpResponseHeader(
124+
TestHttpServer.RESPONSE_HEADER_NAME)));
125+
Assertions.assertEquals(
126+
REQUEST_BODY,
127+
clientSpan.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY));
128+
Assertions.assertNull(
129+
clientSpan.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));
130+
}
131+
}

instrumentation/netty/netty-4.0/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,6 @@ dependencies {
4747
implementation("io.netty:netty-codec-http:4.0.0.Final")
4848

4949
testImplementation(project(":testing-common"))
50+
testImplementation("org.asynchttpclient:async-http-client:2.0.9")
5051
}
5152

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server;
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0;
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.channel.Channel;

instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@
2525

2626
import io.netty.channel.ChannelHandler;
2727
import io.netty.channel.ChannelPipeline;
28+
import io.netty.handler.codec.http.HttpClientCodec;
2829
import io.netty.handler.codec.http.HttpRequestDecoder;
30+
import io.netty.handler.codec.http.HttpRequestEncoder;
31+
import io.netty.handler.codec.http.HttpResponseDecoder;
2932
import io.netty.handler.codec.http.HttpResponseEncoder;
3033
import io.netty.handler.codec.http.HttpServerCodec;
3134
import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
35+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientRequestTracingHandler;
36+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientResponseTracingHandler;
37+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientTracingHandler;
3238
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerBlockingRequestHandler;
3339
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerRequestTracingHandler;
3440
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerResponseTracingHandler;
@@ -123,22 +129,36 @@ public static void addHandler(
123129
pipeline.addLast(
124130
HttpServerBlockingRequestHandler.class.getName(),
125131
new HttpServerBlockingRequestHandler());
132+
} else
133+
// Client pipeline handlers
134+
if (handler instanceof HttpClientCodec) {
135+
pipeline.replace(
136+
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientTracingHandler
137+
.class
138+
.getName(),
139+
HttpClientTracingHandler.class.getName(),
140+
new HttpClientTracingHandler());
141+
142+
// add OTEL request handler to start spans
143+
pipeline.addAfter(
144+
HttpClientTracingHandler.class.getName(),
145+
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
146+
.HttpClientRequestTracingHandler.class
147+
.getName(),
148+
new io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
149+
.HttpClientRequestTracingHandler());
150+
} else if (handler instanceof HttpRequestEncoder) {
151+
pipeline.addLast(
152+
HttpClientRequestTracingHandler.class.getName(),
153+
new HttpClientRequestTracingHandler());
154+
} else if (handler instanceof HttpResponseDecoder) {
155+
pipeline.replace(
156+
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
157+
.HttpClientResponseTracingHandler.class
158+
.getName(),
159+
HttpClientResponseTracingHandler.class.getName(),
160+
new HttpClientResponseTracingHandler());
126161
}
127-
// TODO add client instrumentation
128-
// else
129-
// Client pipeline handlers
130-
// if (handler instanceof HttpClientCodec) {
131-
// pipeline.addLast(
132-
// HttpClientTracingHandler.class.getName(), new HttpClientTracingHandler());
133-
// } else if (handler instanceof HttpRequestEncoder) {
134-
// pipeline.addLast(
135-
// HttpClientRequestTracingHandler.class.getName(),
136-
// new HttpClientRequestTracingHandler());
137-
// } else if (handler instanceof HttpResponseDecoder) {
138-
// pipeline.addLast(
139-
// HttpClientResponseTracingHandler.class.getName(),
140-
// new HttpClientResponseTracingHandler());
141-
// }
142162
} catch (IllegalArgumentException e) {
143163
// Prevented adding duplicate handlers.
144164
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelOutboundHandlerAdapter;
23+
import io.netty.channel.ChannelPromise;
24+
import io.netty.handler.codec.http.HttpContent;
25+
import io.netty.handler.codec.http.HttpMessage;
26+
import io.netty.handler.codec.http.HttpRequest;
27+
import io.netty.util.Attribute;
28+
import io.opentelemetry.api.common.AttributeKey;
29+
import io.opentelemetry.api.trace.Span;
30+
import io.opentelemetry.context.Context;
31+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.AttributeKeys;
32+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.DataCaptureUtils;
33+
import java.nio.charset.Charset;
34+
import java.util.HashMap;
35+
import java.util.Map;
36+
import org.hypertrace.agent.config.Config.AgentConfig;
37+
import org.hypertrace.agent.core.config.HypertraceConfig;
38+
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
39+
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
40+
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
41+
import org.hypertrace.agent.core.instrumentation.utils.ContentLengthUtils;
42+
import org.hypertrace.agent.core.instrumentation.utils.ContentTypeCharsetUtils;
43+
import org.hypertrace.agent.core.instrumentation.utils.ContentTypeUtils;
44+
45+
public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapter {
46+
47+
private final AgentConfig agentConfig = HypertraceConfig.get();
48+
49+
@Override
50+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
51+
Channel channel = ctx.channel();
52+
Context context =
53+
channel
54+
.attr(
55+
io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.CLIENT_CONTEXT)
56+
.get();
57+
if (context == null) {
58+
ctx.write(msg, prm);
59+
return;
60+
}
61+
Span span = Span.fromContext(context);
62+
63+
if (msg instanceof HttpRequest) {
64+
HttpRequest httpRequest = (HttpRequest) msg;
65+
66+
Map<String, String> headersMap = headersToMap(httpRequest);
67+
if (agentConfig.getDataCapture().getHttpHeaders().getRequest().getValue()) {
68+
headersMap.forEach((key, value) -> span.setAttribute(key, value));
69+
}
70+
71+
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
72+
if (agentConfig.getDataCapture().getHttpBody().getRequest().getValue()
73+
&& contentType != null
74+
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
75+
76+
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
77+
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
78+
79+
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
80+
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
81+
82+
// set the buffer to capture response body
83+
// the buffer is used byt captureBody method
84+
Attribute<BoundedByteArrayOutputStream> bufferAttr =
85+
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
86+
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
87+
}
88+
}
89+
90+
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
91+
&& agentConfig.getDataCapture().getHttpBody().getRequest().getValue()) {
92+
DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg);
93+
}
94+
95+
ctx.write(msg, prm);
96+
}
97+
98+
private static Map<String, String> headersToMap(HttpMessage httpMessage) {
99+
Map<String, String> map = new HashMap<>();
100+
for (Map.Entry<String, String> entry : httpMessage.headers().entries()) {
101+
AttributeKey<String> key = HypertraceSemanticAttributes.httpRequestHeader(entry.getKey());
102+
map.put(key.getKey(), entry.getValue());
103+
}
104+
return map;
105+
}
106+
}

0 commit comments

Comments
 (0)