Skip to content

Commit 16392bc

Browse files
authored
netty: converts Proxy handler into new protocol negotiation style (grpc#6159)
1 parent 19b0916 commit 16392bc

File tree

4 files changed

+71
-278
lines changed

4 files changed

+71
-278
lines changed

netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java

-15
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.grpc.ChannelLogger;
2020
import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
2121
import io.grpc.netty.ProtocolNegotiators.GrpcNegotiationHandler;
22-
import io.grpc.netty.ProtocolNegotiators.ProtocolNegotiationHandler;
2322
import io.grpc.netty.ProtocolNegotiators.WaitUntilActiveHandler;
2423
import io.netty.channel.ChannelHandler;
2524
import io.netty.channel.ChannelHandlerContext;
@@ -40,20 +39,6 @@ public static ChannelLogger negotiationLogger(ChannelHandlerContext ctx) {
4039
return ProtocolNegotiators.negotiationLogger(ctx);
4140
}
4241

43-
/**
44-
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
45-
* {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
46-
* write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
47-
* i.e. before it's active or the TLS Handshake is complete.
48-
*/
49-
public abstract static class AbstractBufferingHandler
50-
extends ProtocolNegotiators.AbstractBufferingHandler {
51-
52-
protected AbstractBufferingHandler(ChannelHandler... handlers) {
53-
super(handlers);
54-
}
55-
}
56-
5742
/**
5843
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
5944
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}

netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java

+35-237
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,10 @@
3434
import io.grpc.internal.GrpcAttributes;
3535
import io.grpc.internal.GrpcUtil;
3636
import io.netty.channel.ChannelDuplexHandler;
37-
import io.netty.channel.ChannelFuture;
3837
import io.netty.channel.ChannelFutureListener;
3938
import io.netty.channel.ChannelHandler;
4039
import io.netty.channel.ChannelHandlerContext;
41-
import io.netty.channel.ChannelInboundHandler;
4240
import io.netty.channel.ChannelInboundHandlerAdapter;
43-
import io.netty.channel.ChannelPromise;
4441
import io.netty.handler.codec.http.DefaultHttpRequest;
4542
import io.netty.handler.codec.http.HttpClientCodec;
4643
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
@@ -50,7 +47,6 @@
5047
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
5148
import io.netty.handler.proxy.HttpProxyHandler;
5249
import io.netty.handler.proxy.ProxyConnectionEvent;
53-
import io.netty.handler.proxy.ProxyHandler;
5450
import io.netty.handler.ssl.OpenSsl;
5551
import io.netty.handler.ssl.OpenSslEngine;
5652
import io.netty.handler.ssl.SslContext;
@@ -59,12 +55,9 @@
5955
import io.netty.util.AsciiString;
6056
import io.netty.util.Attribute;
6157
import io.netty.util.AttributeMap;
62-
import io.netty.util.ReferenceCountUtil;
6358
import java.net.SocketAddress;
6459
import java.net.URI;
65-
import java.util.ArrayDeque;
6660
import java.util.Arrays;
67-
import java.util.Queue;
6861
import java.util.logging.Level;
6962
import java.util.logging.Logger;
7063
import javax.annotation.Nullable;
@@ -195,20 +188,16 @@ private void fireProtocolNegotiationEvent(ChannelHandlerContext ctx, SSLSession
195188
public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress,
196189
final @Nullable String proxyUsername, final @Nullable String proxyPassword,
197190
final ProtocolNegotiator negotiator) {
191+
checkNotNull(negotiator, "negotiator");
192+
checkNotNull(proxyAddress, "proxyAddress");
198193
final AsciiString scheme = negotiator.scheme();
199-
Preconditions.checkNotNull(proxyAddress, "proxyAddress");
200-
Preconditions.checkNotNull(negotiator, "negotiator");
201194
class ProxyNegotiator implements ProtocolNegotiator {
202195
@Override
203196
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
204-
HttpProxyHandler proxyHandler;
205-
if (proxyUsername == null || proxyPassword == null) {
206-
proxyHandler = new HttpProxyHandler(proxyAddress);
207-
} else {
208-
proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword);
209-
}
210-
return new BufferUntilProxyTunnelledHandler(
211-
proxyHandler, negotiator.newHandler(http2Handler));
197+
ChannelHandler protocolNegotiationHandler = negotiator.newHandler(http2Handler);
198+
ChannelHandler ppnh = new ProxyProtocolNegotiationHandler(
199+
proxyAddress, proxyUsername, proxyPassword, protocolNegotiationHandler);
200+
return ppnh;
212201
}
213202

214203
@Override
@@ -228,34 +217,45 @@ public void close() {
228217
}
229218

230219
/**
231-
* Buffers all writes until the HTTP CONNECT tunnel is established.
220+
* A Proxy handler follows {@link ProtocolNegotiationHandler} pattern. Upon successful proxy
221+
* connection, this handler will install {@code next} handler which should be a handler from
222+
* other type of {@link ProtocolNegotiator} to continue negotiating protocol using proxy.
232223
*/
233-
static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler {
224+
static final class ProxyProtocolNegotiationHandler extends ProtocolNegotiationHandler {
234225

235-
public BufferUntilProxyTunnelledHandler(ProxyHandler proxyHandler, ChannelHandler handler) {
236-
super(proxyHandler, handler);
237-
}
226+
private final SocketAddress address;
227+
@Nullable private final String userName;
228+
@Nullable private final String password;
238229

239-
@Override
240-
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
241-
if (evt instanceof ProxyConnectionEvent) {
242-
writeBufferedAndRemove(ctx);
243-
}
244-
super.userEventTriggered(ctx, evt);
230+
public ProxyProtocolNegotiationHandler(
231+
SocketAddress address,
232+
@Nullable String userName,
233+
@Nullable String password,
234+
ChannelHandler next) {
235+
super(next);
236+
this.address = checkNotNull(address, "address");
237+
this.userName = userName;
238+
this.password = password;
245239
}
246240

247241
@Override
248-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
249-
fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy"));
250-
super.channelInactive(ctx);
242+
protected void protocolNegotiationEventTriggered(ChannelHandlerContext ctx) {
243+
HttpProxyHandler nettyProxyHandler;
244+
if (userName == null || password == null) {
245+
nettyProxyHandler = new HttpProxyHandler(address);
246+
} else {
247+
nettyProxyHandler = new HttpProxyHandler(address, userName, password);
248+
}
249+
ctx.pipeline().addBefore(ctx.name(), /* newName= */ null, nettyProxyHandler);
251250
}
252251

253252
@Override
254-
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
255-
if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
256-
fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy"));
253+
protected void userEventTriggered0(ChannelHandlerContext ctx, Object evt) throws Exception {
254+
if (evt instanceof ProxyConnectionEvent) {
255+
fireProtocolNegotiationEvent(ctx);
256+
} else {
257+
super.userEventTriggered(ctx, evt);
257258
}
258-
super.close(ctx, future);
259259
}
260260
}
261261

@@ -527,208 +527,6 @@ static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String m
527527
log.log(level, builder.toString(), t);
528528
}
529529

530-
/**
531-
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
532-
* {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
533-
* write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
534-
* i.e. before it's active or the TLS Handshake is complete.
535-
*/
536-
public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler {
537-
538-
private ChannelHandler[] handlers;
539-
private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
540-
private boolean writing;
541-
private boolean flushRequested;
542-
private Throwable failCause;
543-
544-
/**
545-
* @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and
546-
* before this handler.
547-
*/
548-
protected AbstractBufferingHandler(ChannelHandler... handlers) {
549-
this.handlers = handlers;
550-
}
551-
552-
/**
553-
* When this channel is registered, we will add all the ChannelHandlers passed into our
554-
* constructor to the pipeline.
555-
*/
556-
@Override
557-
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
558-
/**
559-
* This check is necessary as a channel may be registered with different event loops during it
560-
* lifetime and we only want to configure it once.
561-
*/
562-
if (handlers != null && handlers.length > 0) {
563-
for (ChannelHandler handler : handlers) {
564-
ctx.pipeline().addBefore(ctx.name(), null, handler);
565-
}
566-
ChannelHandler handler0 = handlers[0];
567-
ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]);
568-
handlers = null;
569-
if (handler0Ctx != null) { // The handler may have removed itself immediately
570-
if (handler0 instanceof ChannelInboundHandler) {
571-
((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx);
572-
} else {
573-
handler0Ctx.fireChannelRegistered();
574-
}
575-
}
576-
} else {
577-
super.channelRegistered(ctx);
578-
}
579-
}
580-
581-
/**
582-
* Do not rely on channel handlers to propagate exceptions to us.
583-
* {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
584-
* Add a listener to the connect future directly and do appropriate error handling.
585-
*/
586-
@Override
587-
public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
588-
SocketAddress localAddress, ChannelPromise promise) throws Exception {
589-
super.connect(ctx, remoteAddress, localAddress, promise);
590-
promise.addListener(new ChannelFutureListener() {
591-
@Override
592-
public void operationComplete(ChannelFuture future) throws Exception {
593-
if (!future.isSuccess()) {
594-
fail(ctx, future.cause());
595-
}
596-
}
597-
});
598-
}
599-
600-
/**
601-
* If we encounter an exception, then notify all buffered writes that we failed.
602-
*/
603-
@Override
604-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
605-
fail(ctx, cause);
606-
}
607-
608-
/**
609-
* If this channel becomes inactive, then notify all buffered writes that we failed.
610-
*/
611-
@Override
612-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
613-
fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
614-
super.channelInactive(ctx);
615-
}
616-
617-
/**
618-
* Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
619-
* called, or we have somehow failed. If we have already failed in the past, then the write
620-
* will fail immediately.
621-
*/
622-
@Override
623-
@SuppressWarnings("FutureReturnValueIgnored")
624-
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
625-
throws Exception {
626-
/**
627-
* This check handles a race condition between Channel.write (in the calling thread) and the
628-
* removal of this handler (in the event loop thread).
629-
* The problem occurs in e.g. this sequence:
630-
* 1) [caller thread] The write method identifies the context for this handler
631-
* 2) [event loop] This handler removes itself from the pipeline
632-
* 3) [caller thread] The write method delegates to the invoker to call the write method in
633-
* the event loop thread. When this happens, we identify that this handler has been
634-
* removed with "bufferedWrites == null".
635-
*/
636-
if (failCause != null) {
637-
promise.setFailure(failCause);
638-
ReferenceCountUtil.release(msg);
639-
} else if (bufferedWrites == null) {
640-
super.write(ctx, msg, promise);
641-
} else {
642-
bufferedWrites.add(new ChannelWrite(msg, promise));
643-
}
644-
}
645-
646-
/**
647-
* Calls to this method will not trigger an immediate flush. The flush will be deferred until
648-
* {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
649-
*/
650-
@Override
651-
public void flush(ChannelHandlerContext ctx) {
652-
/**
653-
* Swallowing any flushes is not only an optimization but also required
654-
* for the SslHandler to work correctly. If the SslHandler receives multiple
655-
* flushes while the handshake is still ongoing, then the handshake "randomly"
656-
* times out. Not sure at this point why this is happening. Doing a single flush
657-
* seems to work but multiple flushes don't ...
658-
*/
659-
if (bufferedWrites == null) {
660-
ctx.flush();
661-
} else {
662-
flushRequested = true;
663-
}
664-
}
665-
666-
/**
667-
* If we are still performing protocol negotiation, then this will propagate failures to all
668-
* buffered writes.
669-
*/
670-
@Override
671-
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
672-
if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
673-
fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
674-
}
675-
super.close(ctx, future);
676-
}
677-
678-
/**
679-
* Propagate failures to all buffered writes.
680-
*/
681-
@SuppressWarnings("FutureReturnValueIgnored")
682-
protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
683-
if (failCause == null) {
684-
failCause = cause;
685-
}
686-
if (bufferedWrites != null) {
687-
while (!bufferedWrites.isEmpty()) {
688-
ChannelWrite write = bufferedWrites.poll();
689-
write.promise.setFailure(cause);
690-
ReferenceCountUtil.release(write.msg);
691-
}
692-
bufferedWrites = null;
693-
}
694-
695-
ctx.fireExceptionCaught(cause);
696-
}
697-
698-
@SuppressWarnings("FutureReturnValueIgnored")
699-
protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
700-
if (!ctx.channel().isActive() || writing) {
701-
return;
702-
}
703-
// Make sure that method can't be reentered, so that the ordering
704-
// in the queue can't be messed up.
705-
writing = true;
706-
while (!bufferedWrites.isEmpty()) {
707-
ChannelWrite write = bufferedWrites.poll();
708-
ctx.write(write.msg, write.promise);
709-
}
710-
assert bufferedWrites.isEmpty();
711-
bufferedWrites = null;
712-
if (flushRequested) {
713-
ctx.flush();
714-
}
715-
// Removal has to happen last as the above writes will likely trigger
716-
// new writes that have to be added to the end of queue in order to not
717-
// mess up the ordering.
718-
ctx.pipeline().remove(this);
719-
}
720-
721-
private static class ChannelWrite {
722-
Object msg;
723-
ChannelPromise promise;
724-
725-
ChannelWrite(Object msg, ChannelPromise promise) {
726-
this.msg = msg;
727-
this.promise = promise;
728-
}
729-
}
730-
}
731-
732530
/**
733531
* Adapts a {@link ProtocolNegotiationEvent} to the {@link GrpcHttp2ConnectionHandler}.
734532
*/

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@
6767
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
6868
import io.netty.channel.Channel;
6969
import io.netty.channel.ChannelConfig;
70+
import io.netty.channel.ChannelDuplexHandler;
7071
import io.netty.channel.ChannelFactory;
7172
import io.netty.channel.ChannelHandler;
73+
import io.netty.channel.ChannelHandlerContext;
7274
import io.netty.channel.ChannelOption;
7375
import io.netty.channel.EventLoopGroup;
7476
import io.netty.channel.ReflectiveChannelFactory;
@@ -915,9 +917,21 @@ public String parse(InputStream stream) {
915917
}
916918
}
917919

918-
private static class NoopHandler extends ProtocolNegotiators.AbstractBufferingHandler {
920+
private static class NoopHandler extends ChannelDuplexHandler {
921+
922+
private final GrpcHttp2ConnectionHandler grpcHandler;
923+
919924
public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) {
920-
super(grpcHandler);
925+
this.grpcHandler = grpcHandler;
926+
}
927+
928+
@Override
929+
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
930+
ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);
931+
}
932+
933+
public void fail(ChannelHandlerContext ctx, Throwable cause) {
934+
ctx.fireExceptionCaught(cause);
921935
}
922936
}
923937

0 commit comments

Comments
 (0)