Skip to content

Commit 6c10f7f

Browse files
committed
Copy 3.x PortAllocator
1 parent 44ccd12 commit 6c10f7f

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,17 @@
5151
import io.netty.channel.ChannelInitializer;
5252
import io.netty.channel.ChannelOption;
5353
import io.netty.channel.ChannelPipeline;
54+
import java.io.IOException;
55+
import java.net.InetSocketAddress;
56+
import java.net.ServerSocket;
5457
import java.util.List;
5558
import java.util.Map;
5659
import java.util.Optional;
5760
import java.util.concurrent.CompletableFuture;
5861
import java.util.concurrent.CompletionStage;
5962
import java.util.concurrent.CopyOnWriteArrayList;
6063
import java.util.concurrent.atomic.AtomicBoolean;
64+
import java.util.concurrent.atomic.AtomicInteger;
6165
import net.jcip.annotations.ThreadSafe;
6266
import org.slf4j.Logger;
6367
import org.slf4j.LoggerFactory;
@@ -391,4 +395,70 @@ protected void initChannel(Channel channel) {
391395
}
392396
}
393397
}
398+
399+
static class PortAllocator {
400+
private static final AtomicInteger lastPort = new AtomicInteger(-1);
401+
402+
public static int getNextAvailablePort(int shardCount, int shardId, int lowPort, int highPort) {
403+
int lastPortValue, foundPort = -1;
404+
do {
405+
lastPortValue = lastPort.get();
406+
407+
// We will scan from lastPortValue
408+
// (or lowPort is there was no lastPort or lastPort is too low)
409+
int scanStart = lastPortValue == -1 ? lowPort : lastPortValue;
410+
if (scanStart < lowPort) {
411+
scanStart = lowPort;
412+
}
413+
414+
// Round it up to "% shardCount == shardId"
415+
scanStart += (shardCount - scanStart % shardCount) + shardId;
416+
417+
// Scan from scanStart upwards to highPort.
418+
for (int port = scanStart; port <= highPort; port += shardCount) {
419+
if (isTcpPortAvailable(port)) {
420+
foundPort = port;
421+
break;
422+
}
423+
}
424+
425+
// If we started scanning from a high scanStart port
426+
// there might have been not enough ports left that are
427+
// smaller than highPort. Scan from the beginning
428+
// from the lowPort.
429+
if (foundPort == -1) {
430+
scanStart = lowPort + (shardCount - lowPort % shardCount) + shardId;
431+
432+
for (int port = scanStart; port <= highPort; port += shardCount) {
433+
if (isTcpPortAvailable(port)) {
434+
foundPort = port;
435+
break;
436+
}
437+
}
438+
}
439+
440+
// No luck! All ports taken!
441+
if (foundPort == -1) {
442+
return -1;
443+
}
444+
} while (!lastPort.compareAndSet(lastPortValue, foundPort));
445+
446+
return foundPort;
447+
}
448+
449+
public static boolean isTcpPortAvailable(int port) {
450+
try {
451+
ServerSocket serverSocket = new ServerSocket();
452+
try {
453+
serverSocket.setReuseAddress(false);
454+
serverSocket.bind(new InetSocketAddress(port), 1);
455+
return true;
456+
} finally {
457+
serverSocket.close();
458+
}
459+
} catch (IOException ex) {
460+
return false;
461+
}
462+
}
463+
}
394464
}

0 commit comments

Comments
 (0)