Skip to content

WebSocket support for Netty #8632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e619f2a
Implemented WebSocket support for Netty 4.1
ValentinZakharov Mar 28, 2025
1ab91db
Let propagate unhandled events and fix tests
amarziali Mar 28, 2025
7f96ea2
Refactoring
ValentinZakharov Mar 30, 2025
b2d45a7
Refactor netty test and fix instrumentation
amarziali Apr 3, 2025
0d7330c
Improved pipeline processing - now you can insert handler in any place
ValentinZakharov Apr 3, 2025
7f1138f
Merge branch 'master' into vzakharov/websockets_netty
ValentinZakharov Apr 4, 2025
ee2c5fd
Merge branch 'master' into vzakharov/websockets_netty
ValentinZakharov Apr 7, 2025
203e468
Fixed helper
ValentinZakharov Apr 8, 2025
d753e90
Refactoring
ValentinZakharov Apr 8, 2025
af33164
WebSocket Server support for netty-4.0
ValentinZakharov Apr 9, 2025
e79c661
Missing handlers use cases for netty-4.1
ValentinZakharov Apr 10, 2025
19417b9
Fixed handlers for netty-4.0
ValentinZakharov Apr 10, 2025
6e94f0f
Tests for netty-4.0
ValentinZakharov Apr 10, 2025
c9a7c20
Refactoring
ValentinZakharov Apr 10, 2025
86715ec
WebSocket Server support for netty-3.8
ValentinZakharov Apr 11, 2025
dabfe2d
Tests for netty-3.8
ValentinZakharov Apr 15, 2025
e7f4d7f
Spotless
ValentinZakharov Apr 15, 2025
5fd4cf2
Fixed tests
ValentinZakharov Apr 16, 2025
091ee80
Add profiler env check command to AgentCLI (#8671)
jbachorik Apr 7, 2025
eb18875
Remove dependency on bash from crash/oome uploder scripts (#8652)
jbachorik Apr 7, 2025
66f7b9f
Do not apply JUnit 4 instrumentation to MUnit runners (#8675)
nikita-tkachenko-datadog Apr 7, 2025
d851aa9
Shutdown CI Visibility test event handlers before tracer (#8677)
nikita-tkachenko-datadog Apr 7, 2025
c7ec7ce
Prevent double reporting of Scalatest events when using SBT with test…
nikita-tkachenko-datadog Apr 8, 2025
ba3f346
Fix In-Product when config is empty (#8679)
jpbempel Apr 9, 2025
2203ec4
Expand MUnit runners filter to catch munit.MUnitRunner in JUnit 4 ins…
daniel-mohedano Apr 9, 2025
fef00c7
Remove unused TestEventsHandler methods (#8674)
nikita-tkachenko-datadog Apr 9, 2025
15889fc
Delete print line (#8686)
sarahchen6 Apr 9, 2025
b838ed1
Exclude ProxyLeakTask exception from exception profiling (#8666)
jbachorik Apr 10, 2025
690dd95
Use jvmstat for JDKs 9+ programmatically (#8641)
MattAlp Apr 10, 2025
6510fa4
Update test.retry_reason to use full name of the feature (#8689)
daniel-mohedano Apr 10, 2025
b8b8dab
Allow dogstatsd port to be configurable with DD_DOGSTATSD_PORT (#8693)
randomanderson Apr 14, 2025
5922b70
wait the client handshake
amarziali Apr 17, 2025
78e9c9c
move netty ws client to interested modules
amarziali Apr 17, 2025
bc4b6f0
Merge remote-tracking branch 'origin/master' into vzakharov/websocket…
amarziali Apr 17, 2025
97e7d2f
Added WebSocket tracing check
ValentinZakharov Apr 28, 2025
c817fc9
Merge branch 'master' into vzakharov/websockets_netty
ValentinZakharov Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.netty38;

import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.instrumentation.netty38.client.HttpClientRequestTracingHandler;
Expand All @@ -9,6 +10,9 @@
import datadog.trace.instrumentation.netty38.server.HttpServerResponseTracingHandler;
import datadog.trace.instrumentation.netty38.server.HttpServerTracingHandler;
import datadog.trace.instrumentation.netty38.server.MaybeBlockResponseHandler;
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerRequestTracingHandler;
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerResponseTracingHandler;
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerTracingHandler;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
Expand All @@ -18,6 +22,9 @@
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpServerCodec;
import org.jboss.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder;
import org.jboss.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
* When certain handlers are added to the pipeline, we want to add our corresponding tracing
Expand Down Expand Up @@ -46,6 +53,33 @@ public static void wrapHandler(
new HttpServerResponseTracingHandler(contextStore));
pipeline.addLast(
MaybeBlockResponseHandler.class.getName(), new MaybeBlockResponseHandler(contextStore));
} else if (handler instanceof WebSocketServerProtocolHandler) {
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
if (pipeline.get(HttpServerTracingHandler.class) != null) {
addHandlerAfter(
pipeline,
"datadog.trace.instrumentation.netty38.server.HttpServerTracingHandler",
new WebSocketServerTracingHandler(contextStore));
}
}
} else if (handler instanceof WebSocket13FrameEncoder) {
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
if (pipeline.get(HttpServerRequestTracingHandler.class) != null) {
addHandlerAfter(
pipeline,
"datadog.trace.instrumentation.netty38.server.HttpServerRequestTracingHandler",
new WebSocketServerRequestTracingHandler(contextStore));
}
}
} else if (handler instanceof WebSocket13FrameDecoder) {
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
if (pipeline.get(HttpServerResponseTracingHandler.class) != null) {
addHandlerAfter(
pipeline,
"datadog.trace.instrumentation.netty38.server.HttpServerResponseTracingHandler",
new WebSocketServerResponseTracingHandler(contextStore));
}
}
} else
// Client pipeline handlers
if (handler instanceof HttpClientCodec) {
Expand All @@ -64,4 +98,13 @@ public static void wrapHandler(
CallDepthThreadLocalMap.reset(ChannelPipeline.class);
}
}

private static void addHandlerAfter(
final ChannelPipeline pipeline, final String name, final ChannelHandler handler) {
ChannelHandler existing = pipeline.get(handler.getClass());
if (existing != null) {
pipeline.remove(existing);
}
pipeline.addAfter(name, handler.getClass().getName(), handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
import org.jboss.netty.handler.codec.http.HttpHeaders;

public class ChannelTraceContext {
Expand All @@ -23,6 +24,9 @@ public ChannelTraceContext create() {
boolean analyzedResponse;
boolean blockedResponse;

HandlerContext.Sender senderHandlerContext;
HandlerContext.Receiver receiverHandlerContext;

public void reset() {
this.connectionContinuation = null;
this.serverSpan = null;
Expand Down Expand Up @@ -88,4 +92,20 @@ public void setClientSpan(AgentSpan clientSpan) {
public void setClientParentSpan(AgentSpan clientParentSpan) {
this.clientParentSpan = clientParentSpan;
}

public HandlerContext.Sender getSenderHandlerContext() {
return senderHandlerContext;
}

public void setSenderHandlerContext(HandlerContext.Sender senderHandlerContext) {
this.senderHandlerContext = senderHandlerContext;
}

public HandlerContext.Receiver getReceiverHandlerContext() {
return receiverHandlerContext;
}

public void setReceiverHandlerContext(HandlerContext.Receiver receiverHandlerContext) {
this.receiverHandlerContext = receiverHandlerContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public String[] helperClassNames() {
packageName + ".server.HttpServerResponseTracingHandler",
packageName + ".server.HttpServerTracingHandler",
packageName + ".server.MaybeBlockResponseHandler",
packageName + ".server.websocket.WebSocketServerTracingHandler",
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
import datadog.trace.instrumentation.netty38.ChannelTraceContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
Expand All @@ -17,6 +18,7 @@
public class HttpServerResponseTracingHandler extends SimpleChannelDownstreamHandler {

private final ContextStore<Channel, ChannelTraceContext> contextStore;
private static final String UPGRADE_HEADER = "upgrade";

public HttpServerResponseTracingHandler(
final ContextStore<Channel, ChannelTraceContext> contextStore) {
Expand Down Expand Up @@ -45,7 +47,16 @@ public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent m
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
if (response.getStatus() != HttpResponseStatus.CONTINUE) {
final boolean isWebsocketUpgrade =
response.getStatus() == HttpResponseStatus.SWITCHING_PROTOCOLS
&& "websocket".equals(response.headers().get(UPGRADE_HEADER));
if (isWebsocketUpgrade) {
String channelId = ctx.getChannel().getId().toString();
channelTraceContext.setSenderHandlerContext(new HandlerContext.Sender(span, channelId));
}
if (response.getStatus() != HttpResponseStatus.CONTINUE
&& (response.getStatus() != HttpResponseStatus.SWITCHING_PROTOCOLS
|| isWebsocketUpgrade)) {
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package datadog.trace.instrumentation.netty38.server.websocket;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_TEXT;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
import datadog.trace.instrumentation.netty38.ChannelTraceContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;

public class WebSocketServerRequestTracingHandler extends SimpleChannelUpstreamHandler {

private final ContextStore<Channel, ChannelTraceContext> contextStore;

public WebSocketServerRequestTracingHandler(
final ContextStore<Channel, ChannelTraceContext> contextStore) {
this.contextStore = contextStore;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object frame = event.getMessage();
if (frame instanceof WebSocketFrame) {
Channel channel = ctx.getChannel();

ChannelTraceContext traceContext = this.contextStore.get(channel);
if (traceContext != null) {

HandlerContext.Receiver receiverContext = traceContext.getReceiverHandlerContext();
if (receiverContext == null) {
HandlerContext.Sender sessionState = traceContext.getSenderHandlerContext();
if (sessionState != null) {
receiverContext =
new HandlerContext.Receiver(
sessionState.getHandshakeSpan(), channel.getId().toString());
traceContext.setReceiverHandlerContext(receiverContext);
}
}
if (receiverContext != null) {
if (frame instanceof TextWebSocketFrame) {
// WebSocket Read Text Start
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;

final AgentSpan span =
DECORATE.onReceiveFrameStart(
receiverContext, textFrame.getText(), textFrame.isFinalFragment());
try (final AgentScope scope = activateSpan(span)) {
ctx.sendUpstream(event);
// WebSocket Read Text Start
} finally {
if (textFrame.isFinalFragment()) {
traceContext.setReceiverHandlerContext(null);
DECORATE.onFrameEnd(receiverContext);
}
}
return;
}

if (frame instanceof BinaryWebSocketFrame) {
// WebSocket Read Binary Start
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
final AgentSpan span =
DECORATE.onReceiveFrameStart(
receiverContext,
binaryFrame.getBinaryData().array(),
binaryFrame.isFinalFragment());
try (final AgentScope scope = activateSpan(span)) {
ctx.sendUpstream(event);
} finally {
// WebSocket Read Binary End
if (binaryFrame.isFinalFragment()) {
traceContext.setReceiverHandlerContext(null);
DECORATE.onFrameEnd(receiverContext);
}
}

return;
}

if (frame instanceof ContinuationWebSocketFrame) {
ContinuationWebSocketFrame continuationWebSocketFrame =
(ContinuationWebSocketFrame) frame;
final AgentSpan span =
DECORATE.onReceiveFrameStart(
receiverContext,
MESSAGE_TYPE_TEXT.equals(receiverContext.getMessageType())
? continuationWebSocketFrame.getText()
: continuationWebSocketFrame.getBinaryData().array(),
continuationWebSocketFrame.isFinalFragment());
try (final AgentScope scope = activateSpan(span)) {
ctx.sendUpstream(event);
} finally {
if (continuationWebSocketFrame.isFinalFragment()) {
traceContext.setReceiverHandlerContext(null);
DECORATE.onFrameEnd(receiverContext);
}
}
return;
}

if (frame instanceof CloseWebSocketFrame) {
// WebSocket Closed by client
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
int statusCode = closeFrame.getStatusCode();
String reasonText = closeFrame.getReasonText();
traceContext.setSenderHandlerContext(null);
traceContext.setReceiverHandlerContext(null);
final AgentSpan span =
DECORATE.onSessionCloseReceived(receiverContext, reasonText, statusCode);
try (final AgentScope scope = activateSpan(span)) {
ctx.sendUpstream(event);
if (closeFrame.isFinalFragment()) {
DECORATE.onFrameEnd(receiverContext);
}
}
return;
}
}
}
}

ctx.sendUpstream(event); // superclass does not throw
}
}
Loading
Loading