From aa58877412147defcc2c1002d9f13a2f745b8a99 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 8 Aug 2024 16:38:54 -0700 Subject: [PATCH 1/8] Fix the abnormal case where codec is not available to accept the event due to connection termination. Improve plugin shutdown process by separating netty acceptor and child executor groups, then terminating by order. --- CHANGELOG.md | 4 ++++ lib/logstash/inputs/beats/message_listener.rb | 17 ++++++++++++----- spec/inputs/beats/message_listener_spec.rb | 18 ++++++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 770e5e28..e7860141 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 6.9.0 + - Fix: avoid plugin crash when connection terminated but processing the message [#500](https://github.com/logstash-plugins/logstash-input-beats/pull/500) + - Improve plugin shutdown process by separating netty acceptor and child groups. + ## 6.8.4 - Fixed to populate the `@metadata` fields even if the source's metadata value is `nil` [#502](https://github.com/logstash-plugins/logstash-input-beats/pull/502) diff --git a/lib/logstash/inputs/beats/message_listener.rb b/lib/logstash/inputs/beats/message_listener.rb index a5116f4a..4869a10a 100644 --- a/lib/logstash/inputs/beats/message_listener.rb +++ b/lib/logstash/inputs/beats/message_listener.rb @@ -49,11 +49,18 @@ def onNewMessage(ctx, message) @nocodec_transformer.transform(event) @queue << event else - codec(ctx).accept(CodecCallbackListener.new(target_field, - hash, - message.getIdentityStream(), - @codec_transformer, - @queue)) + current_codec = codec(ctx) + if current_codec + current_codec.accept(CodecCallbackListener.new(target_field, + hash, + message.getIdentityStream(), + @codec_transformer, + @queue)) + else + # the possible cases: connection closed or exception with a connection + # let client retry + input.logger.warn("No codec available to process the message. This may due to connection termination and message was being processed.") + end end end diff --git a/spec/inputs/beats/message_listener_spec.rb b/spec/inputs/beats/message_listener_spec.rb index e96aa621..5b51daf3 100644 --- a/spec/inputs/beats/message_listener_spec.rb +++ b/spec/inputs/beats/message_listener_spec.rb @@ -218,6 +218,24 @@ def flush(&block) end end + context "when connection is terminated" do + let(:message) { MockMessage.new("message from Mars", { "message" => "hello world", "@metadata" => {} } )} + + it "handles without crashing" do + subject.onConnectionClose(ctx) + expect(subject.connections_list[ctx]).to be nil + + expect( input.logger ).to receive(:warn) do |message | + expect( message ).to match /No codec available to process the message. This may due to connection termination and message was being processed./ + end + + subject.onNewMessage(ctx, message) + # doesn't push to queue, so should be nil + event = queue.pop + expect(event.get("message")).to be nil + end + end + it_behaves_like "when the message is from any libbeat", :disabled, "[@metadata][ip_address]" it_behaves_like "when the message is from any libbeat", :v1, "[@metadata][input][beats][host][ip]" it_behaves_like "when the message is from any libbeat", :v8, "[@metadata][input][beats][host][ip]" From 01be0be476a662e4dc31c4d404b6c92db61d217f Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 15 Aug 2024 09:50:37 -0700 Subject: [PATCH 2/8] Improving gracefully shutdown process in a number of better ways: close boss group (incoming socket handler), ask and wait for the executors which have pending tasks, ignore message handling when executors terminated. --- .../inputs/beats/codec_callback_listener.rb | 4 +- .../java/org/logstash/beats/BeatsHandler.java | 88 ++++++++++++------- src/main/java/org/logstash/beats/Server.java | 37 +++++++- 3 files changed, 91 insertions(+), 38 deletions(-) diff --git a/lib/logstash/inputs/beats/codec_callback_listener.rb b/lib/logstash/inputs/beats/codec_callback_listener.rb index d40236bd..b37c68a3 100644 --- a/lib/logstash/inputs/beats/codec_callback_listener.rb +++ b/lib/logstash/inputs/beats/codec_callback_listener.rb @@ -2,12 +2,12 @@ require "logstash/inputs/beats" module LogStash module Inputs class Beats - # Use the new callback based approch instead of using blocks + # Use the new callback based approach instead of using blocks # so we can retain some context of the execution, and make it easier to test class CodecCallbackListener attr_accessor :data # The path acts as the `stream_identity`, - # usefull when the clients is reading multiples files + # useful when the clients is reading multiples files attr_accessor :path def initialize(data, hash, path, transformer, queue) diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 15dfb7e9..b30ca300 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -7,13 +7,19 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Objects; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLHandshakeException; public class BeatsHandler extends SimpleChannelInboundHandler { private final static Logger logger = LogManager.getLogger(BeatsHandler.class); + private final static String executorTerminatedMessage = "event executor terminated"; + private final IMessageListener messageListener; private ChannelHandlerContext context; + private final AtomicBoolean isQuitePeriod = new AtomicBoolean(false); public BeatsHandler(IMessageListener listener) { messageListener = listener; @@ -21,8 +27,8 @@ public BeatsHandler(IMessageListener listener) { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { - context = ctx; - if (logger.isTraceEnabled()){ + context = ctx; + if (logger.isTraceEnabled()) { logger.trace(format("Channel Active")); } super.channelActive(ctx); @@ -32,7 +38,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (logger.isTraceEnabled()){ + if (logger.isTraceEnabled()) { logger.trace(format("Channel Inactive")); } messageListener.onConnectionClose(ctx); @@ -41,29 +47,22 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead0(ChannelHandlerContext ctx, Batch batch) { - if(logger.isDebugEnabled()) { + if (logger.isDebugEnabled()) { logger.debug(format("Received a new payload")); } try { - if (batch.isEmpty()) { - logger.debug("Sending 0-seq ACK for empty batch"); - writeAck(ctx, batch.getProtocol(), 0); - } - for (Message message : batch) { + if (isQuitePeriod.get()) { if (logger.isDebugEnabled()) { - logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); - } - messageListener.onNewMessage(ctx, message); - - if (needAck(message)) { - ack(ctx, message); + logger.debug(format("Received batch but no executors available, ignoring...")); } + } else { + processBatchAndSendAck(ctx, batch); } - }finally{ + } finally { //this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().set(false); if (logger.isDebugEnabled()) { - logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get()); + logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(), ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get()); } batch.release(); ctx.flush(); @@ -93,12 +92,20 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } else { final Throwable realCause = extractCause(cause, 0); - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause); } else { logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")")); } - super.exceptionCaught(ctx, cause); + // when execution tasks rejected, no need to forward the exception to netty channel handlers + if (cause instanceof RejectedExecutionException) { + // we no longer have event executors available since they are terminated, mostly by shutdown process + if (Objects.nonNull(cause.getMessage()) && cause.getMessage().contains(executorTerminatedMessage)) { + this.isQuitePeriod.compareAndSet(false, true); + } + } else { + super.exceptionCaught(ctx, cause); + } } } finally { ctx.flush(); @@ -106,6 +113,26 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } + private void processBatchAndSendAck(ChannelHandlerContext ctx, Batch batch) { + if (batch.isEmpty()) { + logger.debug("Sending 0-seq ACK for empty batch"); + writeAck(ctx, batch.getProtocol(), 0); + } + for (Message message : batch) { + if (logger.isDebugEnabled()) { + logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); + } + messageListener.onNewMessage(ctx, message); + + if (needAck(message)) { + if (logger.isTraceEnabled()) { + logger.trace(format("Acking message number " + message.getSequence())); + } + writeAck(ctx, message.getBatch().getProtocol(), message.getSequence()); + } + } + } + private boolean isNoisyException(final Throwable ex) { if (ex instanceof IOException) { final String message = ex.getMessage(); @@ -120,13 +147,6 @@ private boolean needAck(Message message) { return message.getSequence() == message.getBatch().getHighestSequence(); } - private void ack(ChannelHandlerContext ctx, Message message) { - if (logger.isTraceEnabled()){ - logger.trace(format("Acking message number " + message.getSequence())); - } - writeAck(ctx, message.getBatch().getProtocol(), message.getSequence()); - } - private void writeAck(ChannelHandlerContext ctx, byte protocol, int sequence) { ctx.write(new Ack(protocol, sequence)); } @@ -140,20 +160,20 @@ private String format(String message) { InetSocketAddress remote = (InetSocketAddress) context.channel().remoteAddress(); String localhost; - if(local != null) { + if (local != null) { localhost = local.getAddress().getHostAddress() + ":" + local.getPort(); - } else{ + } else { localhost = "undefined"; } - String remotehost; - if(remote != null) { - remotehost = remote.getAddress().getHostAddress() + ":" + remote.getPort(); - } else{ - remotehost = "undefined"; + String remoteHost; + if (remote != null) { + remoteHost = remote.getAddress().getHostAddress() + ":" + remote.getPort(); + } else { + remoteHost = "undefined"; } - return "[local: " + localhost + ", remote: " + remotehost + "] " + message; + return "[local: " + localhost + ", remote: " + remoteHost + "] " + message; } private static final int MAX_CAUSE_NESTING = 10; diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 492ae297..36cc43aa 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -11,7 +11,9 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.SingleThreadEventExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.netty.SslHandlerProvider; @@ -23,6 +25,7 @@ public class Server { private final String host; private final int eventLoopThreadCount; private final int executorThreadCount; + private NioEventLoopGroup bossGroup; private NioEventLoopGroup workGroup; private IMessageListener messageListener = new MessageListener(); private SslHandlerProvider sslHandlerProvider; @@ -51,6 +54,7 @@ public Server listen() throws InterruptedException { logger.error("Could not shut down worker group before starting", e); } } + bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make it adjustable, no need many threads workGroup = new NioEventLoopGroup(eventLoopThreadCount); try { logger.info("Starting server on port: {}", this.port); @@ -58,7 +62,7 @@ public Server listen() throws InterruptedException { beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount); ServerBootstrap server = new ServerBootstrap(); - server.group(workGroup) + server.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket. .childHandler(beatsInitializer); @@ -83,13 +87,30 @@ public void stop() { } private void shutdown() { + // as much as possible we try to gracefully shut down + // no longer accept incoming socket connections + // with event loop group (workGroup) declares quite period with `shutdownGracefully` which queued tasks will not receive work for the best cases + // executor group threads will be asked and waited to gracefully shutdown if they have pending tasks. This helps each individual handler process the event/exception, especially when multichannel use case. + // there is no guarantee that executor threads will terminate during the shutdown because of many factors such as ack processing + // so any pending tasks which send batches to BeatsHandler will be ignored try { + // boss group shuts down socket connections + // shutting down bossGroup separately gives us faster channel closure + if (bossGroup != null) { + bossGroup.shutdownGracefully().sync(); + } + if (workGroup != null) { - workGroup.shutdownGracefully().sync(); + workGroup.shutdownGracefully(); } if (beatsInitializer != null) { beatsInitializer.shutdownEventExecutor(); } + // we wait for workGroup shutdown this time + if (workGroup != null && !workGroup.isShutdown()) { + // calling again shutdownGracefully doesn't hurt that it returns terminationFuture if shutdown is under process + workGroup.shutdownGracefully().sync(); + } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -160,6 +181,18 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); + + // DefaultEventExecutorGroup internally executes numbers of SingletonEventExecutor + // try to gracefully shut down every thread if they have unacked pending batches (pending tasks) + for (final EventExecutor eventExecutor : beatsHandlerExecutorGroup) { + if (eventExecutor instanceof SingleThreadEventExecutor) { + final SingleThreadEventExecutor singleExecutor = (SingleThreadEventExecutor) eventExecutor; + if (singleExecutor.pendingTasks() > 0) { + singleExecutor.shutdownGracefully().sync(); + } + } + } + // make sure non-pending tasked executors get terminated beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); From 5a77a31dc0ac6ac36982395003756abd306f410e Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 15 Aug 2024 09:53:58 -0700 Subject: [PATCH 3/8] Version up and add a descriptive changelog. --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 9ffc8cfb..97f57815 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -6.8.4 +6.9.0 From ca69fab84b5af4778dd2389757add83526db2432 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:33:29 -0700 Subject: [PATCH 4/8] Apply suggestions from code review Changelog notes revise suggestion and misspelling quiet var correction. Co-authored-by: Andrea Selva --- src/main/java/org/logstash/beats/BeatsHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index b30ca300..8768c2a2 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -19,7 +19,7 @@ public class BeatsHandler extends SimpleChannelInboundHandler { private final IMessageListener messageListener; private ChannelHandlerContext context; - private final AtomicBoolean isQuitePeriod = new AtomicBoolean(false); + private final AtomicBoolean isQuietPeriod = new AtomicBoolean(false); public BeatsHandler(IMessageListener listener) { messageListener = listener; @@ -51,7 +51,7 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) { logger.debug(format("Received a new payload")); } try { - if (isQuitePeriod.get()) { + if (isQuietPeriod.get()) { if (logger.isDebugEnabled()) { logger.debug(format("Received batch but no executors available, ignoring...")); } @@ -101,7 +101,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (cause instanceof RejectedExecutionException) { // we no longer have event executors available since they are terminated, mostly by shutdown process if (Objects.nonNull(cause.getMessage()) && cause.getMessage().contains(executorTerminatedMessage)) { - this.isQuitePeriod.compareAndSet(false, true); + this.isQuietPeriod.compareAndSet(false, true); } } else { super.exceptionCaught(ctx, cause); From 577efd76d6efd6fb90f513eaa5857355c0cf90d1 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 27 Aug 2024 07:51:18 -0700 Subject: [PATCH 5/8] Wait for worker group graceful shutdown before event executors. --- src/main/java/org/logstash/beats/Server.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 36cc43aa..7de605e4 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -101,16 +101,12 @@ private void shutdown() { } if (workGroup != null) { - workGroup.shutdownGracefully(); + workGroup.shutdownGracefully().sync(); } + if (beatsInitializer != null) { beatsInitializer.shutdownEventExecutor(); } - // we wait for workGroup shutdown this time - if (workGroup != null && !workGroup.isShutdown()) { - // calling again shutdownGracefully doesn't hurt that it returns terminationFuture if shutdown is under process - workGroup.shutdownGracefully().sync(); - } } catch (InterruptedException e) { throw new IllegalStateException(e); } From a29da3283ad222260d1ff680e742241ca263fd70 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 28 Aug 2024 07:14:19 -0700 Subject: [PATCH 6/8] Update src/main/java/org/logstash/beats/Server.java Comment correction. Co-authored-by: Andrea Selva --- src/main/java/org/logstash/beats/Server.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 7de605e4..a999558b 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -178,7 +178,7 @@ public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); - // DefaultEventExecutorGroup internally executes numbers of SingletonEventExecutor + // DefaultEventExecutorGroup internally executes numbers of SingleThreadEventExecutor // try to gracefully shut down every thread if they have unacked pending batches (pending tasks) for (final EventExecutor eventExecutor : beatsHandlerExecutorGroup) { if (eventExecutor instanceof SingleThreadEventExecutor) { From be59fb13e4bbeb61ea792355c65a5a3b75c7d749 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 28 Aug 2024 07:18:47 -0700 Subject: [PATCH 7/8] Separate event executors termination with pending tasks into new method. --- src/main/java/org/logstash/beats/Server.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index a999558b..fb4b4b1b 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -178,6 +178,17 @@ public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); + shutdownEventExecutorsWithPendingTasks(); + + // make sure non-pending tasked executors get terminated + beatsHandlerExecutorGroup.shutdownGracefully().sync(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private void shutdownEventExecutorsWithPendingTasks() { + try { // DefaultEventExecutorGroup internally executes numbers of SingleThreadEventExecutor // try to gracefully shut down every thread if they have unacked pending batches (pending tasks) for (final EventExecutor eventExecutor : beatsHandlerExecutorGroup) { @@ -188,8 +199,6 @@ public void shutdownEventExecutor() { } } } - // make sure non-pending tasked executors get terminated - beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); } From 54323deb6c77f7c00264e0225ac244e5d346f5b9 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 28 Aug 2024 08:23:16 -0700 Subject: [PATCH 8/8] Changelog revised. --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7860141..2470921e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 6.9.0 - - Fix: avoid plugin crash when connection terminated but processing the message [#500](https://github.com/logstash-plugins/logstash-input-beats/pull/500) - - Improve plugin shutdown process by separating netty acceptor and child groups. + - Improvements on plugin's shutdown [#500](https://github.com/logstash-plugins/logstash-input-beats/pull/500) + - Fix: avoid plugin crash when connection terminated but processing the message + - Graceful shutdown: close acceptor group (incoming socket handler) first, ask and wait for the executors which have pending tasks, better message handling when executors terminated ## 6.8.4 - Fixed to populate the `@metadata` fields even if the source's metadata value is `nil` [#502](https://github.com/logstash-plugins/logstash-input-beats/pull/502)