Graceful shutdown improvements and undefined method 'accept' fix. #500


Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 6.9.0
- Improvements to the plugin's shutdown [#500](https://github.com/logstash-plugins/logstash-input-beats/pull/500)
  - Fix: avoid a plugin crash when the connection is terminated while a message is still being processed
  - Graceful shutdown: close the acceptor group (incoming socket handler) first, ask and wait for executors that have pending tasks, and handle messages better once executors are terminated

## 6.8.4
- Fixed to populate the `@metadata` fields even if the source's metadata value is `nil` [#502](https://github.com/logstash-plugins/logstash-input-beats/pull/502)

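A minimal sketch of the shutdown ordering these entries describe, assuming Netty's standard NioEventLoopGroup/DefaultEventExecutorGroup APIs; the class name, thread counts, and main wrapper are illustrative, while the group names mirror Server.java below:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class ShutdownOrderSketch {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);      // accepts incoming sockets
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);      // performs channel I/O
        DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(8); // runs batch handlers

        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)         // boss/worker split introduced by this PR
                .channel(NioServerSocketChannel.class);
        // ... childHandler(...), bind(...), serve ...

        // Graceful shutdown, in order:
        bossGroup.shutdownGracefully().sync();       // 1. stop accepting new connections
        workGroup.shutdownGracefully().sync();       // 2. quiesce channel I/O
        executorGroup.shutdownGracefully().sync();   // 3. drain executors holding pending batches
    }
}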
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
6.8.4
6.9.0
4 changes: 2 additions & 2 deletions lib/logstash/inputs/beats/codec_callback_listener.rb
@@ -2,12 +2,12 @@
require "logstash/inputs/beats"

module LogStash module Inputs class Beats
# Use the new callback based approch instead of using blocks
# Use the new callback based approach instead of using blocks
# so we can retain some context of the execution, and make it easier to test
class CodecCallbackListener
attr_accessor :data
# The path acts as the `stream_identity`,
# usefull when the clients is reading multiples files
# useful when the client is reading multiple files
attr_accessor :path

def initialize(data, hash, path, transformer, queue)
17 changes: 12 additions & 5 deletions lib/logstash/inputs/beats/message_listener.rb
@@ -49,11 +49,18 @@ def onNewMessage(ctx, message)
@nocodec_transformer.transform(event)
@queue << event
else
codec(ctx).accept(CodecCallbackListener.new(target_field,
hash,
message.getIdentityStream(),
@codec_transformer,
@queue))
current_codec = codec(ctx)
if current_codec
current_codec.accept(CodecCallbackListener.new(target_field,
hash,
message.getIdentityStream(),
@codec_transformer,
@queue))
else
# possible causes: the connection was closed, or a connection raised an exception
# let the client retry
input.logger.warn("No codec available to process the message. This may be because the connection was terminated while the message was being processed.")
end
end
end

18 changes: 18 additions & 0 deletions spec/inputs/beats/message_listener_spec.rb
@@ -218,6 +218,24 @@ def flush(&block)
end
end

context "when connection is terminated" do
let(:message) { MockMessage.new("message from Mars", { "message" => "hello world", "@metadata" => {} } )}

it "handles without crashing" do
subject.onConnectionClose(ctx)
expect(subject.connections_list[ctx]).to be nil

expect( input.logger ).to receive(:warn) do |message|
expect( message ).to match /No codec available to process the message. This may be because the connection was terminated while the message was being processed./
end

subject.onNewMessage(ctx, message)
# doesn't push to queue, so should be nil
event = queue.pop
expect(event.get("message")).to be nil
end
end

it_behaves_like "when the message is from any libbeat", :disabled, "[@metadata][ip_address]"
it_behaves_like "when the message is from any libbeat", :v1, "[@metadata][input][beats][host][ip]"
it_behaves_like "when the message is from any libbeat", :v8, "[@metadata][input][beats][host][ip]"
88 changes: 54 additions & 34 deletions src/main/java/org/logstash/beats/BeatsHandler.java
@@ -7,22 +7,28 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLHandshakeException;

public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
private final static Logger logger = LogManager.getLogger(BeatsHandler.class);
private final static String executorTerminatedMessage = "event executor terminated";

private final IMessageListener messageListener;
private ChannelHandlerContext context;

private final AtomicBoolean isQuietPeriod = new AtomicBoolean(false);

public BeatsHandler(IMessageListener listener) {
messageListener = listener;
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
context = ctx;
if (logger.isTraceEnabled()){
context = ctx;
if (logger.isTraceEnabled()) {
logger.trace(format("Channel Active"));
}
super.channelActive(ctx);
@@ -32,7 +38,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (logger.isTraceEnabled()){
if (logger.isTraceEnabled()) {
logger.trace(format("Channel Inactive"));
}
messageListener.onConnectionClose(ctx);
@@ -41,29 +47,22 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void channelRead0(ChannelHandlerContext ctx, Batch batch) {
if(logger.isDebugEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug(format("Received a new payload"));
}
try {
if (batch.isEmpty()) {
logger.debug("Sending 0-seq ACK for empty batch");
writeAck(ctx, batch.getProtocol(), 0);
}
for (Message message : batch) {
if (isQuietPeriod.get()) {
if (logger.isDebugEnabled()) {
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
}
messageListener.onNewMessage(ctx, message);

if (needAck(message)) {
ack(ctx, message);
logger.debug(format("Received batch but no executors available, ignoring..."));
}
} else {
processBatchAndSendAck(ctx, batch);
}
}finally{
} finally {
// this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive
ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().set(false);
if (logger.isDebugEnabled()) {
logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get());
logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(), ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get());
}
batch.release();
ctx.flush();
@@ -93,19 +92,47 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
} else {
final Throwable realCause = extractCause(cause, 0);
if (logger.isDebugEnabled()){
if (logger.isDebugEnabled()) {
logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause);
} else {
logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"));
}
super.exceptionCaught(ctx, cause);
// when execution tasks are rejected, there is no need to forward the exception to netty channel handlers
if (cause instanceof RejectedExecutionException) {
// event executors are no longer available since they were terminated, typically by the shutdown process
if (Objects.nonNull(cause.getMessage()) && cause.getMessage().contains(executorTerminatedMessage)) {
this.isQuietPeriod.compareAndSet(false, true);
}
} else {
super.exceptionCaught(ctx, cause);
}
}
} finally {
ctx.flush();
ctx.close();
}
}

private void processBatchAndSendAck(ChannelHandlerContext ctx, Batch batch) {
if (batch.isEmpty()) {
logger.debug("Sending 0-seq ACK for empty batch");
writeAck(ctx, batch.getProtocol(), 0);
}
for (Message message : batch) {
if (logger.isDebugEnabled()) {
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
}
messageListener.onNewMessage(ctx, message);

if (needAck(message)) {
if (logger.isTraceEnabled()) {
logger.trace(format("Acking message number " + message.getSequence()));
}
writeAck(ctx, message.getBatch().getProtocol(), message.getSequence());
}
}
}

private boolean isNoisyException(final Throwable ex) {
if (ex instanceof IOException) {
final String message = ex.getMessage();
@@ -120,13 +147,6 @@ private boolean needAck(Message message) {
return message.getSequence() == message.getBatch().getHighestSequence();
}

private void ack(ChannelHandlerContext ctx, Message message) {
if (logger.isTraceEnabled()){
logger.trace(format("Acking message number " + message.getSequence()));
}
writeAck(ctx, message.getBatch().getProtocol(), message.getSequence());
}

private void writeAck(ChannelHandlerContext ctx, byte protocol, int sequence) {
ctx.write(new Ack(protocol, sequence));
}
@@ -140,20 +160,20 @@ private String format(String message) {
InetSocketAddress remote = (InetSocketAddress) context.channel().remoteAddress();

String localhost;
if(local != null) {
if (local != null) {
localhost = local.getAddress().getHostAddress() + ":" + local.getPort();
} else{
} else {
localhost = "undefined";
}

String remotehost;
if(remote != null) {
remotehost = remote.getAddress().getHostAddress() + ":" + remote.getPort();
} else{
remotehost = "undefined";
String remoteHost;
if (remote != null) {
remoteHost = remote.getAddress().getHostAddress() + ":" + remote.getPort();
} else {
remoteHost = "undefined";
}

return "[local: " + localhost + ", remote: " + remotehost + "] " + message;
return "[local: " + localhost + ", remote: " + remoteHost + "] " + message;
}

private static final int MAX_CAUSE_NESTING = 10;
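For reference, the quiet-period pattern in the handler above condenses to the following self-contained sketch; the class name is illustrative, and it assumes, as the diff does, that the RejectedExecutionException raised after executor termination carries the message "event executor terminated":

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.logstash.beats.Batch;

public class QuietPeriodHandlerSketch extends SimpleChannelInboundHandler<Batch> {
    private static final String EXECUTOR_TERMINATED = "event executor terminated";
    private final AtomicBoolean isQuietPeriod = new AtomicBoolean(false);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Batch batch) {
        if (isQuietPeriod.get()) {
            return; // executors are gone: drop the batch instead of crashing
        }
        // ... process messages and send ACKs (see processBatchAndSendAck above) ...
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof RejectedExecutionException
                && cause.getMessage() != null
                && cause.getMessage().contains(EXECUTOR_TERMINATED)) {
            // executors were terminated by shutdown: enter the quiet period
            // and stop forwarding the exception down the pipeline
            isQuietPeriod.compareAndSet(false, true);
        } else {
            super.exceptionCaught(ctx, cause);
        }
        ctx.flush();
        ctx.close();
    }
}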
40 changes: 39 additions & 1 deletion src/main/java/org/logstash/beats/Server.java
@@ -11,7 +11,9 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;
@@ -23,6 +25,7 @@ public class Server {
private final String host;
private final int eventLoopThreadCount;
private final int executorThreadCount;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
private IMessageListener messageListener = new MessageListener();
private SslHandlerProvider sslHandlerProvider;
@@ -51,14 +54,15 @@ public Server listen() throws InterruptedException {
logger.error("Could not shut down worker group before starting", e);
}
}
bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make this adjustable; it doesn't need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount);
try {
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount);

ServerBootstrap server = new ServerBootstrap();
server.group(workGroup)
server.group(bossGroup, workGroup)
@mashhurs (Contributor, Author) commented on Aug 15, 2024:

Till now, we were using the work group for handling incoming socket connections, and there was no guarantee that the socket handlers got terminated first. With this separation we get two major improvements:

  • make sure we first terminate the handlers that accept sockets and don't push to netty's internal task queue
  • make the thread count configurable: the boss group doesn't need many threads, since it only accepts connections and hands them to the work group; the current default is NumberOfCpuProcessors*2

.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket.
.childHandler(beatsInitializer);
@@ -83,10 +87,23 @@ public void stop() {
}

private void shutdown() {
// we try to shut down as gracefully as possible:
// - stop accepting incoming socket connections first
// - the event loop group (workGroup) enters a quiet period via `shutdownGracefully`, so in the best case its queued tasks no longer receive work
// - executor group threads that have pending tasks are asked to shut down gracefully and are waited on; this lets each individual handler process its event/exception, especially in the multichannel use case
// - there is no guarantee that executor threads terminate during shutdown, because of factors such as ACK processing, so any pending tasks that send batches to BeatsHandler will be ignored
try {
// boss group shuts down socket connections
// shutting down bossGroup separately gives us faster channel closure
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}

if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}

if (beatsInitializer != null) {
beatsInitializer.shutdownEventExecutor();
}
@@ -160,10 +177,31 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
public void shutdownEventExecutor() {
try {
idleExecutorGroup.shutdownGracefully().sync();

shutdownEventExecutorsWithPendingTasks();

// make sure executors without pending tasks also get terminated
beatsHandlerExecutorGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

private void shutdownEventExecutorsWithPendingTasks() {
try {
// DefaultEventExecutorGroup internally runs a number of SingleThreadEventExecutors
// gracefully shut down each executor thread that still has unacked pending batches (pending tasks)
for (final EventExecutor eventExecutor : beatsHandlerExecutorGroup) {
if (eventExecutor instanceof SingleThreadEventExecutor) {
final SingleThreadEventExecutor singleExecutor = (SingleThreadEventExecutor) eventExecutor;
if (singleExecutor.pendingTasks() > 0) {
singleExecutor.shutdownGracefully().sync();
}
}
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
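The pending-task walk in shutdownEventExecutorsWithPendingTasks() leans on two Netty facts: an EventExecutorGroup is Iterable over its executors, and SingleThreadEventExecutor exposes pendingTasks(). A standalone sketch of the same technique (class and method names are illustrative):

import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;

public final class PendingTaskShutdownSketch {
    // first wait on every executor that still has queued batches,
    // then terminate the rest of the group
    public static void drain(DefaultEventExecutorGroup group) throws InterruptedException {
        for (final EventExecutor executor : group) {
            if (executor instanceof SingleThreadEventExecutor) {
                final SingleThreadEventExecutor single = (SingleThreadEventExecutor) executor;
                if (single.pendingTasks() > 0) {
                    single.shutdownGracefully().sync(); // let queued work finish
                }
            }
        }
        group.shutdownGracefully().sync(); // executors with no pending tasks
    }
}

As the comments in the diff note, this is best-effort: an executor stuck in ACK processing may still never terminate, which is why BeatsHandler drops batches once its quiet-period flag is set.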