Skip to content

Commit ba0fd84

Browse files
anarsultanovejona86
authored andcommitted
netty: Rely on ChannelFactory in NettyServer instead of dynamic classes
Fixes grpc#5649
1 parent fd4f189 commit ba0fd84

File tree

6 files changed

+70
-31
lines changed

6 files changed

+70
-31
lines changed

netty/src/main/java/io/grpc/netty/NettyServer.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.grpc.internal.TransportTracer;
3939
import io.netty.bootstrap.ServerBootstrap;
4040
import io.netty.channel.Channel;
41+
import io.netty.channel.ChannelFactory;
4142
import io.netty.channel.ChannelFuture;
4243
import io.netty.channel.ChannelFutureListener;
4344
import io.netty.channel.ChannelInitializer;
@@ -66,7 +67,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
6667

6768
private final InternalLogId logId;
6869
private final SocketAddress address;
69-
private final Class<? extends ServerChannel> channelType;
70+
private final ChannelFactory<? extends ServerChannel> channelFactory;
7071
private final Map<ChannelOption<?>, ?> channelOptions;
7172
private final ProtocolNegotiator protocolNegotiator;
7273
private final int maxStreamsPerConnection;
@@ -95,7 +96,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
9596
new AtomicReference<>();
9697

9798
NettyServer(
98-
SocketAddress address, Class<? extends ServerChannel> channelType,
99+
SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
99100
Map<ChannelOption<?>, ?> channelOptions,
100101
ObjectPool<? extends EventLoopGroup> bossGroupPool,
101102
ObjectPool<? extends EventLoopGroup> workerGroupPool,
@@ -109,7 +110,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
109110
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
110111
InternalChannelz channelz) {
111112
this.address = address;
112-
this.channelType = checkNotNull(channelType, "channelType");
113+
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
113114
checkNotNull(channelOptions, "channelOptions");
114115
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
115116
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
@@ -155,7 +156,7 @@ public void start(ServerListener serverListener) throws IOException {
155156

156157
ServerBootstrap b = new ServerBootstrap();
157158
b.group(bossGroup, workerGroup);
158-
b.channel(channelType);
159+
b.channelFactory(channelFactory);
159160
// For non-socket based channel, the option will be ignored.
160161
b.option(SO_BACKLOG, 128);
161162
b.childOption(SO_KEEPALIVE, true);
@@ -170,7 +171,7 @@ public void start(ServerListener serverListener) throws IOException {
170171

171172
b.childHandler(new ChannelInitializer<Channel>() {
172173
@Override
173-
public void initChannel(Channel ch) throws Exception {
174+
public void initChannel(Channel ch) {
174175

175176
ChannelPromise channelDone = ch.newPromise();
176177

@@ -217,7 +218,7 @@ public void initChannel(Channel ch) throws Exception {
217218
* Releases the event loop if the channel is "done", possibly due to the channel closing.
218219
*/
219220
final class LoopReleaser implements ChannelFutureListener {
220-
boolean done;
221+
private boolean done;
221222

222223
@Override
223224
public void operationComplete(ChannelFuture future) throws Exception {

netty/src/main/java/io/grpc/netty/NettyServerBuilder.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
2626

2727
import com.google.common.annotations.VisibleForTesting;
28-
import com.google.common.base.Preconditions;
2928
import com.google.errorprone.annotations.CanIgnoreReturnValue;
3029
import io.grpc.ExperimentalApi;
3130
import io.grpc.Internal;
@@ -36,8 +35,10 @@
3635
import io.grpc.internal.KeepAliveManager;
3736
import io.grpc.internal.ObjectPool;
3837
import io.grpc.internal.SharedResourcePool;
38+
import io.netty.channel.ChannelFactory;
3939
import io.netty.channel.ChannelOption;
4040
import io.netty.channel.EventLoopGroup;
41+
import io.netty.channel.ReflectiveChannelFactory;
4142
import io.netty.channel.ServerChannel;
4243
import io.netty.channel.socket.nio.NioServerSocketChannel;
4344
import io.netty.handler.ssl.SslContext;
@@ -79,7 +80,9 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
7980
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
8081

8182
private final List<SocketAddress> listenAddresses = new ArrayList<>();
82-
private Class<? extends ServerChannel> channelType = null;
83+
84+
private ChannelFactory<? extends ServerChannel> channelFactory =
85+
Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
8386
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
8487
private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
8588
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
@@ -91,7 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
9194
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
9295
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
9396
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
94-
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
97+
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
9598
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
9699
private long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
97100
private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
@@ -142,16 +145,41 @@ public NettyServerBuilder addListenAddress(SocketAddress listenAddress) {
142145
}
143146

144147
/**
145-
* Specify the channel type to use, by default we use {@link NioServerSocketChannel} or {@code
146-
* EpollServerSocketChannel}.
148+
* Specifies the channel type to use, by default we use {@code EpollServerSocketChannel} if
149+
* available, otherwise using {@link NioServerSocketChannel}.
150+
*
151+
* <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
152+
* {@link ServerChannel} implementation has no no-args constructor.
153+
*
154+
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
155+
* when the channel is built, the builder will use the default one which is static.
147156
*
148157
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
149158
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
150159
* example, {@link NioServerSocketChannel} must use {@link
151160
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
152161
*/
153162
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
154-
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
163+
checkNotNull(channelType, "channelType");
164+
return channelFactory(new ReflectiveChannelFactory<>(channelType));
165+
}
166+
167+
/**
168+
* Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is
169+
* usually only used if the specific {@code ServerChannel} requires complex logic which requires
170+
* additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link
171+
* #channelType(Class)}.
172+
*
173+
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
174+
* when the channel is built, the builder will use the default one which is static.
175+
*
176+
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
177+
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
178+
* example, if the factory creates {@link NioServerSocketChannel} you must use {@link
179+
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
180+
*/
181+
public NettyServerBuilder channelFactory(ChannelFactory<? extends ServerChannel> channelFactory) {
182+
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
155183
return this;
156184
}
157185

@@ -499,16 +527,13 @@ protected List<NettyServer> buildTransportServers(
499527
ProtocolNegotiator negotiator = protocolNegotiator;
500528
if (negotiator == null) {
501529
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
502-
ProtocolNegotiators.serverPlaintext();
530+
ProtocolNegotiators.serverPlaintext();
503531
}
504532

505-
Class<? extends ServerChannel> resolvedChannelType =
506-
channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType;
507-
508533
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
509534
for (SocketAddress listenAddress : listenAddresses) {
510535
NettyServer transportServer = new NettyServer(
511-
listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
536+
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
512537
workerEventLoopGroupPool, negotiator, streamTracerFactories,
513538
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
514539
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
@@ -521,10 +546,10 @@ protected List<NettyServer> buildTransportServers(
521546

522547
@VisibleForTesting
523548
void assertEventLoopsAndChannelType() {
524-
boolean allProvided = channelType != null
549+
boolean allProvided = channelFactory != Utils.DEFAULT_SERVER_CHANNEL_FACTORY
525550
&& bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
526551
&& workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
527-
boolean nonProvided = channelType == null
552+
boolean nonProvided = channelFactory == Utils.DEFAULT_SERVER_CHANNEL_FACTORY
528553
&& bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
529554
&& workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
530555
checkState(

netty/src/main/java/io/grpc/netty/Utils.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
3737
import io.netty.channel.Channel;
3838
import io.netty.channel.ChannelConfig;
39+
import io.netty.channel.ChannelFactory;
3940
import io.netty.channel.ChannelOption;
4041
import io.netty.channel.EventLoopGroup;
42+
import io.netty.channel.ReflectiveChannelFactory;
4143
import io.netty.channel.ServerChannel;
4244
import io.netty.channel.nio.NioEventLoopGroup;
4345
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -81,7 +83,7 @@ class Utils {
8183
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
8284
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
8385

84-
public static final Class<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_TYPE;
86+
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
8587
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
8688

8789
@Nullable
@@ -90,16 +92,16 @@ class Utils {
9092
static {
9193
// Decide default channel types and EventLoopGroup based on Epoll availability
9294
if (isEpollAvailable()) {
93-
DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType();
9495
DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType();
96+
DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory<>(epollServerChannelType());
9597
EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor();
9698
DEFAULT_BOSS_EVENT_LOOP_GROUP
9799
= new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", EventLoopGroupType.EPOLL);
98100
DEFAULT_WORKER_EVENT_LOOP_GROUP
99101
= new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", EventLoopGroupType.EPOLL);
100102
} else {
101103
logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause());
102-
DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class;
104+
DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory();
103105
DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class;
104106
DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP;
105107
DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP;
@@ -290,6 +292,15 @@ private static EventLoopGroup createEpollEventLoopGroup(
290292
}
291293
}
292294

295+
private static ChannelFactory<ServerChannel> nioServerChannelFactory() {
296+
return new ChannelFactory<ServerChannel>() {
297+
@Override
298+
public ServerChannel newChannel() {
299+
return new NioServerSocketChannel();
300+
}
301+
};
302+
}
303+
293304
/**
294305
* Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise
295306
* null.

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ private void startServer() throws IOException {
719719
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
720720
server = new NettyServer(
721721
TestUtils.testServerAddress(new InetSocketAddress(0)),
722-
NioServerSocketChannel.class,
722+
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
723723
new HashMap<ChannelOption<?>, Object>(),
724724
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
725725
Collections.<ServerStreamTracer.Factory>emptyList(),

netty/src/test/java/io/grpc/netty/NettyServerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void getPort() throws Exception {
5858
InetSocketAddress addr = new InetSocketAddress(0);
5959
NettyServer ns = new NettyServer(
6060
addr,
61-
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
61+
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
6262
new HashMap<ChannelOption<?>, Object>(),
6363
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
6464
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -96,7 +96,7 @@ public void getPort_notStarted() throws Exception {
9696
InetSocketAddress addr = new InetSocketAddress(0);
9797
NettyServer ns = new NettyServer(
9898
addr,
99-
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
99+
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
100100
new HashMap<ChannelOption<?>, Object>(),
101101
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
102102
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -134,7 +134,7 @@ public void childChannelOptions() throws Exception {
134134
InetSocketAddress addr = new InetSocketAddress(0);
135135
NettyServer ns = new NettyServer(
136136
addr,
137-
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
137+
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
138138
channelOptions,
139139
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
140140
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
@@ -184,7 +184,7 @@ public void channelzListenSocket() throws Exception {
184184
InetSocketAddress addr = new InetSocketAddress(0);
185185
NettyServer ns = new NettyServer(
186186
addr,
187-
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
187+
Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
188188
new HashMap<ChannelOption<?>, Object>(),
189189
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
190190
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),

netty/src/test/java/io/grpc/netty/UtilsTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import io.grpc.Status;
3030
import io.grpc.internal.GrpcUtil;
3131
import io.netty.channel.Channel;
32+
import io.netty.channel.ChannelFactory;
3233
import io.netty.channel.ChannelOption;
3334
import io.netty.channel.ConnectTimeoutException;
3435
import io.netty.channel.EventLoopGroup;
36+
import io.netty.channel.ServerChannel;
3537
import io.netty.channel.WriteBufferWaterMark;
3638
import io.netty.channel.embedded.EmbeddedChannel;
3739
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -204,13 +206,13 @@ public void defaultClientChannelType_whenEpollIsAvailable() {
204206
}
205207

206208
@Test
207-
public void defaultServerChannelType_whenEpollIsAvailable() {
209+
public void defaultServerChannelFactory_whenEpollIsAvailable() {
208210
assume().that(Utils.isEpollAvailable()).isTrue();
209211

210-
Class<? extends Channel> clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
212+
ChannelFactory<? extends ServerChannel> channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
211213

212-
assertThat(clientChannelType.getName())
213-
.isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
214+
assertThat(channelFactory.toString())
215+
.isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)");
214216
}
215217

216218
@Test

0 commit comments

Comments
 (0)