From ec126c834a9712149142ef386c6ef1d680d59066 Mon Sep 17 00:00:00 2001 From: Stuart McCulloch Date: Tue, 14 Jan 2025 12:51:37 +0000 Subject: [PATCH 01/24] Use the JDK's built-in support for Unix Domain Sockets on Java 16+ --- utils/socket-utils/build.gradle | 29 +++ .../socket/UnixDomainSocketFactory.java | 13 +- .../common/socket/TunnelingJdkSocket.java | 183 ++++++++++++++++++ 3 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index eb09cca849f..51276c768c1 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -1,8 +1,37 @@ apply from: "$rootDir/gradle/java.gradle" +apply plugin: "idea" + +sourceSets { + main_java17 { + java.srcDirs "${project.projectDir}/src/main/java17" + } +} + +compileMain_java17Java.configure { + setJavaVersion(it, 17) + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 +} dependencies { + compileOnly sourceSets.main_java17.output + implementation libs.slf4j implementation project(':internal-api') implementation group: 'com.github.jnr', name: 'jnr-unixsocket', version: libs.versions.jnr.unixsocket.get() } + +jar { + from sourceSets.main_java17.output +} + +forbiddenApisMain_java17 { + failOnMissingClasses = false +} + +idea { + module { + jdkName = '17' + } +} diff --git a/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java b/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java index bb1929f369b..bf4001d0a03 100644 --- a/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java +++ b/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java @@ -3,6 +3,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import datadog.trace.api.Config; +import datadog.trace.api.Platform; import datadog.trace.relocate.api.RatelimitedLogger; import java.io.File; import java.io.IOException; @@ -24,6 +25,8 @@ public final class UnixDomainSocketFactory extends SocketFactory { private static final Logger log = LoggerFactory.getLogger(UnixDomainSocketFactory.class); + private static final boolean JDK_SUPPORTS_UDS = Platform.isJavaVersionAtLeast(16); + private final RatelimitedLogger rlLog = new RatelimitedLogger(log, 5, MINUTES); private final File path; @@ -35,8 +38,14 @@ public UnixDomainSocketFactory(final File path) { @Override public Socket createSocket() throws IOException { try { - final UnixSocketChannel channel = UnixSocketChannel.open(); - return new TunnelingUnixSocket(path, channel); + if (JDK_SUPPORTS_UDS) { + try { + return new TunnelingJdkSocket(path.toPath()); + } catch (Throwable ignore) { + // fall back to jnr-unixsocket library + } + } + return new TunnelingUnixSocket(path, UnixSocketChannel.open()); } catch (Throwable e) { if (Config.get().isAgentConfiguredUsingDefault()) { // fall back to port if we previously auto-discovered this socket file diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java new file mode 100644 index 00000000000..e4c94c91749 --- /dev/null +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -0,0 +1,183 @@ +package datadog.common.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnixDomainSocketAddress; +import java.nio.channels.Channels; +import java.nio.channels.SocketChannel; +import java.nio.file.Path; + +/** + * Subtype UNIX socket for a higher-fidelity impersonation of TCP sockets. This is named "tunneling" + * because it assumes the ultimate destination has a hostname and port. + * + *

