From 561f192644ffbd0e8bcb145f80ee9883c69694f1 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Wed, 9 Apr 2025 12:19:13 -0400 Subject: [PATCH 1/7] Make crash tracking smoke test more resiliant --- .../datadog/smoketest/CrashTelemetryData.java | 13 + .../smoketest/CrashtrackingSmokeTest.java | 261 ++++++------------ .../smoketest/MinimalTelemetryData.java | 5 + .../java/datadog/smoketest/TestUDPServer.java | 132 +++++++++ .../datadog/smoketest/ProcessManager.groovy | 129 +-------- .../java/datadog/smoketest/OutputThreads.java | 170 ++++++++++++ 6 files changed, 408 insertions(+), 302 deletions(-) create mode 100644 dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashTelemetryData.java create mode 100644 dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/MinimalTelemetryData.java create mode 100644 dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java create mode 100644 dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashTelemetryData.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashTelemetryData.java new file mode 100644 index 00000000000..21f6df81f01 --- /dev/null +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashTelemetryData.java @@ -0,0 +1,13 @@ +package datadog.smoketest; + +import java.util.List; + +public class CrashTelemetryData extends MinimalTelemetryData { + List payload; + + public static class LogMessage { + public String message; + public String level; + public String tags; + } +} diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java index 469f86f0620..e6c0b8c01c9 100644 --- a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java @@ -1,24 +1,33 @@ package datadog.smoketest; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assumptions.assumeFalse; +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; import datadog.trace.api.Platform; -import java.io.BufferedReader; import java.io.File; -import java.io.InputStreamReader; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Comparator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -29,34 +38,70 @@ * that ships with OS X by default. */ public class CrashtrackingSmokeTest { + private static final long DATA_TIMEOUT_MS = 10 * 1000; + private static Path LOG_FILE_DIR; private MockWebServer tracingServer; + private TestUDPServer udpServer; + private BlockingQueue crashEvents = new LinkedBlockingQueue<>(); @BeforeAll static void setupAll() { // Only Hotspot based implementation are supported assumeFalse(Platform.isJ9()); + + LOG_FILE_DIR = Paths.get(System.getProperty("datadog.smoketest.builddir"), "reports"); } private Path tempDir; + private static OutputThreads outputThreads = new OutputThreads(); @BeforeEach void setup() throws Exception { tempDir = Files.createTempDirectory("dd-smoketest-"); + crashEvents.clear(); + + Moshi moshi = new Moshi.Builder().build(); tracingServer = new MockWebServer(); tracingServer.setDispatcher( new Dispatcher() { @Override public MockResponse dispatch(final RecordedRequest request) throws InterruptedException { + String data = request.getBody().readString(StandardCharsets.UTF_8); + + if ("/telemetry/proxy/api/v2/apmtelemetry".equals(request.getPath())) { + try { + JsonAdapter adapter = + moshi.adapter(MinimalTelemetryData.class); + MinimalTelemetryData minimal = adapter.fromJson(data); + if ("logs".equals(minimal.request_type)) { + JsonAdapter crashAdapter = + moshi.adapter(CrashTelemetryData.class); + crashEvents.add(crashAdapter.fromJson(data)); + } + } catch (IOException e) { + System.out.println("Unable to parse " + e); + } + } + System.out.println("URL ====== " + request.getPath()); + System.out.println(data); + return new MockResponse().setResponseCode(200); } }); - // tracingServer.start(8126); + + udpServer = new TestUDPServer(); + udpServer.start(); + + synchronized (outputThreads.testLogMessages) { + outputThreads.testLogMessages.clear(); + } } @AfterEach void teardown() throws Exception { tracingServer.shutdown(); + udpServer.close(); try (Stream fileStream = Files.walk(tempDir)) { fileStream.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); @@ -64,6 +109,11 @@ void teardown() throws Exception { Files.deleteIfExists(tempDir); } + @AfterAll + static void shutdown() { + outputThreads.close(); + } + private static String javaPath() { final String separator = FileSystems.getDefault().getSeparator(); return System.getProperty("java.home") + separator + "bin" + separator + "java"; @@ -108,52 +158,13 @@ void testCrashTracking() throws Exception { appShadowJar(), script.toString())); pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); - StringBuilder stdoutStr = new StringBuilder(); - StringBuilder stderrStr = new StringBuilder(); Process p = pb.start(); - Thread stdout = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getInputStream()))) { - br.lines() - .forEach( - l -> { - System.out.println(l); - stdoutStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - Thread stderr = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getErrorStream()))) { - br.lines() - .forEach( - l -> { - System.err.println(l); - stderrStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - stdout.setDaemon(true); - stderr.setDaemon(true); - stdout.start(); - stderr.start(); + outputThreads.captureOutput( + p, LOG_FILE_DIR.resolve("testProcess.testCrashTracking.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); - - assertThat(stdoutStr.toString(), containsString(" was uploaded successfully")); - assertThat( - stderrStr.toString(), - containsString( - "com.datadog.crashtracking.CrashUploader - Successfully uploaded the crash files")); + assertCrashData(); } /* @@ -183,52 +194,14 @@ void testCrashTrackingLegacy() throws Exception { appShadowJar(), script.toString())); pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); - StringBuilder stdoutStr = new StringBuilder(); - StringBuilder stderrStr = new StringBuilder(); Process p = pb.start(); - Thread stdout = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getInputStream()))) { - br.lines() - .forEach( - l -> { - System.out.println(l); - stdoutStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - Thread stderr = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getErrorStream()))) { - br.lines() - .forEach( - l -> { - System.err.println(l); - stderrStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - stdout.setDaemon(true); - stderr.setDaemon(true); - stdout.start(); - stderr.start(); + outputThreads.captureOutput( + p, LOG_FILE_DIR.resolve("testProcess.testCrashTrackingLegacy.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); - assertThat(stdoutStr.toString(), containsString(" was uploaded successfully")); - assertThat( - stderrStr.toString(), - containsString( - "com.datadog.crashtracking.CrashUploader - Successfully uploaded the crash files")); + assertCrashData(); } /* @@ -255,51 +228,14 @@ void testOomeTracking() throws Exception { "-jar", appShadowJar(), script.toString())); - StringBuilder stdoutStr = new StringBuilder(); - StringBuilder stderrStr = new StringBuilder(); Process p = pb.start(); - Thread stdout = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getInputStream()))) { - br.lines() - .forEach( - l -> { - System.out.println(l); - stdoutStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - Thread stderr = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getErrorStream()))) { - br.lines() - .forEach( - l -> { - System.err.println(l); - stderrStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - stdout.setDaemon(true); - stderr.setDaemon(true); - stdout.start(); - stderr.start(); + outputThreads.captureOutput( + p, LOG_FILE_DIR.resolve("testProcess.testOomeTracking.log").toFile()); + pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); assertNotEquals(0, p.waitFor(), "Application should have crashed"); - - assertThat( - stderrStr.toString(), - containsString("com.datadog.crashtracking.OOMENotifier - OOME event sent")); - assertThat(stdoutStr.toString(), containsString("OOME Event generated successfully")); + assertOOMEvent(); } @Test @@ -326,58 +262,33 @@ void testCombineTracking() throws Exception { appShadowJar(), oomeScript.toString())); pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); - StringBuilder stdoutStr = new StringBuilder(); - StringBuilder stderrStr = new StringBuilder(); + pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); Process p = pb.start(); - Thread stdout = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getInputStream()))) { - br.lines() - .forEach( - l -> { - System.out.println(l); - stdoutStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - Thread stderr = - new Thread( - () -> { - try (BufferedReader br = - new BufferedReader(new InputStreamReader(p.getErrorStream()))) { - br.lines() - .forEach( - l -> { - System.err.println(l); - stderrStr.append(l).append('\n'); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - stdout.setDaemon(true); - stderr.setDaemon(true); - stdout.start(); - stderr.start(); + outputThreads.captureOutput( + p, LOG_FILE_DIR.resolve("testProcess.testCombineTracking.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); - // Crash uploader did get triggered - assertThat(stdoutStr.toString(), containsString(" was uploaded successfully")); - assertThat( - stderrStr.toString(), - containsString( - "com.datadog.crashtracking.CrashUploader - Successfully uploaded the crash files")); - - // OOME notifier did get triggered - assertThat( - stderrStr.toString(), - containsString("com.datadog.crashtracking.OOMENotifier - OOME event sent")); - assertThat(stdoutStr.toString(), containsString("OOME Event generated successfully")); + assertCrashData(); + assertOOMEvent(); + } + + private void assertCrashData() throws InterruptedException { + CrashTelemetryData crashData = crashEvents.poll(DATA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertNotNull(crashData, "Crash data not uploaded"); + assertThat(crashData.payload.get(0).message, containsString("OutOfMemory")); + assertThat(crashData.payload.get(0).tags, containsString("severity:crash")); + } + + private void assertOOMEvent() throws InterruptedException { + byte[] data = udpServer.getMessages().poll(DATA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertNotNull(data, "OOM Event not received"); + String event = new String(data); + + assertThat(event, startsWith("_e")); + assertThat(event, containsString(":OutOfMemoryError")); + assertThat(event, containsString("t:error")); + assertThat(event, containsString("s:java")); } } diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/MinimalTelemetryData.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/MinimalTelemetryData.java new file mode 100644 index 00000000000..4940d794de9 --- /dev/null +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/MinimalTelemetryData.java @@ -0,0 +1,5 @@ +package datadog.smoketest; + +public class MinimalTelemetryData { + String request_type; +} diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java new file mode 100644 index 00000000000..6668c78a41d --- /dev/null +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java @@ -0,0 +1,132 @@ +package datadog.smoketest; + +import java.io.Closeable; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.Arrays; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** Simple test UDP Server. Not for production use but good enough for tests */ +public class TestUDPServer implements Closeable { + public static final int DEFAULT_TIMEOUT_MS = 30 * 1000; + public static final int DEFAULT_PACKET_SIZE = 2000; + + private static final byte[] END_MESSAGE = "END____".getBytes(); + + private final BlockingQueue dataPackets = new LinkedBlockingQueue<>(); + private final int timeout; + private final int packetSize; + private final int port; + + private volatile boolean closed = false; + private volatile boolean closing = false; + private DatagramSocket socket; + private Thread readerThread; + + public TestUDPServer() { + this(DEFAULT_TIMEOUT_MS, DEFAULT_PACKET_SIZE, 0); + } + + public TestUDPServer(int timeout, int packetSize, int port) { + this.timeout = timeout; + this.packetSize = packetSize; + this.port = port; + } + + public synchronized void start() throws SocketException { + if (closed) { + throw new IllegalStateException("Server closed"); + } + if (socket != null) { + // already started + return; + } + + socket = new DatagramSocket(port); + socket.setSoTimeout(timeout); + readerThread = + new Thread( + () -> { + while (!closed && !closing) { + byte[] data = new byte[packetSize]; + try { + DatagramPacket packet = new DatagramPacket(data, packetSize); + socket.receive(packet); + + byte[] trimmedData = new byte[packet.getLength()]; + System.arraycopy( + packet.getData(), packet.getOffset(), trimmedData, 0, packet.getLength()); + + if (Arrays.equals(trimmedData, END_MESSAGE)) { + System.out.println("[TestUDPServer] Received message to close"); + break; + } + System.out.println( + "[TestUDPServer] Received message: " + new String(trimmedData)); + dataPackets.add(trimmedData); + } catch (SocketTimeoutException e) { + System.out.println("[TestUDPServer] Timeout waiting for message"); + // ignore no data sent + } catch (IOException e) { + System.out.println("[TestUDPServer] Error in receiving packet " + e.getMessage()); + e.printStackTrace(); + break; + } + } + closed = true; + }, + "Test UDP Server Receiver"); + + readerThread.setDaemon(true); + readerThread.start(); + } + + @Override + public synchronized void close() { + if (closed) { + // Already closed + return; + } + if (socket == null) { + throw new IllegalStateException("Socket not open"); + } + + closing = true; + + try (DatagramSocket clientSocket = new DatagramSocket()) { + clientSocket.send( + new DatagramPacket( + END_MESSAGE, END_MESSAGE.length, InetAddress.getByName("localhost"), getPort())); + } catch (IOException e) { + System.out.println( + "[TestUDPServer] Exception sending close message. Will rely on socket timeout"); + e.printStackTrace(); + } + + // Closed state is set by the reader thread. Wait for it to finish + try { + readerThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public int getPort() { + if (port != 0) { + return port; + } else if (socket != null) { + return socket.getLocalPort(); + } else { + throw new IllegalStateException("Socket not open and port not explicitly set"); + } + } + + public BlockingQueue getMessages() { + return dataPackets; + } +} diff --git a/dd-smoke-tests/src/main/groovy/datadog/smoketest/ProcessManager.groovy b/dd-smoke-tests/src/main/groovy/datadog/smoketest/ProcessManager.groovy index fdcc9db82be..9d7fe392fd7 100644 --- a/dd-smoke-tests/src/main/groovy/datadog/smoketest/ProcessManager.groovy +++ b/dd-smoke-tests/src/main/groovy/datadog/smoketest/ProcessManager.groovy @@ -7,13 +7,7 @@ import spock.lang.AutoCleanup import spock.lang.Shared import spock.lang.Specification -import java.nio.ByteBuffer import java.nio.CharBuffer -import java.nio.channels.Channels -import java.nio.channels.ReadableByteChannel -import java.nio.channels.WritableByteChannel -import java.nio.charset.CharsetDecoder -import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.Paths import java.util.concurrent.TimeoutException @@ -78,93 +72,6 @@ abstract class ProcessManager extends Specification { @AutoCleanup OutputThreads outputThreads = new OutputThreads() - class OutputThreads implements Closeable { - final ThreadGroup tg = new ThreadGroup("smoke-output") - final List testLogMessages = new ArrayList<>() - - void close() { - tg.interrupt() - Thread[] threads = new Thread[tg.activeCount()] - tg.enumerate(threads) - threads*.join() - } - - @CompileStatic - class ProcessOutputRunnable implements Runnable { - final ReadableByteChannel rc - ByteBuffer buffer = ByteBuffer.allocate(MAX_LINE_SIZE) - final WritableByteChannel wc - CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder() - - ProcessOutputRunnable(InputStream is, File output) { - rc = Channels.newChannel(is) - wc = Channels.newChannel(new FileOutputStream(output)) - } - - @Override - void run() { - boolean online = true - while (online) { - // we may have data in the buffer we did not consume for line splitting purposes - int skip = buffer.position() - - try { - if (rc.read(buffer) == -1) { - online = false - } - } catch (IOException ioe) { - online = false - } - - buffer.flip() - // write to log file - wc.write(buffer.duplicate().position(skip) as ByteBuffer) - - // subBuff will always start at the beginning of the next (potential) line - ByteBuffer subBuff = buffer.duplicate() - int consumed = 0 - while (true) { - boolean hasRemaining = subBuff.hasRemaining() - if (hasRemaining) { - int c = subBuff.get() - if (c != '\n' && c != '\r') { - continue - } - // found line end - } else if (online && consumed > 0) { - break - // did not find line end, but we already consumed a line - // save the data for the next read iteration - } // else we did not consume any line, or there will be no further reads. - // Treat the buffer as single line despite lack of terminator - - consumed += subBuff.position() - String line = decoder.decode(subBuff.duplicate().flip() as ByteBuffer).toString().trim() - if (line != '') { - synchronized (testLogMessages) { - testLogMessages << line - testLogMessages.notifyAll() - } - } - - if (hasRemaining) { - subBuff = subBuff.slice() - } else { - break - } - } - - buffer.position(consumed) - buffer.compact() - } - } - } - - void captureOutput(Process p, File outputFile) { - new Thread(tg, new ProcessOutputRunnable(p.inputStream, outputFile)).start() - } - } - def setupSpec() { if (buildDirectory == null || shadowJarPath == null) { throw new AssertionError("Expected system properties not found. Smoke tests have to be run from Gradle. Please make sure that is the case.") @@ -333,37 +240,7 @@ abstract class ProcessManager extends Specification { * @param checker should return true if a match is found */ void processTestLogLines(Closure checker) { - int l = 0 - def tlm = outputThreads.testLogMessages - long waitStart - - while (true) { - String msg - synchronized (tlm) { - if (l >= tlm.size()) { - long waitTime - if (waitStart != 0) { - waitTime = 5000 - (System.currentTimeMillis() - waitStart) - if (waitTime < 0) { - throw new TimeoutException() - } - } else { - waitStart = System.currentTimeMillis() - waitTime = 5000 - } - tlm.wait(waitTime) - } - if (l >= tlm.size()) { - throw new TimeoutException() - } - // the array is only cleared at the end of the test, so index l exists - msg = tlm.get(l++) - } - - if (checker(msg)) { - break - } - } + outputThreads.processTestLogLines {return checker(it) } } protected void beforeProcessBuilders() {} @@ -383,13 +260,11 @@ abstract class ProcessManager extends Specification { return "01234567890abcdef123456789ABCDEF" } - static final int MAX_LINE_SIZE = 1024 * 1024 - @CompileStatic @SuppressForbidden private static void eachLine(File file, Closure closure) { def reader = new InputStreamReader(new FileInputStream(file)) - CharBuffer buffer = CharBuffer.allocate(MAX_LINE_SIZE) + CharBuffer buffer = CharBuffer.allocate(OutputThreads.MAX_LINE_SIZE) while (reader.read(buffer) != -1) { buffer.flip() while (buffer.hasRemaining()) { diff --git a/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java b/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java new file mode 100644 index 00000000000..5201988c894 --- /dev/null +++ b/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java @@ -0,0 +1,170 @@ +package datadog.smoketest; + +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class OutputThreads implements Closeable { + private static final long THREAD_JOIN_TIMEOUT_MILLIS = 10 * 1000; + public static final int MAX_LINE_SIZE = 1024 * 1024; + + final ThreadGroup tg = new ThreadGroup("smoke-output"); + final List testLogMessages = new ArrayList<>(); + + public void close() { + tg.interrupt(); + Thread[] threads = new Thread[tg.activeCount()]; + tg.enumerate(threads); + + for (Thread thread : threads) { + try { + thread.join(THREAD_JOIN_TIMEOUT_MILLIS); + } catch (InterruptedException e) { + // ignore + } + } + } + + class ProcessOutputRunnable implements Runnable { + final ReadableByteChannel rc; + ByteBuffer buffer = ByteBuffer.allocate(MAX_LINE_SIZE); + final WritableByteChannel wc; + CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder(); + + ProcessOutputRunnable(InputStream is, File output) throws FileNotFoundException { + rc = Channels.newChannel(is); + wc = Channels.newChannel(new FileOutputStream(output)); + } + + @Override + public void run() { + boolean online = true; + while (online) { + // we may have data in the buffer we did not consume for line splitting purposes + int skip = buffer.position(); + + try { + if (rc.read(buffer) == -1) { + online = false; + } + } catch (IOException ioe) { + online = false; + } + + buffer.flip(); + // write to log file + try { + wc.write((ByteBuffer) buffer.duplicate().position(skip)); + } catch (IOException e) { + System.out.println("ERROR WRITING TO LOG FILE: " + e.getMessage()); + e.printStackTrace(); + return; + } + + // subBuff will always start at the beginning of the next (potential) line + ByteBuffer subBuff = buffer.duplicate(); + int consumed = 0; + while (true) { + boolean hasRemaining = subBuff.hasRemaining(); + if (hasRemaining) { + int c = subBuff.get(); + if (c != '\n' && c != '\r') { + continue; + } + // found line end + } else if (online && consumed > 0) { + break; + // did not find line end, but we already consumed a line + // save the data for the next read iteration + } // else we did not consume any line, or there will be no further reads. + // Treat the buffer as single line despite lack of terminator + + consumed += subBuff.position(); + String line = null; + try { + line = decoder.decode((ByteBuffer) subBuff.duplicate().flip()).toString().trim(); + } catch (CharacterCodingException e) { + throw new RuntimeException(e); + } + + if (!line.isEmpty()) { + synchronized (testLogMessages) { + testLogMessages.add(line); + testLogMessages.notifyAll(); + } + } + + if (hasRemaining) { + subBuff = subBuff.slice(); + } else { + break; + } + } + + buffer.position(consumed); + buffer.compact(); + } + } + } + + public void captureOutput(Process p, File outputFile) throws FileNotFoundException { + new Thread(tg, new ProcessOutputRunnable(p.getInputStream(), outputFile)).start(); + } + + /** + * Tries to find a log line that matches the given predicate. After reading all the log lines + * already collected, it will wait up to 5 seconds for a new line matching the predicate. + * + * @param checker should return true if a match is found + */ + public boolean processTestLogLines(Function checker) throws TimeoutException { + int l = 0; + long waitStart = 0; + + while (true) { + String msg; + synchronized (testLogMessages) { + if (l >= testLogMessages.size()) { + long waitTime; + if (waitStart != 0) { + waitTime = 10000 - (System.currentTimeMillis() - waitStart); + if (waitTime < 0) { + throw new TimeoutException(); + } + } else { + waitStart = System.currentTimeMillis(); + waitTime = 10000; + } + try { + testLogMessages.wait(waitTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + if (l >= testLogMessages.size()) { + throw new TimeoutException(); + } + // the array is only cleared at the end of the test, so index l exists + msg = testLogMessages.get(l++); + } + + if (checker.apply(msg)) { + return true; + } + } + } +} From 551f88e69b54d4c2e2b5e2efb8723272b4a12bdf Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Wed, 9 Apr 2025 13:17:53 -0400 Subject: [PATCH 2/7] loop through messages until the correct event --- .../smoketest/CrashtrackingSmokeTest.java | 11 ++++++----- .../java/datadog/smoketest/TestUDPServer.java | 16 ++++++++-------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java index e6c0b8c01c9..9e0f3cd9f5a 100644 --- a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java @@ -1,7 +1,6 @@ package datadog.smoketest; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -282,11 +281,13 @@ private void assertCrashData() throws InterruptedException { } private void assertOOMEvent() throws InterruptedException { - byte[] data = udpServer.getMessages().poll(DATA_TIMEOUT_MS, TimeUnit.MILLISECONDS); - assertNotNull(data, "OOM Event not received"); - String event = new String(data); + String event; + do { + event = udpServer.getMessages().poll(DATA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } while (event != null && !event.startsWith("_e")); + + assertNotNull(event, "OOM Event not received"); - assertThat(event, startsWith("_e")); assertThat(event, containsString(":OutOfMemoryError")); assertThat(event, containsString("t:error")); assertThat(event, containsString("s:java")); diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java index 6668c78a41d..a748b907467 100644 --- a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/TestUDPServer.java @@ -18,7 +18,7 @@ public class TestUDPServer implements Closeable { private static final byte[] END_MESSAGE = "END____".getBytes(); - private final BlockingQueue dataPackets = new LinkedBlockingQueue<>(); + private final BlockingQueue dataPackets = new LinkedBlockingQueue<>(); private final int timeout; private final int packetSize; private final int port; @@ -63,17 +63,17 @@ public synchronized void start() throws SocketException { packet.getData(), packet.getOffset(), trimmedData, 0, packet.getLength()); if (Arrays.equals(trimmedData, END_MESSAGE)) { - System.out.println("[TestUDPServer] Received message to close"); + System.err.println("[TestUDPServer] Received message to close"); break; } - System.out.println( + System.err.println( "[TestUDPServer] Received message: " + new String(trimmedData)); - dataPackets.add(trimmedData); + dataPackets.add(new String(trimmedData)); } catch (SocketTimeoutException e) { - System.out.println("[TestUDPServer] Timeout waiting for message"); + System.err.println("[TestUDPServer] Timeout waiting for message"); // ignore no data sent } catch (IOException e) { - System.out.println("[TestUDPServer] Error in receiving packet " + e.getMessage()); + System.err.println("[TestUDPServer] Error in receiving packet " + e.getMessage()); e.printStackTrace(); break; } @@ -103,7 +103,7 @@ public synchronized void close() { new DatagramPacket( END_MESSAGE, END_MESSAGE.length, InetAddress.getByName("localhost"), getPort())); } catch (IOException e) { - System.out.println( + System.err.println( "[TestUDPServer] Exception sending close message. Will rely on socket timeout"); e.printStackTrace(); } @@ -126,7 +126,7 @@ public int getPort() { } } - public BlockingQueue getMessages() { + public BlockingQueue getMessages() { return dataPackets; } } From 3df5bbfe7a0cc467e4412a9da2829db047e5e776 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Wed, 9 Apr 2025 15:35:30 -0400 Subject: [PATCH 3/7] some debug --- .../test/java/datadog/smoketest/CrashtrackingSmokeTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java index 9e0f3cd9f5a..3fe8800f954 100644 --- a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java @@ -37,7 +37,7 @@ * that ships with OS X by default. */ public class CrashtrackingSmokeTest { - private static final long DATA_TIMEOUT_MS = 10 * 1000; + private static final long DATA_TIMEOUT_MS = 25 * 1000; private static Path LOG_FILE_DIR; private MockWebServer tracingServer; private TestUDPServer udpServer; @@ -232,6 +232,7 @@ void testOomeTracking() throws Exception { outputThreads.captureOutput( p, LOG_FILE_DIR.resolve("testProcess.testOomeTracking.log").toFile()); pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); + System.out.println("Set port to: " + pb.environment().get("DD_DOGSTATSD_PORT")); assertNotEquals(0, p.waitFor(), "Application should have crashed"); assertOOMEvent(); @@ -263,6 +264,8 @@ void testCombineTracking() throws Exception { pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); + System.out.println("Set port to: " + pb.environment().get("DD_DOGSTATSD_PORT")); + Process p = pb.start(); outputThreads.captureOutput( p, LOG_FILE_DIR.resolve("testProcess.testCombineTracking.log").toFile()); From e4b8c9ea93d4d3bdbeee627b6d73775ee4aeb210 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Thu, 10 Apr 2025 09:38:35 -0400 Subject: [PATCH 4/7] configurable dogstatsd port --- .../communication/monitor/DDAgentStatsDClientManager.java | 3 +-- internal-api/src/main/java/datadog/trace/api/Config.java | 7 +++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/communication/src/main/java/datadog/communication/monitor/DDAgentStatsDClientManager.java b/communication/src/main/java/datadog/communication/monitor/DDAgentStatsDClientManager.java index 5dea07a019e..43fd170ac7d 100644 --- a/communication/src/main/java/datadog/communication/monitor/DDAgentStatsDClientManager.java +++ b/communication/src/main/java/datadog/communication/monitor/DDAgentStatsDClientManager.java @@ -1,6 +1,5 @@ package datadog.communication.monitor; -import static datadog.trace.api.ConfigDefaults.DEFAULT_DOGSTATSD_PORT; import static datadog.trace.bootstrap.instrumentation.api.WriterConstants.LOGGING_WRITER_TYPE; import datadog.trace.api.Config; @@ -22,7 +21,7 @@ public static StatsDClientManager statsDClientManager() { return INSTANCE; } - private static final AtomicInteger defaultStatsDPort = new AtomicInteger(DEFAULT_DOGSTATSD_PORT); + private static final AtomicInteger defaultStatsDPort = new AtomicInteger(Config.get().getDogsStatsDPort()); public static void setDefaultStatsDPort(final int newPort) { if (newPort > 0 && defaultStatsDPort.getAndSet(newPort) != newPort) { diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 69cec835552..0a6f84057b4 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -527,6 +527,7 @@ public static String getHostName() { private final List traceAgentArgs; private final String dogStatsDPath; private final List dogStatsDArgs; + private final int dogStatsDPort; private String env; private String version; @@ -1084,6 +1085,8 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins configProvider.getInteger( DOGSTATSD_START_DELAY, DEFAULT_DOGSTATSD_START_DELAY, JMX_FETCH_START_DELAY); + dogStatsDPort = configProvider.getInteger(DOGSTATSD_PORT, DEFAULT_DOGSTATSD_PORT); + statsDClientQueueSize = configProvider.getInteger(STATSD_CLIENT_QUEUE_SIZE); statsDClientSocketBuffer = configProvider.getInteger(STATSD_CLIENT_SOCKET_BUFFER); statsDClientSocketTimeout = configProvider.getInteger(STATSD_CLIENT_SOCKET_TIMEOUT); @@ -3528,6 +3531,10 @@ public List getDogStatsDArgs() { return dogStatsDArgs; } + public int getDogsStatsDPort() { + return dogStatsDPort; + } + public String getConfigFileStatus() { return configFileStatus; } From d6250d5207eea976c9d36d7a35ab07bbbb1c1c17 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Thu, 10 Apr 2025 18:17:07 -0400 Subject: [PATCH 5/7] use sleep instead of park nanos --- .../src/main/java/com/datadog/crashtracking/OOMENotifier.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java index c9728426a98..4898aee608c 100644 --- a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java +++ b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java @@ -13,7 +13,7 @@ public final class OOMENotifier { // This method is called via CLI so we don't need to be paranoid about the forbiddend APIs @SuppressForbidden - public static void sendOomeEvent(String taglist) { + public static void sendOomeEvent(String taglist) throws Exception { try (StatsDClient client = statsDClientManager().statsDClient(null, null, null, null, null, false)) { String[] tags = taglist.split(","); @@ -24,7 +24,7 @@ public static void sendOomeEvent(String taglist) { "Java process encountered out of memory error", tags); log.info("OOME event sent"); - LockSupport.parkNanos(2_000_000_000L); // wait 2s to allow statsd client flushing the event + Thread.sleep(2 * 1000); // wait 2s to allow statsd client flushing the event } } } From 2e0d7ba52e4f78ce953c7291db53bd812b669db4 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Thu, 10 Apr 2025 19:10:14 -0400 Subject: [PATCH 6/7] spotless --- .../src/main/java/com/datadog/crashtracking/OOMENotifier.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java index 4898aee608c..1c1b317074d 100644 --- a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java +++ b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java @@ -4,7 +4,6 @@ import datadog.trace.api.StatsDClient; import de.thetaphi.forbiddenapis.SuppressForbidden; -import java.util.concurrent.locks.LockSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From ac863f0f44f7b34c8175a4005461d095bbfc0a74 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Fri, 11 Apr 2025 12:32:29 -0400 Subject: [PATCH 7/7] cleanup --- .../datadog/crashtracking/OOMENotifier.java | 5 ++- .../smoketest/CrashtrackingSmokeTest.java | 40 ++++++++----------- .../java/datadog/smoketest/OutputThreads.java | 6 +++ 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java index 1c1b317074d..c9728426a98 100644 --- a/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java +++ b/dd-java-agent/agent-crashtracking/src/main/java/com/datadog/crashtracking/OOMENotifier.java @@ -4,6 +4,7 @@ import datadog.trace.api.StatsDClient; import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.concurrent.locks.LockSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,7 +13,7 @@ public final class OOMENotifier { // This method is called via CLI so we don't need to be paranoid about the forbiddend APIs @SuppressForbidden - public static void sendOomeEvent(String taglist) throws Exception { + public static void sendOomeEvent(String taglist) { try (StatsDClient client = statsDClientManager().statsDClient(null, null, null, null, null, false)) { String[] tags = taglist.split(","); @@ -23,7 +24,7 @@ public static void sendOomeEvent(String taglist) throws Exception { "Java process encountered out of memory error", tags); log.info("OOME event sent"); - Thread.sleep(2 * 1000); // wait 2s to allow statsd client flushing the event + LockSupport.parkNanos(2_000_000_000L); // wait 2s to allow statsd client flushing the event } } } diff --git a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java index 3fe8800f954..54d9290941c 100644 --- a/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java +++ b/dd-smoke-tests/crashtracking/src/test/java/datadog/smoketest/CrashtrackingSmokeTest.java @@ -37,22 +37,22 @@ * that ships with OS X by default. */ public class CrashtrackingSmokeTest { - private static final long DATA_TIMEOUT_MS = 25 * 1000; - private static Path LOG_FILE_DIR; + private static final long DATA_TIMEOUT_MS = 10 * 1000; + private static final OutputThreads OUTPUT = new OutputThreads(); + private static final Path LOG_FILE_DIR = + Paths.get(System.getProperty("datadog.smoketest.builddir"), "reports"); + private MockWebServer tracingServer; private TestUDPServer udpServer; - private BlockingQueue crashEvents = new LinkedBlockingQueue<>(); + private final BlockingQueue crashEvents = new LinkedBlockingQueue<>(); @BeforeAll static void setupAll() { // Only Hotspot based implementation are supported assumeFalse(Platform.isJ9()); - - LOG_FILE_DIR = Paths.get(System.getProperty("datadog.smoketest.builddir"), "reports"); } private Path tempDir; - private static OutputThreads outputThreads = new OutputThreads(); @BeforeEach void setup() throws Exception { @@ -65,7 +65,9 @@ void setup() throws Exception { tracingServer.setDispatcher( new Dispatcher() { @Override - public MockResponse dispatch(final RecordedRequest request) throws InterruptedException { + public MockResponse dispatch(final RecordedRequest request) { + System.out.println("URL ====== " + request.getPath()); + String data = request.getBody().readString(StandardCharsets.UTF_8); if ("/telemetry/proxy/api/v2/apmtelemetry".equals(request.getPath())) { @@ -82,7 +84,7 @@ public MockResponse dispatch(final RecordedRequest request) throws InterruptedEx System.out.println("Unable to parse " + e); } } - System.out.println("URL ====== " + request.getPath()); + System.out.println(data); return new MockResponse().setResponseCode(200); @@ -92,9 +94,7 @@ public MockResponse dispatch(final RecordedRequest request) throws InterruptedEx udpServer = new TestUDPServer(); udpServer.start(); - synchronized (outputThreads.testLogMessages) { - outputThreads.testLogMessages.clear(); - } + OUTPUT.clearMessages(); } @AfterEach @@ -110,7 +110,7 @@ void teardown() throws Exception { @AfterAll static void shutdown() { - outputThreads.close(); + OUTPUT.close(); } private static String javaPath() { @@ -159,8 +159,7 @@ void testCrashTracking() throws Exception { pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); Process p = pb.start(); - outputThreads.captureOutput( - p, LOG_FILE_DIR.resolve("testProcess.testCrashTracking.log").toFile()); + OUTPUT.captureOutput(p, LOG_FILE_DIR.resolve("testProcess.testCrashTracking.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); assertCrashData(); @@ -195,7 +194,7 @@ void testCrashTrackingLegacy() throws Exception { pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); Process p = pb.start(); - outputThreads.captureOutput( + OUTPUT.captureOutput( p, LOG_FILE_DIR.resolve("testProcess.testCrashTrackingLegacy.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); @@ -227,12 +226,10 @@ void testOomeTracking() throws Exception { "-jar", appShadowJar(), script.toString())); + pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); Process p = pb.start(); - outputThreads.captureOutput( - p, LOG_FILE_DIR.resolve("testProcess.testOomeTracking.log").toFile()); - pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); - System.out.println("Set port to: " + pb.environment().get("DD_DOGSTATSD_PORT")); + OUTPUT.captureOutput(p, LOG_FILE_DIR.resolve("testProcess.testOomeTracking.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); assertOOMEvent(); @@ -264,11 +261,8 @@ void testCombineTracking() throws Exception { pb.environment().put("DD_TRACE_AGENT_PORT", String.valueOf(tracingServer.getPort())); pb.environment().put("DD_DOGSTATSD_PORT", String.valueOf(udpServer.getPort())); - System.out.println("Set port to: " + pb.environment().get("DD_DOGSTATSD_PORT")); - Process p = pb.start(); - outputThreads.captureOutput( - p, LOG_FILE_DIR.resolve("testProcess.testCombineTracking.log").toFile()); + OUTPUT.captureOutput(p, LOG_FILE_DIR.resolve("testProcess.testCombineTracking.log").toFile()); assertNotEquals(0, p.waitFor(), "Application should have crashed"); diff --git a/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java b/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java index 5201988c894..7e369280cf7 100644 --- a/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java +++ b/dd-smoke-tests/src/main/java/datadog/smoketest/OutputThreads.java @@ -167,4 +167,10 @@ public boolean processTestLogLines(Function checker) throws Tim } } } + + public void clearMessages() { + synchronized (testLogMessages) { + testLogMessages.clear(); + } + } }