diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index fe91f4bf0..db4eb3ec2 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -75,6 +75,17 @@ public interface DuplexConnection extends Availability, Closeable { */ ByteBufAllocator alloc(); + /** + * Return the local address that this connection is connected to. The returned {@link + * SocketAddress} varies by transport type and should be downcast to obtain more detailed + * information. For TCP and WebSocket, the address type is {@link java.net.InetSocketAddress}. For + * local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}. + * + * @return the address + * @since 1.1.1 + */ + SocketAddress localAddress(); + /** * Return the remote address that this connection is connected to. The returned {@link * SocketAddress} varies by transport type and should be downcast to obtain more detailed diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index b05241365..d4da18d9f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -19,6 +19,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.SocketAddress; /** * A contract providing different interaction models for metadataPush(Payload payload) { return RSocketAdapter.metadataPush(payload); } + /** + * Returns the local address where this channel is bound to. The returned + * {@link SocketAddress} is supposed to be down-cast into more concrete + * type such as {@link java.net.InetSocketAddress} to retrieve the detailed + * information. + * + * @return the local address of this channel. + * {@code null} if this channel is not bound. + * @since 1.1.1 + */ + default SocketAddress localAddress() { + return null; + } + + /** + * Returns the remote address where this channel is connected to. The + * returned {@link SocketAddress} is supposed to be down-cast into more + * concrete type such as {@link java.net.InetSocketAddress} to retrieve the detailed + * information. + * + * @return the remote address of this channel. + * {@code null} if this channel is not connected. + * If this channel is not connected but it can receive messages + * from arbitrary remote addresses to determine + * the origination of the received message as this method will + * return {@code null}. + * @since 1.1.1 + */ + default SocketAddress remoteAddress() { + return null; + } + @Override default double availability() { return isDisposed() ? 0.0 : 1.0; diff --git a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java index d6cb46d98..3e4fcc01d 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java @@ -276,6 +276,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java index 7b5d8f6c2..b2cb23dfe 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/LoggingDuplexConnection.java @@ -57,6 +57,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index c5853531b..f3b0c5905 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -34,6 +34,7 @@ import io.rsocket.keepalive.KeepAliveHandler; import io.rsocket.keepalive.KeepAliveSupport; import io.rsocket.plugins.RequestInterceptor; +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -174,6 +175,16 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) { return nextStreamId; } + @Override + public SocketAddress localAddress() { + return getDuplexConnection().localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return getDuplexConnection().remoteAddress(); + } + @Override public double availability() { final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 969353bd6..c9232fcd0 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -32,6 +32,7 @@ import io.rsocket.frame.RequestStreamFrameCodec; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RequestInterceptor; +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -151,6 +152,16 @@ public Mono metadataPush(Payload payload) { } } + @Override + public SocketAddress localAddress() { + return getDuplexConnection().localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return getDuplexConnection().remoteAddress(); + } + @Override public void dispose() { tryTerminate(() -> new CancellationException("Disposed")); diff --git a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java index b6bc87513..36c6dd914 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java @@ -63,6 +63,11 @@ public Flux receive() { return this; } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 6e90e6d63..de1310f6c 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -47,6 +47,7 @@ public class ResumableDuplexConnection extends Flux final UnboundedProcessor savableFramesSender; final Disposable framesSaverDisposable; final Sinks.Empty onClose; + final SocketAddress localAddress; final SocketAddress remoteAddress; final Sinks.Many onConnectionClosedSink; @@ -73,6 +74,7 @@ public ResumableDuplexConnection( this.savableFramesSender = new UnboundedProcessor(); this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe(); this.onClose = Sinks.empty(); + this.localAddress = initialConnection.localAddress(); this.remoteAddress = initialConnection.remoteAddress(); ACTIVE_CONNECTION.lazySet(this, initialConnection); @@ -219,6 +221,11 @@ public boolean isDisposed() { return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED); } + @Override + public SocketAddress localAddress() { + return localAddress; + } + @Override public SocketAddress remoteAddress() { return remoteAddress; @@ -278,6 +285,11 @@ public ByteBufAllocator alloc() { return ByteBufAllocator.DEFAULT; } + @Override + public SocketAddress localAddress() { + return null; + } + @Override @SuppressWarnings("ConstantConditions") public SocketAddress remoteAddress() { diff --git a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java index 518b727c1..81d0b242a 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java +++ b/rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java @@ -21,6 +21,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.SocketAddress; /** Wrapper/Proxy for a RSocket. This is useful when we want to override a specific method. */ public class RSocketProxy implements RSocket { @@ -55,6 +56,16 @@ public Mono metadataPush(Payload payload) { return source.metadataPush(payload); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return source.remoteAddress(); + } + @Override public double availability() { return source.availability(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index cdfcefdc8..666eca165 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -101,6 +101,11 @@ public ByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return new TestLocalSocketAddress(name); + } + @Override public SocketAddress remoteAddress() { return new TestLocalSocketAddress(name); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java index 179afff58..db0daa1f7 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java @@ -21,6 +21,7 @@ import io.rsocket.Payload; import io.rsocket.RSocket; +import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -71,6 +72,16 @@ public final Mono metadataPush(Payload payload) { return delegate.metadataPush(payload).doOnSubscribe(s -> pushCount.incrementAndGet()); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return delegate.remoteAddress(); + } + @Override public double availability() { return delegate.availability(); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 8793d6ca4..59283e244 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -124,6 +124,11 @@ public LeaksTrackingByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return new TestLocalSocketAddress("TestDuplexConnection"); + } + @Override public SocketAddress remoteAddress() { return new TestLocalSocketAddress("TestDuplexConnection"); diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index 6329da826..2509e328a 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -23,6 +23,7 @@ import io.rsocket.stat.Median; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayList; @@ -518,6 +519,16 @@ public Mono metadataPush(Payload payload) { return errorVoid; } + @Override + public SocketAddress localAddress() { + throw new RuntimeException(NoAvailableRSocketException.INSTANCE); + } + + @Override + public SocketAddress remoteAddress() { + throw new RuntimeException(NoAvailableRSocketException.INSTANCE); + } + @Override public double availability() { return 0; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java index beb424797..26160955d 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java @@ -21,6 +21,7 @@ import io.rsocket.stat.FrugalQuantile; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; +import java.net.SocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -84,6 +85,16 @@ public Mono metadataPush(Payload payload) { return child.metadataPush(payload); } + @Override + public SocketAddress localAddress() { + return child.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return child.remoteAddress(); + } + @Override public double availability() { return child.availability(); diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java index aaf9f71e6..df4fe2507 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/RSocketSupplier.java @@ -23,6 +23,7 @@ import io.rsocket.stat.Ewma; import io.rsocket.util.Clock; import io.rsocket.util.RSocketProxy; +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.reactivestreams.Publisher; @@ -149,6 +150,16 @@ public Mono metadataPush(Payload payload) { .doOnSuccess(v -> updateErrorPercentage(1.0)); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return source.remoteAddress(); + } + @Override public double availability() { // If the window is expired set success and failure to zero and return diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java index 7c7ac37b9..c9e100ea2 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java @@ -89,6 +89,11 @@ public ByteBufAllocator alloc() { return delegate.alloc(); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + @Override public SocketAddress remoteAddress() { return delegate.remoteAddress(); diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java index 9e1abbc03..faed8b785 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerRSocket.java @@ -29,6 +29,7 @@ import io.micrometer.core.instrument.Timer.Sample; import io.rsocket.Payload; import io.rsocket.RSocket; +import java.net.SocketAddress; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -95,6 +96,16 @@ public Mono metadataPush(Payload payload) { return delegate.metadataPush(payload).doFinally(metadataPush); } + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return delegate.remoteAddress(); + } + @Override public Mono onClose() { return delegate.onClose(); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 0bae8cd69..3fcad1d2c 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -679,6 +679,11 @@ public ByteBufAllocator alloc() { return duplexConnection.alloc(); } + @Override + public SocketAddress localAddress() { + return duplexConnection.localAddress(); + } + @Override public SocketAddress remoteAddress() { return duplexConnection.remoteAddress(); @@ -754,6 +759,11 @@ public ByteBufAllocator alloc() { return source.alloc(); } + @Override + public SocketAddress localAddress() { + return source.localAddress(); + } + @Override public SocketAddress remoteAddress() { return source.remoteAddress(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 5e18aa4cc..1a2176dfa 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -110,6 +110,11 @@ public ByteBufAllocator alloc() { return allocator; } + @Override + public SocketAddress localAddress() { + return address; + } + @Override public SocketAddress remoteAddress() { return address; diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index f9ac705b1..a1783b428 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -57,6 +57,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public SocketAddress localAddress() { + return connection.channel().localAddress(); + } + @Override public SocketAddress remoteAddress() { return connection.channel().remoteAddress(); diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index c81f040da..fc0d55ed3 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -62,6 +62,11 @@ public ByteBufAllocator alloc() { return connection.channel().alloc(); } + @Override + public SocketAddress localAddress() { + return connection.channel().localAddress(); + } + @Override public SocketAddress remoteAddress() { return connection.channel().remoteAddress();