Bsed on {@link TunnelingUnixSocket}; adapted to use the built-in UDS support added in Java 16. + */ +final class TunnelingJdkSocket extends Socket { + private final SocketAddress unixSocketAddress; + private InetSocketAddress inetSocketAddress; + + private SocketChannel unixSocketChannel; + + private int timeout; + private boolean shutIn; + private boolean shutOut; + private boolean closed; + + TunnelingJdkSocket(final Path path) { + this.unixSocketAddress = UnixDomainSocketAddress.of(path); + } + + TunnelingJdkSocket(final Path path, final InetSocketAddress address) { + this(path); + inetSocketAddress = address; + } + + @Override + public boolean isConnected() { + return null != unixSocketChannel; + } + + @Override + public boolean isInputShutdown() { + return shutIn; + } + + @Override + public boolean isOutputShutdown() { + return shutOut; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (timeout < 0) { + throw new IllegalArgumentException("Socket timeout can't be negative"); + } + this.timeout = timeout; + } + + @Override + public synchronized int getSoTimeout() throws SocketException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + return timeout; + } + + @Override + public void connect(final SocketAddress endpoint) throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (isConnected()) { + throw new SocketException("Socket is already connected"); + } + inetSocketAddress = (InetSocketAddress) endpoint; + unixSocketChannel = SocketChannel.open(unixSocketAddress); + } + + @Override + public void connect(final SocketAddress endpoint, final int timeout) throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (isConnected()) { + throw new SocketException("Socket is already connected"); + } + inetSocketAddress = (InetSocketAddress) endpoint; + unixSocketChannel = SocketChannel.open(unixSocketAddress); + } + + @Override + public SocketChannel getChannel() { + return unixSocketChannel; + } + + @Override + public InputStream getInputStream() throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (!isConnected()) { + throw new SocketException("Socket is not connected"); + } + if (isInputShutdown()) { + throw new SocketException("Socket input is shutdown"); + } + return Channels.newInputStream(unixSocketChannel); + } + + @Override + public OutputStream getOutputStream() throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (!isConnected()) { + throw new SocketException("Socket is not connected"); + } + if (isInputShutdown()) { + throw new SocketException("Socket output is shutdown"); + } + return Channels.newOutputStream(unixSocketChannel); + } + + @Override + public void shutdownInput() throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (!isConnected()) { + throw new SocketException("Socket is not connected"); + } + if (isInputShutdown()) { + throw new SocketException("Socket input is already shutdown"); + } + unixSocketChannel.shutdownInput(); + shutIn = true; + } + + @Override + public void shutdownOutput() throws IOException { + if (isClosed()) { + throw new SocketException("Socket is closed"); + } + if (!isConnected()) { + throw new SocketException("Socket is not connected"); + } + if (isOutputShutdown()) { + throw new SocketException("Socket output is already shutdown"); + } + unixSocketChannel.shutdownOutput(); + shutOut = true; + } + + @Override + public InetAddress getInetAddress() { + return inetSocketAddress.getAddress(); + } + + @Override + public void close() throws IOException { + if (isClosed()) { + return; + } + if (null != unixSocketChannel) { + unixSocketChannel.close(); + } + closed = true; + } +} From 87425f158163c2360f7cc0bbe87d21409f5a500a Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 29 Jan 2025 17:12:23 -0500 Subject: [PATCH 02/24] First draft of timeout test. --- utils/socket-utils/build.gradle | 4 ++ .../common/socket/TunnelingJdkSocketTest.java | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index 51276c768c1..f4df69b3e88 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -1,6 +1,10 @@ apply from: "$rootDir/gradle/java.gradle" apply plugin: "idea" +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + sourceSets { main_java17 { java.srcDirs "${project.projectDir}/src/main/java17" diff --git a/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java new file mode 100644 index 00000000000..081c993f01d --- /dev/null +++ b/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java @@ -0,0 +1,38 @@ +package datadog.common.socket; + +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TunnelingJdkSocketTest { + + @Test + public void testTimeout() throws Exception { + Assertions.assertEquals(1 + 1, 3); // should fail + + // create test path + Path socketPath = Files.createTempFile("testSocket", null); + // create client socket + TunnelingJdkSocket clientSocket = createClient(socketPath); + + // attempt to read from empty socket (read should block indefinitely) + assertTimeoutPreemptively(Duration.ofSeconds(5), () -> clientSocket.getInputStream().read()); + + // clean up + clientSocket.close(); + Files.deleteIfExists(socketPath); + } + + private TunnelingJdkSocket createClient(Path socketPath) throws IOException { + // create client socket + TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); + // set timeout to one second + clientSocket.setSoTimeout(1000); + return clientSocket; + } +} From 1634bcdd8157d0d731863c18ec9bfd8901752fab Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 30 Jan 2025 16:53:03 -0500 Subject: [PATCH 03/24] Add server to test. --- utils/socket-utils/build.gradle | 4 +++ .../common/socket/TunnelingJdkSocketTest.java | 31 ++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index f4df69b3e88..aad4b67e16a 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -9,6 +9,10 @@ sourceSets { main_java17 { java.srcDirs "${project.projectDir}/src/main/java17" } + + test_java17 { + java.srcDirs "${project.projectDir}/src/test/java17" + } } compileMain_java17Java.configure { diff --git a/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java index 081c993f01d..30ed1695e10 100644 --- a/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java @@ -3,31 +3,60 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import java.io.IOException; +import java.nio.channels.ServerSocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import jnr.unixsocket.UnixSocketAddress; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TunnelingJdkSocketTest { + private final AtomicBoolean running = new AtomicBoolean(false); + @Test public void testTimeout() throws Exception { Assertions.assertEquals(1 + 1, 3); // should fail // create test path Path socketPath = Files.createTempFile("testSocket", null); + // start server + startServer(socketPath); // create client socket TunnelingJdkSocket clientSocket = createClient(socketPath); // attempt to read from empty socket (read should block indefinitely) assertTimeoutPreemptively(Duration.ofSeconds(5), () -> clientSocket.getInputStream().read()); - // clean up + // clean up client, server, and path clientSocket.close(); + running.set(false); Files.deleteIfExists(socketPath); } + private void startServer(Path socketPath) { + Thread serverThread = + new Thread( + () -> { + // open and bind server to socketPath + try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) { + serverChannel.socket().bind(new UnixSocketAddress(socketPath.toFile())); + // accept connections made to the server + running.set(true); + while (running.get()) { + serverChannel.accept(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // start server in separate thread + serverThread.start(); + } + private TunnelingJdkSocket createClient(Path socketPath) throws IOException { // create client socket TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); From 683e7c16ed17fad63d13227e2357d0ab01417a7a Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 31 Jan 2025 10:11:00 -0500 Subject: [PATCH 04/24] Fix build.gradle file. --- utils/socket-utils/build.gradle | 31 +++++-------------- .../common/socket/TunnelingJdkSocketTest.java | 0 2 files changed, 8 insertions(+), 23 deletions(-) rename utils/socket-utils/src/test/{java17 => java}/datadog/common/socket/TunnelingJdkSocketTest.java (100%) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index aad4b67e16a..1626e511ef6 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -1,37 +1,22 @@ -apply from: "$rootDir/gradle/java.gradle" -apply plugin: "idea" - ext { minJavaVersionForTests = JavaVersion.VERSION_17 } -sourceSets { - main_java17 { - java.srcDirs "${project.projectDir}/src/main/java17" - } +apply from: "$rootDir/gradle/java.gradle" +apply plugin: "idea" - test_java17 { - java.srcDirs "${project.projectDir}/src/test/java17" +[compileMain_java17Java, compileTestJava].each { + it.configure { + setJavaVersion(it, 17) + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } } -compileMain_java17Java.configure { - setJavaVersion(it, 17) - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 -} - dependencies { - compileOnly sourceSets.main_java17.output - implementation libs.slf4j implementation project(':internal-api') - - implementation group: 'com.github.jnr', name: 'jnr-unixsocket', version: libs.versions.jnr.unixsocket.get() -} - -jar { - from sourceSets.main_java17.output + implementation libs.jnr.unixsocket } forbiddenApisMain_java17 { diff --git a/utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java similarity index 100% rename from utils/socket-utils/src/test/java17/datadog/common/socket/TunnelingJdkSocketTest.java rename to utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java From 662602a2873b0f96388becce9af85ea075d876fe Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 31 Jan 2025 12:39:00 -0500 Subject: [PATCH 05/24] Add debugging statements. --- .../common/socket/TunnelingJdkSocketTest.java | 52 +++++++++++++++++-- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 30ed1695e10..9ccefc46f54 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -3,12 +3,12 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import java.io.IOException; +import java.net.UnixDomainSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; -import jnr.unixsocket.UnixSocketAddress; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -18,22 +18,43 @@ public class TunnelingJdkSocketTest { @Test public void testTimeout() throws Exception { - Assertions.assertEquals(1 + 1, 3); // should fail + Assertions.assertEquals(1 + 1, 2); - // create test path - Path socketPath = Files.createTempFile("testSocket", null); + // set test socket path + Path socketPath = getSocketPath(); // start server startServer(socketPath); + + // timeout after two seconds if server doesn't start + long startTime = System.currentTimeMillis(); + long timeout = 2000; + while (!running.get()) { + Thread.sleep(100); + if (System.currentTimeMillis() - startTime > timeout) { + System.out.println("Timeout waiting for server to start."); + break; + } + } + // create client socket TunnelingJdkSocket clientSocket = createClient(socketPath); // attempt to read from empty socket (read should block indefinitely) + System.out.println("Test is starting..."); assertTimeoutPreemptively(Duration.ofSeconds(5), () -> clientSocket.getInputStream().read()); // clean up client, server, and path clientSocket.close(); running.set(false); Files.deleteIfExists(socketPath); + System.out.println("Client, server, and path cleaned."); + } + + private Path getSocketPath() throws IOException { + Path socketPath = Files.createTempFile("testSocket", ".sock"); + Files.delete(socketPath); + socketPath.toFile().deleteOnExit(); + return socketPath; } private void startServer(Path socketPath) { @@ -42,13 +63,26 @@ private void startServer(Path socketPath) { () -> { // open and bind server to socketPath try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) { - serverChannel.socket().bind(new UnixSocketAddress(socketPath.toFile())); + System.out.println("serverChannel is open."); + serverChannel.configureBlocking(false); + System.out.println("serverChannel is not blocking."); + serverChannel.socket().bind(UnixDomainSocketAddress.of(socketPath)); // accept connections made to the server running.set(true); + System.out.println("Server is running and ready to accept connections."); while (running.get()) { serverChannel.accept(); + System.out.println("Server is accepting connections."); } } catch (IOException e) { + System.out.println("Server encountered error with accepting a connection."); + // clean up server and path + running.set(false); + try { + Files.deleteIfExists(socketPath); + } catch (IOException ex) { + throw new RuntimeException(ex); + } throw new RuntimeException(e); } }); @@ -62,6 +96,14 @@ private TunnelingJdkSocket createClient(Path socketPath) throws IOException { TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); // set timeout to one second clientSocket.setSoTimeout(1000); + System.out.println("Client set timeout."); + + if (clientSocket.isConnected()) { + System.out.println("Client connected successfully."); + } else { + System.out.println("Client failed to connect."); + } + return clientSocket; } } From 64da205dc86663c8d257fa2d384a4fd66fc50fd0 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 31 Jan 2025 17:31:46 -0500 Subject: [PATCH 06/24] Second draft of timeout test. --- .../common/socket/TunnelingJdkSocketTest.java | 87 ++++++------------- 1 file changed, 28 insertions(+), 59 deletions(-) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 9ccefc46f54..be62151777c 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -3,107 +3,76 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TunnelingJdkSocketTest { - private final AtomicBoolean running = new AtomicBoolean(false); + private static final AtomicBoolean is_server_running = new AtomicBoolean(false); @Test public void testTimeout() throws Exception { - Assertions.assertEquals(1 + 1, 2); - - // set test socket path + // set socket path and address Path socketPath = getSocketPath(); - // start server - startServer(socketPath); + UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); - // timeout after two seconds if server doesn't start - long startTime = System.currentTimeMillis(); - long timeout = 2000; - while (!running.get()) { - Thread.sleep(100); - if (System.currentTimeMillis() - startTime > timeout) { - System.out.println("Timeout waiting for server to start."); - break; - } - } + // start server in a separate thread + startServer(socketAddress, false); - // create client socket + // create client TunnelingJdkSocket clientSocket = createClient(socketPath); - // attempt to read from empty socket (read should block indefinitely) - System.out.println("Test is starting..."); - assertTimeoutPreemptively(Duration.ofSeconds(5), () -> clientSocket.getInputStream().read()); + // expect a failure after three seconds because timeout is not supported yet + assertTimeoutPreemptively(Duration.ofMillis(3000), () -> clientSocket.getInputStream().read()); - // clean up client, server, and path + // clean up clientSocket.close(); - running.set(false); - Files.deleteIfExists(socketPath); - System.out.println("Client, server, and path cleaned."); + is_server_running.set(false); } private Path getSocketPath() throws IOException { - Path socketPath = Files.createTempFile("testSocket", ".sock"); + Path socketPath = Files.createTempFile("testSocket", null); Files.delete(socketPath); socketPath.toFile().deleteOnExit(); return socketPath; } - private void startServer(Path socketPath) { + private static void startServer(UnixDomainSocketAddress socketAddress, boolean sendMessage) { Thread serverThread = new Thread( () -> { - // open and bind server to socketPath - try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) { - System.out.println("serverChannel is open."); - serverChannel.configureBlocking(false); - System.out.println("serverChannel is not blocking."); - serverChannel.socket().bind(UnixDomainSocketAddress.of(socketPath)); - // accept connections made to the server - running.set(true); - System.out.println("Server is running and ready to accept connections."); - while (running.get()) { - serverChannel.accept(); - System.out.println("Server is accepting connections."); + try (ServerSocketChannel serverChannel = + ServerSocketChannel.open(StandardProtocolFamily.UNIX)) { + serverChannel.bind(socketAddress); + is_server_running.set(true); + + // wait for client connection + while (is_server_running.get()) { + SocketChannel clientChannel = serverChannel.accept(); + if (sendMessage) { + clientChannel.write(ByteBuffer.wrap("Hello!".getBytes())); + } } } catch (IOException e) { - System.out.println("Server encountered error with accepting a connection."); - // clean up server and path - running.set(false); - try { - Files.deleteIfExists(socketPath); - } catch (IOException ex) { - throw new RuntimeException(ex); - } throw new RuntimeException(e); } }); - - // start server in separate thread serverThread.start(); } private TunnelingJdkSocket createClient(Path socketPath) throws IOException { - // create client socket TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); - // set timeout to one second + clientSocket.connect(new InetSocketAddress("localhost", 0)); clientSocket.setSoTimeout(1000); - System.out.println("Client set timeout."); - - if (clientSocket.isConnected()) { - System.out.println("Client connected successfully."); - } else { - System.out.println("Client failed to connect."); - } - return clientSocket; } } From 953f3e88991eb93185069e5d20ef1ecf4e11a6a4 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 6 Feb 2025 16:41:11 -0500 Subject: [PATCH 07/24] Update getInputStream to use a selector. --- .../common/socket/TunnelingJdkSocket.java | 44 ++++++++++++++++++- .../common/socket/TunnelingJdkSocketTest.java | 2 +- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index e4c94c91749..1493af05c27 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -9,7 +9,10 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.UnixDomainSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.file.Path; @@ -118,7 +121,46 @@ public InputStream getInputStream() throws IOException { if (isInputShutdown()) { throw new SocketException("Socket input is shutdown"); } - return Channels.newInputStream(unixSocketChannel); + + Selector selector = Selector.open(); + unixSocketChannel.configureBlocking(false); + unixSocketChannel.register(selector, SelectionKey.OP_READ); + ByteBuffer buffer = ByteBuffer.allocate(256); // arbitrary buffer size for now + + try { + if (selector.select(timeout) == 0) { + System.out.println("Timeout (" + timeout + "ms) while waiting for data."); + } + for (SelectionKey key : selector.selectedKeys()) { + if (key.isReadable()) { + int r = unixSocketChannel.read(buffer); + if (r == -1) { + unixSocketChannel.close(); + System.out.println("Not accepting client messages anymore."); + } + } + } + buffer.flip(); + } finally { + selector.close(); + } + + return new InputStream() { + @Override + public int read() { + return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1; + } + + @Override + public int read(byte[] bytes, int off, int len) { + if (!buffer.hasRemaining()) { + return -1; + } + len = Math.min(len, buffer.remaining()); + buffer.get(bytes, off, len); + return len; + } + }; } @Override diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index be62151777c..60335a9881e 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -31,7 +31,7 @@ public void testTimeout() throws Exception { // create client TunnelingJdkSocket clientSocket = createClient(socketPath); - // expect a failure after three seconds because timeout is not supported yet + // will fail after three seconds if timeout (set to one second) is not supported assertTimeoutPreemptively(Duration.ofMillis(3000), () -> clientSocket.getInputStream().read()); // clean up From d07fc197d4fa9bfa28ece4bfc3585ef353bf66a3 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 6 Feb 2025 16:49:18 -0500 Subject: [PATCH 08/24] Clean code. --- .../common/socket/TunnelingJdkSocket.java | 2 +- .../common/socket/TunnelingJdkSocketTest.java | 23 ++++++------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index 1493af05c27..9740a0695f7 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -125,7 +125,7 @@ public InputStream getInputStream() throws IOException { Selector selector = Selector.open(); unixSocketChannel.configureBlocking(false); unixSocketChannel.register(selector, SelectionKey.OP_READ); - ByteBuffer buffer = ByteBuffer.allocate(256); // arbitrary buffer size for now + ByteBuffer buffer = ByteBuffer.allocate(256); try { if (selector.select(timeout) == 0) { diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 60335a9881e..4ce8c3c18ec 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -6,7 +6,6 @@ import java.net.InetSocketAddress; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Files; @@ -18,23 +17,19 @@ public class TunnelingJdkSocketTest { private static final AtomicBoolean is_server_running = new AtomicBoolean(false); + private final int client_timeout = 1000; + private final int test_timeout = 3000; @Test public void testTimeout() throws Exception { - // set socket path and address Path socketPath = getSocketPath(); UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); - - // start server in a separate thread - startServer(socketAddress, false); - - // create client + startServer(socketAddress); TunnelingJdkSocket clientSocket = createClient(socketPath); - // will fail after three seconds if timeout (set to one second) is not supported - assertTimeoutPreemptively(Duration.ofMillis(3000), () -> clientSocket.getInputStream().read()); + assertTimeoutPreemptively( + Duration.ofMillis(test_timeout), () -> clientSocket.getInputStream().read()); - // clean up clientSocket.close(); is_server_running.set(false); } @@ -46,7 +41,7 @@ private Path getSocketPath() throws IOException { return socketPath; } - private static void startServer(UnixDomainSocketAddress socketAddress, boolean sendMessage) { + private static void startServer(UnixDomainSocketAddress socketAddress) { Thread serverThread = new Thread( () -> { @@ -55,12 +50,8 @@ private static void startServer(UnixDomainSocketAddress socketAddress, boolean s serverChannel.bind(socketAddress); is_server_running.set(true); - // wait for client connection while (is_server_running.get()) { SocketChannel clientChannel = serverChannel.accept(); - if (sendMessage) { - clientChannel.write(ByteBuffer.wrap("Hello!".getBytes())); - } } } catch (IOException e) { throw new RuntimeException(e); @@ -72,7 +63,7 @@ private static void startServer(UnixDomainSocketAddress socketAddress, boolean s private TunnelingJdkSocket createClient(Path socketPath) throws IOException { TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); clientSocket.connect(new InetSocketAddress("localhost", 0)); - clientSocket.setSoTimeout(1000); + clientSocket.setSoTimeout(client_timeout); return clientSocket; } } From 2aec0e7f8b88a7ee6dde194cb04de6f3daa6a9b1 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Mon, 10 Feb 2025 15:45:21 -0500 Subject: [PATCH 09/24] Adjust dd-java-agent build.gradle. --- dd-java-agent/build.gradle | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/build.gradle b/dd-java-agent/build.gradle index 459880a0ee7..c253b1b2c60 100644 --- a/dd-java-agent/build.gradle +++ b/dd-java-agent/build.gradle @@ -203,6 +203,7 @@ tasks.withType(GenerateMavenPom).configureEach { task -> dependencies { implementation project(path: ':components:json') + implementation project(':utils:socket-utils') modules { module("com.squareup.okio:okio") { replacedBy("com.datadoghq.okio:okio") // embed our patched fork @@ -250,9 +251,6 @@ dependencies { sharedShadowInclude project(':utils:container-utils'), { transitive = false } - sharedShadowInclude project(':utils:socket-utils'), { - transitive = false - } sharedShadowInclude project(':utils:version-utils'), { transitive = false } @@ -289,7 +287,7 @@ tasks.register('checkAgentJarSize').configure { doLast { // Arbitrary limit to prevent unintentional increases to the agent jar size // Raise or lower as required - assert shadowJar.archiveFile.get().getAsFile().length() <= 30 * 1024 * 1024 + assert shadowJar.archiveFile.get().getAsFile().length() <= 31 * 1024 * 1024 } dependsOn "shadowJar" From ea9a237f36c715bb00f9b24a390ed5136849b7c4 Mon Sep 17 00:00:00 2001 From: Stuart McCulloch Date: Wed, 12 Feb 2025 15:43:13 +0000 Subject: [PATCH 10/24] Revert dd-java-agent build.gradle change --- dd-java-agent/build.gradle | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/build.gradle b/dd-java-agent/build.gradle index c253b1b2c60..6528a2e3c2e 100644 --- a/dd-java-agent/build.gradle +++ b/dd-java-agent/build.gradle @@ -203,7 +203,6 @@ tasks.withType(GenerateMavenPom).configureEach { task -> dependencies { implementation project(path: ':components:json') - implementation project(':utils:socket-utils') modules { module("com.squareup.okio:okio") { replacedBy("com.datadoghq.okio:okio") // embed our patched fork @@ -251,6 +250,9 @@ dependencies { sharedShadowInclude project(':utils:container-utils'), { transitive = false } + sharedShadowInclude project(':utils:socket-utils'), { + transitive = false + } sharedShadowInclude project(':utils:version-utils'), { transitive = false } From e475abdd047759b05edf1c325f15f8d1d09e35c1 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Feb 2025 16:44:39 -0500 Subject: [PATCH 11/24] Revert another build.gradle change. --- dd-java-agent/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-java-agent/build.gradle b/dd-java-agent/build.gradle index 6528a2e3c2e..459880a0ee7 100644 --- a/dd-java-agent/build.gradle +++ b/dd-java-agent/build.gradle @@ -289,7 +289,7 @@ tasks.register('checkAgentJarSize').configure { doLast { // Arbitrary limit to prevent unintentional increases to the agent jar size // Raise or lower as required - assert shadowJar.archiveFile.get().getAsFile().length() <= 31 * 1024 * 1024 + assert shadowJar.archiveFile.get().getAsFile().length() <= 30 * 1024 * 1024 } dependsOn "shadowJar" From 38b98db9b1e09e4ec7f9e4411836b4be2ee5d9d1 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Feb 2025 17:05:18 -0500 Subject: [PATCH 12/24] Try specifying setSrcDirs. --- utils/socket-utils/build.gradle | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index 1626e511ef6..4b5355e9110 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -13,6 +13,14 @@ apply plugin: "idea" } } +sourceSets { + main { + java { + setSrcDirs(['src/main/java', 'src/main/java17']) + } + } +} + dependencies { implementation libs.slf4j implementation project(':internal-api') From 5ae79f9027c34386f29eb735830a095d663d8203 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Feb 2025 17:23:34 -0500 Subject: [PATCH 13/24] Try changing compatibility version. --- utils/socket-utils/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index 4b5355e9110..92dc0213b9e 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -8,8 +8,8 @@ apply plugin: "idea" [compileMain_java17Java, compileTestJava].each { it.configure { setJavaVersion(it, 17) - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 } } From e80ce7ef886557d20cbc2b8851446feef62ea953 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Feb 2025 17:32:32 -0500 Subject: [PATCH 14/24] Revert previous two changes. --- utils/socket-utils/build.gradle | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index 92dc0213b9e..1626e511ef6 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -8,16 +8,8 @@ apply plugin: "idea" [compileMain_java17Java, compileTestJava].each { it.configure { setJavaVersion(it, 17) - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 - } -} - -sourceSets { - main { - java { - setSrcDirs(['src/main/java', 'src/main/java17']) - } + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } } From 6c188d9cc9d3501bf8914815153c727c944339c9 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 20 Feb 2025 16:54:46 -0500 Subject: [PATCH 15/24] Avoid implementation dependency for Java17. --- gradle/java_no_deps.gradle | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gradle/java_no_deps.gradle b/gradle/java_no_deps.gradle index 4a05431c074..0c67d5c8c47 100644 --- a/gradle/java_no_deps.gradle +++ b/gradle/java_no_deps.gradle @@ -60,8 +60,12 @@ if (project.hasProperty('minJavaVersionForTests') && project.getProperty('minJav } dependencies { - compileOnly files(project.sourceSets."main_$name".compileClasspath) - implementation files(project.sourceSets."main_$name".output) + if (name == "java17") { + compileOnly files(project.sourceSets."main_$name".output) + } else { + compileOnly files(project.sourceSets."main_$name".compileClasspath) + implementation files(project.sourceSets."main_$name".output) + } } jar { From dda0997976f191f8f09078604670ca7a47581b22 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 20 Feb 2025 17:22:40 -0500 Subject: [PATCH 16/24] Make gradle dependency more specific and add testImplementation to socket-utils. --- gradle/java_no_deps.gradle | 2 +- utils/socket-utils/build.gradle | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/gradle/java_no_deps.gradle b/gradle/java_no_deps.gradle index 0c67d5c8c47..af9dbaa74d4 100644 --- a/gradle/java_no_deps.gradle +++ b/gradle/java_no_deps.gradle @@ -60,7 +60,7 @@ if (project.hasProperty('minJavaVersionForTests') && project.getProperty('minJav } dependencies { - if (name == "java17") { + if ("${project.projectDir}".endsWith("socket-utils")) { compileOnly files(project.sourceSets."main_$name".output) } else { compileOnly files(project.sourceSets."main_$name".compileClasspath) diff --git a/utils/socket-utils/build.gradle b/utils/socket-utils/build.gradle index 1626e511ef6..d2e05364414 100644 --- a/utils/socket-utils/build.gradle +++ b/utils/socket-utils/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation libs.slf4j implementation project(':internal-api') implementation libs.jnr.unixsocket + testImplementation files(sourceSets.main_java17.output) } forbiddenApisMain_java17 { From 412e4d3c5664b955e4398e978128a9b4cf738ca9 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 20 Feb 2025 18:06:36 -0500 Subject: [PATCH 17/24] Add synchronization to ensure that server starts before client connects. --- .../common/socket/TunnelingJdkSocketTest.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 4ce8c3c18ec..206532067ec 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -16,9 +16,9 @@ public class TunnelingJdkSocketTest { - private static final AtomicBoolean is_server_running = new AtomicBoolean(false); - private final int client_timeout = 1000; - private final int test_timeout = 3000; + private static final AtomicBoolean isServerRunning = new AtomicBoolean(false); + private final int clientTimeout = 1000; + private final int testTimeout = 3000; @Test public void testTimeout() throws Exception { @@ -28,10 +28,10 @@ public void testTimeout() throws Exception { TunnelingJdkSocket clientSocket = createClient(socketPath); assertTimeoutPreemptively( - Duration.ofMillis(test_timeout), () -> clientSocket.getInputStream().read()); + Duration.ofMillis(testTimeout), () -> clientSocket.getInputStream().read()); clientSocket.close(); - is_server_running.set(false); + isServerRunning.set(false); } private Path getSocketPath() throws IOException { @@ -48,9 +48,13 @@ private static void startServer(UnixDomainSocketAddress socketAddress) { try (ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX)) { serverChannel.bind(socketAddress); - is_server_running.set(true); + isServerRunning.set(true); - while (is_server_running.get()) { + synchronized (isServerRunning) { + isServerRunning.notifyAll(); + } + + while (isServerRunning.get()) { SocketChannel clientChannel = serverChannel.accept(); } } catch (IOException e) { @@ -58,12 +62,22 @@ private static void startServer(UnixDomainSocketAddress socketAddress) { } }); serverThread.start(); + + synchronized (isServerRunning) { + while (!isServerRunning.get()) { + try { + isServerRunning.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } } private TunnelingJdkSocket createClient(Path socketPath) throws IOException { TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); clientSocket.connect(new InetSocketAddress("localhost", 0)); - clientSocket.setSoTimeout(client_timeout); + clientSocket.setSoTimeout(clientTimeout); return clientSocket; } } From 381f841e37ec1852b1139950a1f5a28b7d5fb5db Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 27 Feb 2025 15:44:14 -0500 Subject: [PATCH 18/24] Try this... --- .../common/socket/TunnelingJdkSocket.java | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index 9740a0695f7..d35f10853c9 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -15,6 +15,7 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.file.Path; +import java.util.Iterator; /** * Subtype UNIX socket for a higher-fidelity impersonation of TCP sockets. This is named "tunneling" @@ -128,19 +129,27 @@ public InputStream getInputStream() throws IOException { ByteBuffer buffer = ByteBuffer.allocate(256); try { - if (selector.select(timeout) == 0) { - System.out.println("Timeout (" + timeout + "ms) while waiting for data."); - } - for (SelectionKey key : selector.selectedKeys()) { - if (key.isReadable()) { - int r = unixSocketChannel.read(buffer); - if (r == -1) { - unixSocketChannel.close(); - System.out.println("Not accepting client messages anymore."); + while (true) { + if (selector.select(timeout) == 0) { + System.out.println("Timeout (" + timeout + "ms) while waiting for data."); + break; + } + Iterator keyIterator = selector.selectedKeys().iterator(); + while (keyIterator.hasNext()) { + SelectionKey key = keyIterator.next(); + keyIterator.remove(); + if (key.isReadable()) { + int r = unixSocketChannel.read(buffer); + if (r == -1) { + unixSocketChannel.close(); + System.out.println("Not accepting client messages anymore."); + return InputStream.nullInputStream(); + } } } + buffer.flip(); + break; } - buffer.flip(); } finally { selector.close(); } From beba3166f79d72c96d76352a940ccd05b0bafa29 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 4 Mar 2025 15:28:47 -0500 Subject: [PATCH 19/24] Add print statements. --- .../common/socket/TunnelingJdkSocket.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index d35f10853c9..0c1162f3a97 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -123,37 +123,56 @@ public InputStream getInputStream() throws IOException { throw new SocketException("Socket input is shutdown"); } + System.out.println("=====start====="); + Selector selector = Selector.open(); + System.out.println("=====1====="); unixSocketChannel.configureBlocking(false); + System.out.println("=====2====="); unixSocketChannel.register(selector, SelectionKey.OP_READ); + System.out.println("=====3====="); ByteBuffer buffer = ByteBuffer.allocate(256); + System.out.println("=====4====="); + try { while (true) { if (selector.select(timeout) == 0) { System.out.println("Timeout (" + timeout + "ms) while waiting for data."); break; } + System.out.println("=====5====="); Iterator keyIterator = selector.selectedKeys().iterator(); + System.out.println("=====6====="); while (keyIterator.hasNext()) { + System.out.println("=====7====="); SelectionKey key = keyIterator.next(); + System.out.println("=====8====="); keyIterator.remove(); + System.out.println("=====9====="); if (key.isReadable()) { + System.out.println("=====10====="); int r = unixSocketChannel.read(buffer); + System.out.println("=====11====="); if (r == -1) { + System.out.println("=====12====="); unixSocketChannel.close(); System.out.println("Not accepting client messages anymore."); return InputStream.nullInputStream(); } } } + System.out.println("=====13====="); buffer.flip(); break; } } finally { + System.out.println("=====14====="); selector.close(); } + System.out.println("=====15====="); + return new InputStream() { @Override public int read() { @@ -162,11 +181,15 @@ public int read() { @Override public int read(byte[] bytes, int off, int len) { + System.out.println("=====16====="); if (!buffer.hasRemaining()) { + System.out.println("=====17====="); return -1; } len = Math.min(len, buffer.remaining()); + System.out.println("=====18====="); buffer.get(bytes, off, len); + System.out.println("=====19====="); return len; } }; From 305d9afb165833dca5a7ed257d3c4e458b5c020e Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 4 Mar 2025 17:03:26 -0500 Subject: [PATCH 20/24] Add catch statement. --- .../main/java17/datadog/common/socket/TunnelingJdkSocket.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index 0c1162f3a97..40fa57d16dd 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -166,6 +166,8 @@ public InputStream getInputStream() throws IOException { buffer.flip(); break; } + } catch (Exception e) { + System.out.println("=====Error while reading from client: " + e + "====="); } finally { System.out.println("=====14====="); selector.close(); From 8ff0eabe39f0f7b02ae6fdeed4c918d85ec13c9a Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 5 Mar 2025 17:27:29 -0500 Subject: [PATCH 21/24] Refactor getInputStream and getOutputStream. --- .../common/socket/TunnelingJdkSocket.java | 103 ++++++++---------- 1 file changed, 48 insertions(+), 55 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index 40fa57d16dd..cf3dfc8bb17 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -10,12 +10,12 @@ import java.net.SocketException; import java.net.UnixDomainSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.file.Path; import java.util.Iterator; +import java.util.Set; /** * Subtype UNIX socket for a higher-fidelity impersonation of TCP sockets. This is named "tunneling" @@ -123,76 +123,53 @@ public InputStream getInputStream() throws IOException { throw new SocketException("Socket input is shutdown"); } - System.out.println("=====start====="); + return new InputStream() { + private final ByteBuffer buffer = ByteBuffer.allocate(256); + private final Selector selector = Selector.open(); + + { + unixSocketChannel.configureBlocking(false); + unixSocketChannel.register(selector, SelectionKey.OP_READ); + } - Selector selector = Selector.open(); - System.out.println("=====1====="); - unixSocketChannel.configureBlocking(false); - System.out.println("=====2====="); - unixSocketChannel.register(selector, SelectionKey.OP_READ); - System.out.println("=====3====="); - ByteBuffer buffer = ByteBuffer.allocate(256); + @Override + public int read() throws IOException { + byte[] nextByte = new byte[1]; + return (read(nextByte, 0, 1) == -1) ? -1 : (nextByte[0] & 0xFF); + } - System.out.println("=====4====="); + @Override + public int read(byte[] b, int off, int len) throws IOException { + buffer.clear(); - try { - while (true) { - if (selector.select(timeout) == 0) { + int readyChannels = selector.select(timeout); + if (readyChannels == 0) { System.out.println("Timeout (" + timeout + "ms) while waiting for data."); - break; + return 0; } - System.out.println("=====5====="); - Iterator keyIterator = selector.selectedKeys().iterator(); - System.out.println("=====6====="); + + Set selectedKeys = selector.selectedKeys(); + Iterator keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { - System.out.println("=====7====="); SelectionKey key = keyIterator.next(); - System.out.println("=====8====="); keyIterator.remove(); - System.out.println("=====9====="); if (key.isReadable()) { - System.out.println("=====10====="); int r = unixSocketChannel.read(buffer); - System.out.println("=====11====="); if (r == -1) { - System.out.println("=====12====="); - unixSocketChannel.close(); - System.out.println("Not accepting client messages anymore."); - return InputStream.nullInputStream(); + return -1; } + buffer.flip(); + len = Math.min(r, len); + buffer.get(b, off, len); + return len; } } - System.out.println("=====13====="); - buffer.flip(); - break; + return 0; } - } catch (Exception e) { - System.out.println("=====Error while reading from client: " + e + "====="); - } finally { - System.out.println("=====14====="); - selector.close(); - } - - System.out.println("=====15====="); - return new InputStream() { @Override - public int read() { - return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1; - } - - @Override - public int read(byte[] bytes, int off, int len) { - System.out.println("=====16====="); - if (!buffer.hasRemaining()) { - System.out.println("=====17====="); - return -1; - } - len = Math.min(len, buffer.remaining()); - System.out.println("=====18====="); - buffer.get(bytes, off, len); - System.out.println("=====19====="); - return len; + public void close() throws IOException { + selector.close(); } }; } @@ -208,7 +185,23 @@ public OutputStream getOutputStream() throws IOException { if (isInputShutdown()) { throw new SocketException("Socket output is shutdown"); } - return Channels.newOutputStream(unixSocketChannel); + + return new OutputStream() { + @Override + public void write(int b) throws IOException { + byte[] array = ByteBuffer.allocate(4).putInt(b).array(); + write(array, 0, 4); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(b, off, len); + + while (buffer.hasRemaining()) { + unixSocketChannel.write(buffer); + } + } + }; } @Override From 0251e37d114f34980a5fecd08822ab049a62044b Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 11 Mar 2025 15:28:22 -0400 Subject: [PATCH 22/24] Address PR comments. --- gradle/java_no_deps.gradle | 2 ++ .../datadog/common/socket/TunnelingJdkSocket.java | 10 +++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/gradle/java_no_deps.gradle b/gradle/java_no_deps.gradle index af9dbaa74d4..95a87f0e8ed 100644 --- a/gradle/java_no_deps.gradle +++ b/gradle/java_no_deps.gradle @@ -59,6 +59,8 @@ if (project.hasProperty('minJavaVersionForTests') && project.getProperty('minJav targetCompatibility = version } + // "socket-utils" is only set to compileOnly because the implementation dependency incorrectly adds Java17 classes to all jar prefixes. + // This causes the AgentJarIndex to search for other non-Java17 classes in the wrong prefix location and fail to resolve class names. dependencies { if ("${project.projectDir}".endsWith("socket-utils")) { compileOnly files(project.sourceSets."main_$name".output) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index cf3dfc8bb17..a814fcbc710 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -94,6 +94,9 @@ public void connect(final SocketAddress endpoint) throws IOException { unixSocketChannel = SocketChannel.open(unixSocketAddress); } + // `timeout` is intentionally ignored here, like in the jnr-unixsocket implementation. + // See: + // https://github.com/jnr/jnr-unixsocket/blob/master/src/main/java/jnr/unixsocket/UnixSocket.java#L89-L97 @Override public void connect(final SocketAddress endpoint, final int timeout) throws IOException { if (isClosed()) { @@ -124,7 +127,7 @@ public InputStream getInputStream() throws IOException { } return new InputStream() { - private final ByteBuffer buffer = ByteBuffer.allocate(256); + private final ByteBuffer buffer = ByteBuffer.allocate(8192); private final Selector selector = Selector.open(); { @@ -140,6 +143,11 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { + if (timeout == 0) { + System.out.println("Timeout (0ms) while waiting for data."); + return 0; + } + buffer.clear(); int readyChannels = selector.select(timeout); From 114d2306b85a19190eef11cf67bd6a4b95a8d8a9 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 11 Mar 2025 16:56:59 -0400 Subject: [PATCH 23/24] Add test for when timeout is 0. --- .../common/socket/TunnelingJdkSocket.java | 5 ----- .../common/socket/TunnelingJdkSocketTest.java | 21 ++++++++++++++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java index a814fcbc710..6db94aa15ac 100644 --- a/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java +++ b/utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java @@ -143,11 +143,6 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { - if (timeout == 0) { - System.out.println("Timeout (0ms) while waiting for data."); - return 0; - } - buffer.clear(); int readyChannels = selector.select(timeout); diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 206532067ec..1441e6090e5 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -1,6 +1,7 @@ package datadog.common.socket; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.net.InetSocketAddress; @@ -17,19 +18,34 @@ public class TunnelingJdkSocketTest { private static final AtomicBoolean isServerRunning = new AtomicBoolean(false); - private final int clientTimeout = 1000; - private final int testTimeout = 3000; @Test public void testTimeout() throws Exception { + int testTimeout = 3000; Path socketPath = getSocketPath(); UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath); startServer(socketAddress); TunnelingJdkSocket clientSocket = createClient(socketPath); + // Test that the socket unblocks when timeout is set to >0 + clientSocket.setSoTimeout(1000); assertTimeoutPreemptively( Duration.ofMillis(testTimeout), () -> clientSocket.getInputStream().read()); + // Test that the socket blocks indefinitely when timeout is set to 0, per + // https://docs.oracle.com/en/java/javase/16/docs/api//java.base/java/net/Socket.html#setSoTimeout(int). + clientSocket.setSoTimeout(0); + boolean infiniteTimeOut = false; + try { + assertTimeoutPreemptively( + Duration.ofMillis(testTimeout), () -> clientSocket.getInputStream().read()); + } catch (AssertionError e) { + infiniteTimeOut = true; + } + if (!infiniteTimeOut) { + fail("Test failed: Expected infinite blocking when timeout is set to 0."); + } + clientSocket.close(); isServerRunning.set(false); } @@ -77,7 +93,6 @@ private static void startServer(UnixDomainSocketAddress socketAddress) { private TunnelingJdkSocket createClient(Path socketPath) throws IOException { TunnelingJdkSocket clientSocket = new TunnelingJdkSocket(socketPath); clientSocket.connect(new InetSocketAddress("localhost", 0)); - clientSocket.setSoTimeout(clientTimeout); return clientSocket; } } From 37ca729ec685228196da70949b8774a2ce8878c9 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 12 Mar 2025 17:11:33 -0400 Subject: [PATCH 24/24] Add config option. --- .../java/datadog/trace/api/config/GeneralConfig.java | 1 + .../src/main/java/datadog/trace/api/Config.java | 10 ++++++++++ .../test/groovy/datadog/trace/api/ConfigTest.groovy | 3 +++ .../datadog/common/socket/UnixDomainSocketFactory.java | 2 +- .../datadog/common/socket/TunnelingJdkSocketTest.java | 7 +++++++ 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 9a5caf5c4a2..a882a097e59 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -94,6 +94,7 @@ public final class GeneralConfig { public static final String AGENTLESS_LOG_SUBMISSION_LEVEL = "agentless.log.submission.level"; public static final String AGENTLESS_LOG_SUBMISSION_URL = "agentless.log.submission.url"; public static final String APM_TRACING_ENABLED = "apm.tracing.enabled"; + public static final String JDK_SOCKET_ENABLED = "jdk.socket.enabled"; private GeneralConfig() {} } diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 004a609898e..6554f6517d9 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -557,6 +557,8 @@ public static String getHostName() { private final boolean apmTracingEnabled; + private final boolean jdkSocketEnabled; + // Read order: System Properties -> Env Variables, [-> properties file], [-> default value] private Config() { this(ConfigProvider.createDefault()); @@ -1924,6 +1926,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) this.apmTracingEnabled = configProvider.getBoolean(GeneralConfig.APM_TRACING_ENABLED, true); + this.jdkSocketEnabled = configProvider.getBoolean(JDK_SOCKET_ENABLED, true); + log.debug("New instance: {}", this); } @@ -3466,6 +3470,10 @@ public boolean isApmTracingEnabled() { return apmTracingEnabled; } + public boolean isJdkSocketEnabled() { + return jdkSocketEnabled; + } + /** @return A map of tags to be applied only to the local application root span. */ public Map getLocalRootSpanTags() { final Map runtimeTags = getRuntimeTags(); @@ -4703,6 +4711,8 @@ public String toString() { + dataJobsCommandPattern + ", apmTracingEnabled=" + apmTracingEnabled + + ", jdkSocketEnabled=" + + jdkSocketEnabled + ", cloudRequestPayloadTagging=" + cloudRequestPayloadTagging + ", cloudResponsePayloadTagging=" diff --git a/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy index b3e2a3d9b09..a553488676b 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy @@ -50,6 +50,7 @@ import static datadog.trace.api.config.GeneralConfig.GLOBAL_TAGS import static datadog.trace.api.config.GeneralConfig.HEALTH_METRICS_ENABLED import static datadog.trace.api.config.GeneralConfig.HEALTH_METRICS_STATSD_HOST import static datadog.trace.api.config.GeneralConfig.HEALTH_METRICS_STATSD_PORT +import static datadog.trace.api.config.GeneralConfig.JDK_SOCKET_ENABLED import static datadog.trace.api.config.GeneralConfig.PERF_METRICS_ENABLED import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME import static datadog.trace.api.config.GeneralConfig.SITE @@ -257,6 +258,7 @@ class ConfigTest extends DDSpecification { prop.setProperty(DYNAMIC_INSTRUMENTATION_EXCLUDE_FILES, "exclude file") prop.setProperty(EXCEPTION_REPLAY_ENABLED, "true") prop.setProperty(TRACE_X_DATADOG_TAGS_MAX_LENGTH, "128") + prop.setProperty(JDK_SOCKET_ENABLED, "false") when: Config config = Config.get(prop) @@ -348,6 +350,7 @@ class ConfigTest extends DDSpecification { config.dynamicInstrumentationInstrumentTheWorld == true config.dynamicInstrumentationExcludeFiles == "exclude file" config.debuggerExceptionEnabled == true + config.jdkSocketEnabled == false config.xDatadogTagsMaxLength == 128 } diff --git a/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java b/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java index bf4001d0a03..8e141b939f2 100644 --- a/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java +++ b/utils/socket-utils/src/main/java/datadog/common/socket/UnixDomainSocketFactory.java @@ -38,7 +38,7 @@ public UnixDomainSocketFactory(final File path) { @Override public Socket createSocket() throws IOException { try { - if (JDK_SUPPORTS_UDS) { + if (JDK_SUPPORTS_UDS && Config.get().isJdkSocketEnabled()) { try { return new TunnelingJdkSocket(path.toPath()); } catch (Throwable ignore) { diff --git a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java index 1441e6090e5..05cf96e94d8 100644 --- a/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java +++ b/utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.fail; +import datadog.trace.api.Config; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardProtocolFamily; @@ -21,6 +22,12 @@ public class TunnelingJdkSocketTest { @Test public void testTimeout() throws Exception { + if (!Config.get().isJdkSocketEnabled()) { + System.out.println( + "TunnelingJdkSocket usage is disabled. Enable it by setting the property 'JDK_SOCKET_ENABLED' to 'true'."); + return; + } + int testTimeout = 3000; Path socketPath = getSocketPath(); UnixDomainSocketAddress socketAddress = UnixDomainSocketAddress.of(socketPath);