OF-3176: Offload blocking Netty handlers to dedicated executor#3143
OF-3176: Offload blocking Netty handlers to dedicated executor#3143guusdk wants to merge 10 commits intoigniterealtime:mainfrom
Conversation
Introduce a shared blocking handler executor for Netty pipelines to ensure that potentially blocking or long-running operations do not execute on EventLoop threads. This change separates responsibilities between: - acceptor (boss) EventLoopGroup - non-blocking I/O worker EventLoopGroup - dedicated executor for blocking pipeline handlers The new executor is shared across all channels created by a NettyConnectionAcceptor and is used for handlers that may perform authentication, routing, persistence, or other blocking work. This improves throughput and stability under load by preventing EventLoop starvation and aligns Openfire’s Netty usage with Netty best practices. The connection configuration’s "max thread pool size" is now applied to the dedicated blocking handler executor. Netty EventLoopGroups (acceptor and I/O worker) now use Netty’s default thread sizing, as they are reserved exclusively for non-blocking socket I/O and protocol framing.
There was a problem hiding this comment.
Pull request overview
This PR introduces a dedicated EventExecutorGroup for potentially blocking Netty pipeline handlers, to keep Netty EventLoop threads focused on non-blocking I/O and reduce EventLoop starvation under load.
Changes:
- Add a shared blocking-handler executor to
NettyConnectionAcceptorand run core “business logic” handlers on that executor. - Extend plugin handler injection to receive an executor, enabling plugins to offload their handlers from EventLoop threads.
- Update outbound S2S session initialization to also offload its business logic handler to a dedicated executor, plus documentation updates describing the new threading model.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
xmppserver/src/main/java/org/jivesoftware/openfire/spi/NettyServerInitializer.java |
Offloads the core business logic handler to a provided blocking executor; passes executor to plugin handler factories. |
xmppserver/src/main/java/org/jivesoftware/openfire/spi/NettyConnectionAcceptor.java |
Introduces and manages lifecycle of a shared blocking handler executor; updates bootstrap wiring and handler factory calls. |
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettySessionInitializer.java |
Offloads outbound S2S business logic to a new executor; introduces per-instance executor and thread naming changes. |
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettyChannelHandlerFactory.java |
Changes plugin extension interface to accept an executor for handler execution. |
documentation/internal-networking.html |
Documents the new dedicated executor concept alongside boss/worker groups. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettyChannelHandlerFactory.java
Outdated
Show resolved
Hide resolved
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettySessionInitializer.java
Outdated
Show resolved
Hide resolved
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettySessionInitializer.java
Outdated
Show resolved
Hide resolved
xmppserver/src/main/java/org/jivesoftware/openfire/spi/NettyConnectionAcceptor.java
Show resolved
Hide resolved
xmppserver/src/main/java/org/jivesoftware/openfire/nio/NettySessionInitializer.java
Outdated
Show resolved
Hide resolved
41a5e78 to
18485da
Compare
…ing Netty handlers A recent change switched server-side (NettyServerInitializer) and outbound client (NettySessionInitializer) pipelines to use a separate EventExecutorGroup for blocking handlers, to prevent long-running operations from exhausting the Netty EventLoop (OF-3176). This introduced a race condition: if data arrived immediately after a channel became active, it could reach the pipeline before the handler was fully registered on the executor, leaving it queued until the executor ran or the acceptor was stopped. Occassional, but flaky, failures of the LocalIncomingServerSessionTest unit tests were a symptom of this race condition. Fix: - Disable autoRead on the channel initially. - Re-enable autoRead only after the channel is fully registered. This restores reliable processing of incoming data on both server and outbound sessions while still preventing Netty EventLoop starvation from blocking operations.
…eturns NettyConnectionAcceptor.start() previously returned as soon as the server socket was bound. While this guarantees that the port is open, it does not ensure that the Netty boss EventLoop has completed acceptor initialization. After the recent change to offload blocking handlers to a separate EventExecutorGroup, this timing window became visible in tests: clients connecting immediately after startup could race ahead of acceptor readiness, leading to intermittent failures. Fix: - Add an explicit readiness barrier by scheduling a task on the boss EventLoop and waiting for it to execute before returning from start(). - This guarantees that the server channel is bound, registered, and fully initialized on the Netty event loop before accepting connections. This change provides deterministic startup semantics for both production use and tests, without affecting per-connection pipeline initialization or runtime behavior.
783be64 to
e4c2680
Compare
…eter Replace raw StanzaHandler usage with a generic type parameter to ensure type safety and remove the need for casting in subclasses.
…tion Ensure callers can reliably wait for RespondingServerStanzaHandler creation by exposing a future that completes from handlerAdded(). This prevents intermittent failures caused by accessing the handler before it is attached to the Netty pipeline, without introducing blocking or timing-based workarounds.
This step builds a docker image, then executes tests against it. This commit enables BuildKit + Docker layer caching in the integration job This should help: - Maven cache layers (/tmp/m2_repo) are reused across workflow runs - The expensive dependency:resolve steps become near-instant
…ry per review feedback Keep the original addNewHandlerTo(ChannelPipeline) method and add an executor-aware overload as default, avoiding breaking existing plugin implementations.
The original javadoc was copied from a very similar code structure that was added to NettyConnectionAcceptor.
…g on InterruptedException Previously, when start() was interrupted, closeMainChannel() was called but the thread's interrupted status was cleared. This could hide the interruption from higher-level code. Now, the interrupt flag is restored via Thread.currentThread().interrupt(), ensuring that the thread interruption is correctly propagated without changing existing stop/start behavior.
|
Does this solution void the fixes for https://igniterealtime.atlassian.net/browse/OF-3180 in #3140 ? |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (directTLS) { | ||
| ch.attr(CONNECTION).get().startTLS(true, true); | ||
| } |
There was a problem hiding this comment.
Direct TLS startup calls ch.attr(CONNECTION).get().startTLS(true, true) immediately after adding NettyOutboundConnectionHandler with blockingHandlerExecutor. Because handlerAdded for that handler is executed asynchronously on the executor group, the CONNECTION attribute may still be null here, which can crash outbound S2S connection establishment. Defer TLS initialization until the NettyConnection has been created (eg, after handlerAdded completes, preferably by scheduling pipeline mutation on the channel EventLoop).
| final H stanzaHandler = createStanzaHandler(nettyConnection); | ||
| ctx.channel().attr(HANDLER).set(stanzaHandler); | ||
| stanzaHandlerFuture.complete(stanzaHandler); | ||
| } |
There was a problem hiding this comment.
If createStanzaHandler(...) (or subsequent attr assignments) throws, stanzaHandlerFuture is never completed, which can cause callers waiting on getStanzaHandlerFuture() to hang indefinitely. Wrap stanza handler creation/assignment in try/catch and complete stanzaHandlerFuture exceptionally (and/or close the channel) on failure.
| connectionAcceptor.start(); // Start accepting inbound S2S connections. | ||
| NettySessionInitializer.startSharedResources(); // Start the shared resources for outbound S2S connections. |
There was a problem hiding this comment.
NettySessionInitializer shared resources are started here unconditionally for every ConnectionListener. This should only run for the S2S listener (ConnectionType.SOCKET_S2S); otherwise starting any listener (eg C2S) unnecessarily starts outbound S2S resources and makes their lifecycle dependent on unrelated listeners.
| if (isDirectTLSConfigured()) { | ||
| ch.attr(CONNECTION).get().startTLS(false, true); | ||
| } |
There was a problem hiding this comment.
Direct TLS startup calls ch.attr(CONNECTION).get().startTLS(...) immediately after adding businessLogicHandler with blockingHandlerExecutor. Because handlerAdded for that handler is executed asynchronously on the executor group, the CONNECTION attribute may not be set yet, leading to a NullPointerException/race at startup. Defer startTLS until after handlerAdded has created and stored the NettyConnection (for example, trigger it from within handlerAdded or via a future that completes when CONNECTION is available, and run the TLS pipeline mutation on the channel EventLoop).
| ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { | ||
| @Override | ||
| public void channelRegistered(ChannelHandlerContext ctx) { | ||
| // Schedule enabling auto-read on the blocking executor to ensure pipeline is fully ready. | ||
| blockingHandlerExecutor.execute(() -> ctx.channel().config().setAutoRead(true)); |
There was a problem hiding this comment.
Auto-read is re-enabled by scheduling a task on blockingHandlerExecutor during channelRegistered. Because businessLogicHandler was added with that same executor group, its handlerAdded (which initializes CONNECTION/HANDLER attributes) also runs asynchronously on the pool; with multiple threads there is no ordering guarantee, so reads can be enabled before initialization completes. Re-enable autoRead only after businessLogicHandler signals initialization completion (eg via a future) and perform setAutoRead(true) on the channel EventLoop.
| // Re-enable autoRead after the channel is fully registered. | ||
| ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { | ||
| @Override | ||
| public void channelRegistered(ChannelHandlerContext ctx) { | ||
| // Schedule enabling auto-read on the blocking executor to ensure pipeline is fully ready. | ||
| blockingHandlerExecutor.execute(() -> ctx.channel().config().setAutoRead(true)); | ||
| ctx.fireChannelRegistered(); |
There was a problem hiding this comment.
Auto-read is re-enabled by scheduling a task on blockingHandlerExecutor during channelRegistered. Since NettyOutboundConnectionHandler was added with that executor group, its handlerAdded (which sets CONNECTION/HANDLER) also runs asynchronously on the pool; there is no sequencing guarantee, so reads can be enabled before initialization completes. Consider enabling autoRead only after handler initialization completes (eg via getStanzaHandlerFuture()) and execute the config change on the channel EventLoop.
| // Re-enable autoRead after the channel is fully registered. | |
| ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { | |
| @Override | |
| public void channelRegistered(ChannelHandlerContext ctx) { | |
| // Schedule enabling auto-read on the blocking executor to ensure pipeline is fully ready. | |
| blockingHandlerExecutor.execute(() -> ctx.channel().config().setAutoRead(true)); | |
| ctx.fireChannelRegistered(); | |
| // Re-enable autoRead only after the outbound handler has completed its initialization, | |
| // and perform the config change on the channel's EventLoop. | |
| ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { | |
| @Override | |
| public void channelRegistered(ChannelHandlerContext ctx) { | |
| ctx.fireChannelRegistered(); | |
| final Channel channel = ctx.channel(); | |
| final CompletableFuture<?> initFuture = businessLogicHandler.getStanzaHandlerFuture(); | |
| if (initFuture != null) { | |
| initFuture.whenComplete((result, error) -> { | |
| if (error == null && channel.isOpen()) { | |
| channel.eventLoop().execute(() -> { | |
| if (channel.isOpen()) { | |
| channel.config().setAutoRead(true); | |
| } | |
| }); | |
| } | |
| }); | |
| } else { | |
| // Fallback: if no init future is available, still enable auto-read on the EventLoop. | |
| channel.eventLoop().execute(() -> { | |
| if (channel.isOpen()) { | |
| channel.config().setAutoRead(true); | |
| } | |
| }); | |
| } |
| // its task queue. Add a barrier task that runs on the boss event loop after bind completes. This guarantees | ||
| // that The boss EventLoop has processed startup tasks and Netty is ready to accept connections. | ||
| final CountDownLatch readyLatch = new CountDownLatch(1); | ||
| mainChannel.eventLoop().execute(readyLatch::countDown); | ||
| readyLatch.await(); |
There was a problem hiding this comment.
readyLatch.await() has no timeout. If the boss EventLoop cannot execute the barrier task for any reason (shutdown, deadlock, unexpected scheduling failure), start() can hang indefinitely. Consider using a bounded await with logging/failure handling (and restore the interrupt flag when interrupted) to avoid blocking startup forever.
| connectionAcceptor.stop(); // Stop accepting inbound S2S connections. | ||
| NettySessionInitializer.stopSharedResources(); // Stop the shared resources for outbound S2S connections. |
There was a problem hiding this comment.
NettySessionInitializer shared resources are stopped here unconditionally. ConnectionListener is used for multiple connection types, so stopping/restarting a non-S2S listener would tear down shared outbound S2S Netty pools and disrupt outbound federation. Gate stopSharedResources() behind a check for ConnectionType.SOCKET_S2S (or move lifecycle management to the S2S subsystem).
| connectionAcceptor.stop(); // Stop accepting inbound S2S connections. | ||
| NettySessionInitializer.stopSharedResources(); // Stop the shared resources for outbound S2S connections. |
There was a problem hiding this comment.
Same issue on shutdown: stop() always invokes NettySessionInitializer.stopSharedResources(), which will tear down outbound S2S resources even when stopping a non-S2S listener. This should be conditional on getType() == ConnectionType.SOCKET_S2S (or otherwise managed by the S2S subsystem) to avoid breaking outbound S2S during unrelated listener lifecycle events.
| connectionAcceptor.stop(); // Stop accepting inbound S2S connections. | |
| NettySessionInitializer.stopSharedResources(); // Stop the shared resources for outbound S2S connections. | |
| connectionAcceptor.stop(); // Stop accepting inbound connections for this listener. | |
| if ( getType() == ConnectionType.SOCKET_S2S ) | |
| { | |
| NettySessionInitializer.stopSharedResources(); // Stop the shared resources for outbound S2S connections. | |
| } |
Previously, each NettySessionInitializer instance created its own NioEventLoopGroup and DefaultEventExecutorGroup, meaning every outbound S2S connection owned its own thread pools for its entire lifetime. This caused unbounded thread growth under load. Introduce static shared thread pools (sharedIoWorkerGroup and sharedBlockingHandlerExecutor) in NettySessionInitializer, managed via new startSharedResources() and stopSharedResources() static methods. All outbound S2S connections now share a single pair of pools.
6d3b69f to
4327c20
Compare
Introduce a shared blocking handler executor for Netty pipelines to ensure that potentially blocking or long-running operations do not execute on EventLoop threads.
This change separates responsibilities between:
The new executor is shared across all channels created by a NettyConnectionAcceptor and is used for handlers that may perform authentication, routing, persistence, or other blocking work.
This improves throughput and stability under load by preventing EventLoop starvation and aligns Openfire’s Netty usage with Netty best practices.
The connection configuration’s "max thread pool size" is now applied to the dedicated blocking handler executor. Netty EventLoopGroups (acceptor and I/O worker) now use Netty’s default thread sizing, as they are reserved exclusively for non-blocking socket I/O and protocol framing.