Skip to content

Commit 38a5ccd

Browse files
committed
Update getInputStream to use a selector.
1 parent b200274 commit 38a5ccd

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

utils/socket-utils/src/main/java17/datadog/common/socket/TunnelingJdkSocket.java

+43-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
import java.net.SocketAddress;
1010
import java.net.SocketException;
1111
import java.net.UnixDomainSocketAddress;
12+
import java.nio.ByteBuffer;
1213
import java.nio.channels.Channels;
14+
import java.nio.channels.SelectionKey;
15+
import java.nio.channels.Selector;
1316
import java.nio.channels.SocketChannel;
1417
import java.nio.file.Path;
1518

@@ -118,7 +121,46 @@ public InputStream getInputStream() throws IOException {
118121
if (isInputShutdown()) {
119122
throw new SocketException("Socket input is shutdown");
120123
}
121-
return Channels.newInputStream(unixSocketChannel);
124+
125+
Selector selector = Selector.open();
126+
unixSocketChannel.configureBlocking(false);
127+
unixSocketChannel.register(selector, SelectionKey.OP_READ);
128+
ByteBuffer buffer = ByteBuffer.allocate(256); // arbitrary buffer size for now
129+
130+
try {
131+
if (selector.select(timeout) == 0) {
132+
System.out.println("Timeout (" + timeout + "ms) while waiting for data.");
133+
}
134+
for (SelectionKey key : selector.selectedKeys()) {
135+
if (key.isReadable()) {
136+
int r = unixSocketChannel.read(buffer);
137+
if (r == -1) {
138+
unixSocketChannel.close();
139+
System.out.println("Not accepting client messages anymore.");
140+
}
141+
}
142+
}
143+
buffer.flip();
144+
} finally {
145+
selector.close();
146+
}
147+
148+
return new InputStream() {
149+
@Override
150+
public int read() {
151+
return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
152+
}
153+
154+
@Override
155+
public int read(byte[] bytes, int off, int len) {
156+
if (!buffer.hasRemaining()) {
157+
return -1;
158+
}
159+
len = Math.min(len, buffer.remaining());
160+
buffer.get(bytes, off, len);
161+
return len;
162+
}
163+
};
122164
}
123165

124166
@Override

utils/socket-utils/src/test/java/datadog/common/socket/TunnelingJdkSocketTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void testTimeout() throws Exception {
3131
// create client
3232
TunnelingJdkSocket clientSocket = createClient(socketPath);
3333

34-
// expect a failure after three seconds because timeout is not supported yet
34+
// will fail after three seconds if timeout (set to one second) is not supported
3535
assertTimeoutPreemptively(Duration.ofMillis(3000), () -> clientSocket.getInputStream().read());
3636

3737
// clean up

0 commit comments

Comments
 (0)