-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathInputLoop.java
268 lines (230 loc) · 8.97 KB
/
InputLoop.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package org.logstash.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
 * Plain TCP Server Implementation.
 *
 * <p>Builds a Netty {@link ServerBootstrap} that hands every accepted connection to an
 * {@link InputHandler}, which in turn forwards the received bytes to a per-connection copy of
 * the supplied {@link Decoder}. {@link #run()} binds the server and blocks until the server
 * channel is closed; {@link #close()} shuts down both event loop groups.</p>
 */
public final class InputLoop implements Runnable, Closeable {

    // historically this class was passing around the plugin's logger
    private static final Logger logger = LogManager.getLogger("logstash.inputs.tcp");

    /**
     * Netty Boss Group.
     */
    private final EventLoopGroup boss;

    /**
     * Netty Worker Group.
     */
    private final EventLoopGroup worker;

    /**
     * The Server Bootstrap
     */
    private final ServerBootstrap serverBootstrap;

    /**
     * SSL configuration.
     */
    private final SslContext sslContext;

    /**
     * TCP Port.
     */
    private final int port;

    /**
     * TCP Host.
     */
    private final String host;

    /**
     * Ctor.
     * @param host Host to bind the listen to
     * @param port Port to listen on
     * @param decoder {@link Decoder} provided by Jruby
     * @param keepAlive set to true to instruct the socket to issue TCP keep alive
     * @param sslContext SSL configuration; when {@code null} no SSL handler is installed on
     *                   accepted channels
     */
    public InputLoop(final String host, final int port, final Decoder decoder, final boolean keepAlive,
                     final SslContext sslContext) {
        this.sslContext = sslContext;
        this.host = host;
        this.port = port;
        worker = new NioEventLoopGroup();
        // a single thread is used for the boss group
        boss = new NioEventLoopGroup(1);
        serverBootstrap = new ServerBootstrap().group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childOption(ChannelOption.SO_KEEPALIVE, keepAlive)
            .childHandler(new InputLoop.InputHandler(decoder, sslContext));
    }

    /**
     * Binds the server to the configured host and port and blocks until the server channel
     * is closed.
     *
     * @throws IllegalStateException if the calling thread is interrupted while waiting; the
     *                               thread's interrupt status is restored before throwing
     */
    @Override
    public void run() {
        try {
            serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
        } catch (final InterruptedException ex) {
            throw interrupted(ex);
        }
    }

    /**
     * Shuts down the boss group and then the worker group, waiting for each to complete.
     *
     * @throws IllegalStateException if the calling thread is interrupted while waiting; the
     *                               thread's interrupt status is restored before throwing
     */
    @Override
    public void close() {
        try {
            // Shut down boss first otherwise new connections
            // will be passed to a closed worker loop, triggering:
            // RejectedExecutionException: event executor terminated
            boss.shutdownGracefully().sync();
        } catch (final InterruptedException ex) {
            // Do not leave the worker threads running just because the wait on the boss
            // group was interrupted: request their shutdown without waiting for it.
            worker.shutdownGracefully();
            throw interrupted(ex);
        }
        try {
            worker.shutdownGracefully().sync();
        } catch (final InterruptedException ex) {
            throw interrupted(ex);
        }
    }

    /**
     * Restores the current thread's interrupt status and wraps the interruption in the
     * unchecked exception type this class reports interruptions with.
     *
     * @param ex the interruption that was caught
     * @return an {@link IllegalStateException} having {@code ex} as its cause
     */
    private static IllegalStateException interrupted(final InterruptedException ex) {
        Thread.currentThread().interrupt();
        return new IllegalStateException(ex);
    }

    /**
     * {@link ChannelInitializer} configuring client channels to forward all data to given
     * {@link Decoder}.
     */
    private static final class InputHandler extends ChannelInitializer<SocketChannel> {

        /**
         * Name under which the SSL handler is registered in the channel pipeline.
         */
        private static final String SSL_HANDLER = "ssl-handler";

        /**
         * {@link Decoder} supplied by JRuby.
         */
        private final Decoder decoder;

        /**
         * SSL configuration options.
         */
        private final SslContext sslContext;

        /**
         * Ctor.
         * @param decoder {@link Decoder} provided by JRuby.
         * @param sslContext SSL configuration, or {@code null} to leave channels unencrypted.
         */
        InputHandler(final Decoder decoder, final SslContext sslContext) {
            this.decoder = decoder;
            this.sslContext = sslContext;
        }

        /**
         * Sets up the pipeline of a newly accepted channel: optional SSL handler first, then
         * the {@link DecoderAdapter} wrapping a copy of the decoder, then the
         * {@link ThrottleReleaseHandler}; also registers a listener that flushes that decoder
         * copy when the channel closes.
         */
        @Override
        protected void initChannel(final SocketChannel channel) throws Exception {
            // each channel works on its own copy of the decoder
            final Decoder localCopy = decoder.copy();

            // if SSL is enabled, the SSL handler must be added to the pipeline FIRST
            if (sslContext != null) {
                channel.pipeline().addFirst(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
            }

            channel.pipeline().addLast(new DecoderAdapter(localCopy, logger));

            // disable AUTO_READ and use ThrottleReleaseHandler as LAST handler
            channel.config().setAutoRead(false);
            channel.pipeline().addLast(new ThrottleReleaseHandler());

            channel.closeFuture().addListener(new FlushOnCloseListener(localCopy));

            if (logger.isDebugEnabled()) {
                logger.debug(remoteChannelInfo(channel) + ": initialized channel");
            }
        }

        /**
         * Logs the failure and then delegates to the superclass handling.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("Error in Netty input handler: " + cause);
            super.exceptionCaught(ctx, cause);
        }

        /**
         * This {@link ThrottleReleaseHandler} is a handler that marks the channel eligible for
         * reading when the channel first becomes active or has completed a read operation, and
         * is what enables this plugin to apply TCP back-pressure when it is blocked instead of
         * reading bytes into buffers that will vanish when we OOM.
         *
         * <p>It requires the channel to be configured <em>without</em> {@code AUTO_READ}</p>
         */
        private static final class ThrottleReleaseHandler extends ChannelInboundHandlerAdapter {

            @Override
            public void channelActive(final ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);
                // request the first read now that the channel is active
                ctx.channel().read();
            }

            @Override
            public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
                super.channelReadComplete(ctx);
                // request the next read only after the previous one has completed
                ctx.channel().read();
            }
        }

        /**
         * Listener that flushes the JRuby supplied {@link Decoder} when the socket is closed.
         */
        private static final class FlushOnCloseListener implements GenericFutureListener<Future<Void>> {

            /**
             * {@link Decoder} supplied by JRuby.
             */
            private final Decoder decoder;

            /**
             * Ctor.
             * @param decoder {@link Decoder} provided by JRuby.
             */
            FlushOnCloseListener(final Decoder decoder) {
                this.decoder = decoder;
            }

            /**
             * Flushes the decoder once the future this listener was added to has completed.
             */
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                decoder.flush();
            }
        }

        /**
         * Adapter that wraps the JRuby supplied {@link Decoder}.
         */
        private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {

            /**
             * {@link Decoder} provided by JRuby.
             */
            private final Decoder decoder;

            /**
             * Reference to the logger.
             */
            private final Logger logger;

            /**
             * Ctor.
             * @param decoder {@link Decoder} provided by JRuby.
             * @param logger logger used to report exceptions caught on the channel.
             */
            DecoderAdapter(final Decoder decoder, final Logger logger) {
                this.logger = logger;
                this.decoder = decoder;
            }

            // 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
            // corresponding interface updated
            @Override
            public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
                decoder.decode(ctx, (ByteBuf) msg);
            }

            /**
             * Logs the failure and closes the channel. Exceptions recognised by
             * {@link #silentException(Throwable)} are logged at debug level (with the stack
             * trace) when debug is enabled and at info level (message only) otherwise;
             * everything else is logged as an error.
             */
            @Override
            public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
                final String channelInfo = remoteChannelInfo(ctx.channel());
                if (silentException(cause)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(channelInfo + ": closing", cause);
                    } else {
                        logger.info("{}: closing ({})", channelInfo, cause.getMessage());
                    }
                } else {
                    logger.error(channelInfo + ": closing due:", cause);
                }
                ctx.close();
            }

            /**
             * Tells whether an exception should be reported without an error-level log entry.
             *
             * @param ex the exception caught on the channel
             * @return {@code true} only for an {@link IOException} whose message is exactly
             *         {@code "Connection reset by peer"}
             */
            private boolean silentException(final Throwable ex) {
                return ex instanceof IOException
                    && "Connection reset by peer".equals(ex.getMessage());
            }
        }
    }

    /**
     * Formats the remote end of a channel for log messages.
     *
     * @param channel channel whose remote address is looked up
     * @return the remote address and port joined by a colon, or {@code null} when the channel
     *         reports no remote address
     */
    private static String remoteChannelInfo(final Channel channel) {
        final InetSocketAddress remote = ((InetSocketAddress) channel.remoteAddress());
        if (remote != null) {
            return remote.getAddress() + ":" + remote.getPort();
        }
        return null;
    }
}