diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java index debe1f7fcac87..b0529c2a777e1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java @@ -113,7 +113,7 @@ private static List parseCommaSeparatedConfigValue(String configValue) { } public CompletableFuture resolveAndCheckTargetAddress(String hostAndPort) { - int pos = hostAndPort.indexOf(':'); + int pos = hostAndPort.lastIndexOf(':'); String host = hostAndPort.substring(0, pos); int port = Integer.parseInt(hostAndPort.substring(pos + 1)); if (!isPortAllowed(port)) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 44a236a8e31c6..e04ac1e4ae83d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -21,15 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; - import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; @@ -41,27 +40,16 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import javax.net.ssl.SSLSession; - import lombok.Getter; - import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.tls.TlsHostnameVerifier; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; @@ -69,6 +57,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.tls.TlsHostnameVerifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,11 +65,11 @@ public class DirectProxyHandler { @Getter private final Channel inboundChannel; + private final ProxyConnection proxyConnection; @Getter Channel outboundChannel; @Getter private final Rate inboundChannelRequestsRate; - protected static Map inboundOutboundChannelMap = new ConcurrentHashMap<>(); private final String originalPrincipal; private final AuthData clientAuthData; private final String clientAuthMethod; @@ -91,12 +80,13 @@ public class DirectProxyHandler { private final ProxyService service; private final Runnable onHandshakeCompleteAction; - public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, + public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort, InetSocketAddress targetBrokerAddress, int protocolVersion, Supplier sslHandlerSupplier) { this.service = service; this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); + this.proxyConnection = proxyConnection; this.inboundChannelRequestsRate = new Rate(); this.originalPrincipal = proxyConnection.clientAuthRole; this.clientAuthData = proxyConnection.clientAuthData; @@ -114,7 +104,18 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, if (brokerProxyConnectTimeoutMs > 0) { b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs); } - b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false); + b.group(inboundChannel.eventLoop()) + .channel(inboundChannel.getClass()); + + String remoteHost; + try { + remoteHost = parseHost(brokerHostAndPort); + } catch (IllegalArgumentException e) { + log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, brokerHostAndPort, e); + inboundChannel.close(); + return; + } + b.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { @@ -128,65 +129,58 @@ protected void initChannel(SocketChannel ch) { } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion)); + ch.pipeline().addLast("proxyOutboundHandler", + new ProxyBackendHandler(config, protocolVersion, remoteHost)); } }); - URI targetBroker; - try { - // targetBrokerUrl is coming in the "hostname:6650" form, so we need - // to extract host and port - targetBroker = new URI("pulsar://" + targetBrokerUrl); - } catch (URISyntaxException e) { - log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e); - inboundChannel.close(); - return; - } - ChannelFuture f = b.connect(targetBrokerAddress); outboundChannel = f.channel(); f.addListener(future -> { if (!future.isSuccess()) { // Close the connection if the connection attempt has failed. log.warn("[{}] Establishing connection to {} ({}) failed. Closing inbound channel.", inboundChannel, - targetBrokerAddress, targetBrokerUrl, future.cause()); + targetBrokerAddress, brokerHostAndPort, future.cause()); inboundChannel.close(); return; } - final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline() - .get("proxyOutboundHandler"); - cnx.setRemoteHostName(targetBroker.getHost()); - - // if enable full parsing feature - if (service.getProxyLogLevel() == 2) { - //Set a map between inbound and outbound, - //so can find inbound by outbound or find outbound by inbound - inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id()); - } + }); + } - if (config.isHaProxyProtocolEnabled()) { - if (proxyConnection.hasHAProxyMessage()) { - outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage())); - } else { - if (inboundChannel.remoteAddress() instanceof InetSocketAddress) { - InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress(); - String sourceAddress = clientAddress.getAddress().getHostAddress(); - int sourcePort = clientAddress.getPort(); - if (outboundChannel.localAddress() instanceof InetSocketAddress) { - InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress(); - String destinationAddress = proxyAddress.getAddress().getHostAddress(); - int destinationPort = proxyAddress.getPort(); - HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, - HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort); - outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg)); - msg.release(); - } - } - } + private static String parseHost(String brokerPortAndHost) { + int pos = brokerPortAndHost.lastIndexOf(':'); + if (pos > 0) { + return brokerPortAndHost.substring(0, pos); + } else { + throw new IllegalArgumentException("Illegal broker host:port '" + brokerPortAndHost + "'"); + } + } + + private void writeHAProxyMessage() { + if (proxyConnection.hasHAProxyMessage()) { + outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage())) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + if (inboundChannel.remoteAddress() instanceof InetSocketAddress + && outboundChannel.localAddress() instanceof InetSocketAddress) { + InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress(); + String sourceAddress = clientAddress.getAddress().getHostAddress(); + int sourcePort = clientAddress.getPort(); + InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress(); + String destinationAddress = proxyAddress.getAddress().getHostAddress(); + int destinationPort = proxyAddress.getPort(); + HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY, + HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, + destinationPort); + outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + msg.release(); } - }); + } } + + private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) { // Max length of v1 version proxy protocol message is 108 ByteBuf out = Unpooled.buffer(108); @@ -218,30 +212,45 @@ enum BackendState { Init, HandshakeCompleted } - public class ProxyBackendHandler extends PulsarDecoder implements FutureListener { + public class ProxyBackendHandler extends PulsarDecoder { private BackendState state = BackendState.Init; - private String remoteHostName; + private final String remoteHostName; protected ChannelHandlerContext ctx; private final ProxyConfiguration config; private final int protocolVersion; - public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) { + public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion, String remoteHostName) { this.config = config; this.protocolVersion = protocolVersion; + this.remoteHostName = remoteHostName; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; + + if (config.isHaProxyProtocolEnabled()) { + writeHAProxyMessage(); + } + // Send the Connect command to broker authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); ByteBuf command; command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy", null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod); - outboundChannel.writeAndFlush(command); - outboundChannel.read(); + outboundChannel.writeAndFlush(command) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + // handle backpressure + // stop/resume reading input from connection between the client and the proxy + // when the writability of the connection between the proxy and the broker changes + inboundChannel.config().setAutoRead(ctx.channel().isWritable()); + super.channelWritabilityChanged(ctx); } @Override @@ -262,7 +271,8 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce if (msg instanceof ByteBuf) { ProxyService.bytesCounter.inc(((ByteBuf) msg).readableBytes()); } - inboundChannel.writeAndFlush(msg).addListener(this); + inboundChannel.writeAndFlush(msg) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); break; default: @@ -301,26 +311,13 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName()); } - outboundChannel.writeAndFlush(request); - outboundChannel.read(); + outboundChannel.writeAndFlush(request) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } catch (Exception e) { log.error("Error mutual verify", e); } } - @Override - public void operationComplete(Future future) { - // This is invoked when the write operation on the paired connection - // is completed - if (future.isSuccess()) { - outboundChannel.read(); - } else { - log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", inboundChannel, - outboundChannel, future.cause()); - inboundChannel.close(); - } - } - @Override protected void messageReceived() { // no-op @@ -350,18 +347,7 @@ protected void handleConnected(CommandConnected connected) { int maxMessageSize = connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) - .addListener(future -> { - if (future.isSuccess()) { - // Start reading from both connections - inboundChannel.read(); - outboundChannel.read(); - } else { - log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.", - inboundChannel, - outboundChannel, future.cause()); - inboundChannel.close(); - } - }); + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } private void startDirectProxying(CommandConnected connected) { @@ -390,20 +376,20 @@ private void startDirectProxying(CommandConnected connected) { inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(service, inboundChannel, ParserProxyHandler.FRONTEND_CONN, - connected.getMaxMessageSize())); + connected.getMaxMessageSize(), outboundChannel.id())); outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", new ParserProxyHandler(service, outboundChannel, ParserProxyHandler.BACKEND_CONN, - connected.getMaxMessageSize())); + connected.getMaxMessageSize(), inboundChannel.id())); } else { inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(service, inboundChannel, ParserProxyHandler.FRONTEND_CONN, - Commands.DEFAULT_MAX_MESSAGE_SIZE)); + Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id())); outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", new ParserProxyHandler(service, outboundChannel, ParserProxyHandler.BACKEND_CONN, - Commands.DEFAULT_MAX_MESSAGE_SIZE)); + Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id())); } } } @@ -419,10 +405,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } - public void setRemoteHostName(String remoteHostName) { - this.remoteHostName = remoteHostName; - } - private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) { ChannelHandler sslHandler = ctx.channel().pipeline().get("tls"); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 68bd74e68008a..41a9b594f9363 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -19,10 +19,19 @@ package org.apache.pulsar.proxy.server; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.raw.MessageParser; @@ -32,59 +41,62 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; - public class ParserProxyHandler extends ChannelInboundHandlerAdapter { - private Channel channel; + private final Channel channel; //inbound protected static final String FRONTEND_CONN = "frontendconn"; //outbound protected static final String BACKEND_CONN = "backendconn"; - private String connType; + private final String connType; - private int maxMessageSize; + private final int maxMessageSize; + private final ChannelId peerChannelId; private final ProxyService service; - //producerid+channelid as key - //or consumerid+channelid as key - private static Map producerHashMap = new ConcurrentHashMap<>(); - private static Map consumerHashMap = new ConcurrentHashMap<>(); + /** + * producerid + channelid as key. + */ + private static final Map producerHashMap = new ConcurrentHashMap<>(); + + /** + * consumerid + channelid as key. + */ + private static final Map consumerHashMap = new ConcurrentHashMap<>(); - public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize) { + public ParserProxyHandler(ProxyService service, Channel channel, String type, int maxMessageSize, + ChannelId peerChannelId) { this.service = service; this.channel = channel; this.connType = type; this.maxMessageSize = maxMessageSize; + this.peerChannelId = peerChannelId; } - private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List messages) throws Exception{ + private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List messages) { if (messages != null) { // lag - for (int i=0; i messages = Lists.newArrayList(); - ByteBuf buffer = (ByteBuf)(msg); + TopicName topicName; + List messages = new ArrayList<>(); + ByteBuf buffer = (ByteBuf) (msg); try { buffer.markReaderIndex(); @@ -105,20 +117,23 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { switch (cmd.getType()) { case PRODUCER: - ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic()); + ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(), + cmd.getProducer().getTopic()); - logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null); + logging(ctx.channel(), cmd.getType(), "{producer:" + cmd.getProducer().getProducerName() + + ",topic:" + cmd.getProducer().getTopic() + "}", null); break; case SEND: if (service.getProxyLogLevel() != 2) { - logging(ctx.channel() , cmd.getType() , "", null); + logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id()))); + topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + "," + + ctx.channel().id())); MutableLong msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L,buffer,(message) -> { + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { messages.add(message); msgBytes.add(message.getData().readableBytes()); }, maxMessageSize); @@ -126,42 +141,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), topic -> new TopicStats()); topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); - logging(ctx.channel() , cmd.getType() , "" , messages); + logging(ctx.channel(), cmd.getType(), "", messages); break; case SUBSCRIBE: - ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic()); + ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + "," + + ctx.channel().id(), cmd.getSubscribe().getTopic()); - logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null); + logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName() + + ",topic:" + cmd.getSubscribe().getTopic() + "}", null); break; case MESSAGE: if (service.getProxyLogLevel() != 2) { - logging(ctx.channel() , cmd.getType() , "" , null); + logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id()))); + topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId() + + "," + peerChannelId)); msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L,buffer,(message) -> { - messages.add(message); - msgBytes.add(message.getData().readableBytes()); - }, maxMessageSize); + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { + messages.add(message); + msgBytes.add(message.getData().readableBytes()); + }, maxMessageSize); // update topic stats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), topic -> new TopicStats()); topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); - logging(ctx.channel() , cmd.getType() , "" , messages); + logging(ctx.channel(), cmd.getType(), "", messages); break; default: - logging(ctx.channel() , cmd.getType() , "" , null); + logging(ctx.channel(), cmd.getType(), "", null); break; } } catch (Exception e){ - - log.error("{},{},{}" , e.getMessage() , e.getStackTrace() , e.getCause()); - + log.error("channelRead error ", e); } finally { buffer.resetReaderIndex(); buffer.resetWriterIndex(); @@ -170,8 +186,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf totalSizeBuf = Unpooled.buffer(4); totalSizeBuf.writeInt(buffer.readableBytes()); CompositeByteBuf compBuf = Unpooled.compositeBuffer(); - compBuf.addComponents(totalSizeBuf,buffer); - compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity()); + compBuf.addComponents(totalSizeBuf, buffer); + compBuf.writerIndex(totalSizeBuf.capacity() + buffer.capacity()); // Release mssages messages.forEach(RawMessage::release); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index df99acacbef5d..29060faeff25a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.haproxy.HAProxyMessage; import java.net.SocketAddress; import java.util.Collections; @@ -69,7 +70,7 @@ * Handles incoming discovery request from client and sends appropriate response back to client * */ -public class ProxyConnection extends PulsarHandler implements FutureListener { +public class ProxyConnection extends PulsarHandler { private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class); // ConnectionPool is used by the proxy to issue lookup requests private ConnectionPool connectionPool; @@ -190,6 +191,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { + // handle backpressure + // stop/resume reading input from connection between the proxy and the broker + // when the writability of the connection between the client and the proxy changes + directProxyHandler.outboundChannel.config().setAutoRead(ctx.channel().isWritable()); + } + super.channelWritabilityChanged(ctx); + } + @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HAProxyMessage) { @@ -213,7 +225,8 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); ProxyService.bytesCounter.inc(bytes); } - directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this); + directProxyHandler.outboundChannel.writeAndFlush(msg) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); break; default: @@ -221,18 +234,6 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce } } - @Override - public void operationComplete(Future future) { - // This is invoked when the write operation on the paired connection is - // completed - if (future.isSuccess()) { - ctx.read(); - } else { - LOG.warn("[{}] Error in writing to inbound channel. Closing", remoteAddress, future.cause()); - directProxyHandler.outboundChannel.close(); - } - } - private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { if (service.getConfiguration().isAuthenticationEnabled()) { if (service.getConfiguration().isForwardAuthorizationCredentials()) { @@ -270,18 +271,18 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie ctx() .writeAndFlush( Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available.")) - .addListener(future -> ctx().close()); + .addListener(ChannelFutureListener.CLOSE); return; } brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl) - .thenAccept(address -> ctx().executor().submit(() -> { + .thenAcceptAsync(address -> { // Client already knows which broker to connect. Let's open a // connection there and just pass bytes in both directions state = State.ProxyConnectionToBroker; directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address, protocolVersionToAdvertise, sslHandlerSupplier); - })) + }, ctx.executor()) .exceptionally(throwable -> { if (throwable instanceof TargetAddressDeniedException || throwable.getCause() instanceof TargetAddressDeniedException) { @@ -300,7 +301,7 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie .writeAndFlush( Commands.newError(-1, ServerError.ServiceNotReady, "Target broker cannot be validated.")) - .addListener(future -> ctx().close()); + .addListener(ChannelFutureListener.CLOSE); return null; }); } else { @@ -309,7 +310,8 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie // partitions metadata lookups state = State.ProxyLookupRequests; lookupProxyHandler = new LookupProxyHandler(service, this); - ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise)); + ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } } @@ -328,7 +330,8 @@ private void doAuthentication(AuthData clientData) throws Exception { } // auth not complete, continue auth with client side. - ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise)); + ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); if (LOG.isDebugEnabled()) { LOG.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); @@ -406,8 +409,8 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), doAuthentication(clientData); } catch (Exception e) { LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); - ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")); - close(); + ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")) + .addListener(ChannelFutureListener.CLOSE); } } @@ -428,8 +431,8 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { } catch (Exception e) { String msg = "Unable to handleAuthResponse"; LOG.warn("[{}] {} ", remoteAddress, msg, e); - ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); - close(); + ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)) + .addListener(ChannelFutureListener.CLOSE); } } @@ -462,25 +465,6 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupProxyHandler.handleLookup(lookup); } - private synchronized void close() { - if (state != State.Closed) { - state = State.Closed; - if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { - directProxyHandler.outboundChannel.close(); - directProxyHandler = null; - } - if (connectionPool != null) { - try { - connectionPool.close(); - connectionPool = null; - } catch (Exception e) { - LOG.error("Error closing connection pool", e); - } - } - ctx.close(); - } - } - ClientConfigurationData createClientConfiguration() { ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java index 8e457554cf5ad..fba3c36e26616 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java @@ -90,6 +90,26 @@ public void shouldAllowAllWithWildcard() throws Exception { brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get(); } + @Test + public void shouldAllowIPv6Address() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"), + "*" + , "fd4d:801b:73fa:abcd::/64" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get(); + } + + @Test + public void shouldAllowIPv6AddressNumeric() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("fd4d:801b:73fa:abcd:0000:0000:0000:0001"), + "*" + , "fd4d:801b:73fa:abcd::/64" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("fd4d:801b:73fa:abcd:0000:0000:0000:0001:6650").get(); + } + private AddressResolver createMockedAddressResolver(String ipAddressResult) { AddressResolver inetSocketAddressResolver = mock(AddressResolver.class); when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {