Skip to content

Commit a54f418

Browse files
jrhee17kezhenxu94
andauthored
Provide a way to run service code out of I/O event loops (#5233)
Motivation: When service devs are not very familiar with asynchronous programming, it is very easy to drive Armeria's core event loops into havoc by blocking them. Framework devs may want to make sure Armeria at least handle I/O and function normally for a certain set of core services, such as `PrometheusExpositionService` or `HealthCheckService`, by isolating other non-core services from the I/O event loops. Modifications: * Add the `serviceWorkerGroup()` builder methods to `ServerBuilder` and `ServiceConfigSetters` so a user can specify the service worker groups as shown in the above example. * If `serviceWorkerGroup` is not specified, the `workerGroup` is used by default. * Change how Armeria assigns an event loop to a `ServiceRequestContext`. * If the `workerGroup` is different from `serviceWorkerGroup`, then an event loop from `serviceWorkerGroup` is used. * Otherwise, the IO event loop is used for executing services * Modified the constructor of `DefaultServiceRequestContext` so it accepts an `EventLoop`. * Modified `HttpServerHandler.handleRequest()`, so that `HttpService#serve` is executed from the `serviceWorkerGroup` * Modified so that we can guarantee that pending `RequestLogFuture`s are always scheduled from the context's event loop Result: - Closes #4099. - Users can add per-service/virtual host/server `serviceWorkerGroup` property that makes a service use a different `EventLoopGroup` than `ServerBuilder.workerGroup`. --------- Co-authored-by: kezhenxu94 <[email protected]>
1 parent d9703ed commit a54f418

32 files changed

+1019
-112
lines changed

benchmarks/jmh/src/jmh/java/com/linecorp/armeria/server/RoutersBenchmark.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,27 +66,27 @@ public class RoutersBenchmark {
6666
new ServiceConfig(route1, route1,
6767
SERVICE, defaultLogName, defaultServiceName, defaultServiceNaming, 0, 0,
6868
false, AccessLogWriter.disabled(), CommonPools.blockingTaskExecutor(),
69-
SuccessFunction.always(), 0, multipartUploadsLocation, ImmutableList.of(),
70-
HttpHeaders.of(), ctx -> RequestId.random(), serviceErrorHandler,
71-
NOOP_CONTEXT_HOOK),
69+
SuccessFunction.always(), 0, multipartUploadsLocation,
70+
CommonPools.workerGroup(), ImmutableList.of(), HttpHeaders.of(),
71+
ctx -> RequestId.random(), serviceErrorHandler, NOOP_CONTEXT_HOOK),
7272
new ServiceConfig(route2, route2,
7373
SERVICE, defaultLogName, defaultServiceName, defaultServiceNaming, 0, 0,
7474
false, AccessLogWriter.disabled(), CommonPools.blockingTaskExecutor(),
75-
SuccessFunction.always(), 0, multipartUploadsLocation, ImmutableList.of(),
76-
HttpHeaders.of(), ctx -> RequestId.random(), serviceErrorHandler,
77-
NOOP_CONTEXT_HOOK));
75+
SuccessFunction.always(), 0, multipartUploadsLocation,
76+
CommonPools.workerGroup(), ImmutableList.of(), HttpHeaders.of(),
77+
ctx -> RequestId.random(), serviceErrorHandler, NOOP_CONTEXT_HOOK));
7878
FALLBACK_SERVICE = new ServiceConfig(Route.ofCatchAll(), Route.ofCatchAll(), SERVICE,
7979
defaultLogName, defaultServiceName,
8080
defaultServiceNaming, 0, 0, false, AccessLogWriter.disabled(),
81-
CommonPools.blockingTaskExecutor(),
82-
SuccessFunction.always(), 0, multipartUploadsLocation,
81+
CommonPools.blockingTaskExecutor(), SuccessFunction.always(), 0,
82+
multipartUploadsLocation, CommonPools.workerGroup(),
8383
ImmutableList.of(), HttpHeaders.of(), ctx -> RequestId.random(),
8484
serviceErrorHandler, NOOP_CONTEXT_HOOK);
8585
HOST = new VirtualHost(
8686
"localhost", "localhost", 0, null, SERVICES, FALLBACK_SERVICE, RejectedRouteHandler.DISABLED,
8787
unused -> NOPLogger.NOP_LOGGER, defaultServiceNaming, defaultLogName, 0, 0, false,
8888
AccessLogWriter.disabled(), CommonPools.blockingTaskExecutor(), 0, SuccessFunction.ofDefault(),
89-
multipartUploadsLocation, ImmutableList.of(),
89+
multipartUploadsLocation, CommonPools.workerGroup(), ImmutableList.of(),
9090
ctx -> RequestId.random());
9191
ROUTER = Routers.ofVirtualHost(HOST, SERVICES, RejectedRouteHandler.DISABLED);
9292
}

brave/src/test/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContextTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717
package com.linecorp.armeria.common.brave;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.mockito.ArgumentMatchers.any;
21-
import static org.mockito.Mockito.doAnswer;
2220
import static org.mockito.Mockito.when;
2321

2422
import org.junit.jupiter.api.BeforeEach;
2523
import org.junit.jupiter.api.Test;
2624
import org.mockito.Mock;
2725
import org.mockito.junit.jupiter.MockitoSettings;
2826
import org.mockito.quality.Strictness;
29-
import org.mockito.stubbing.Answer;
3027

3128
import com.linecorp.armeria.common.HttpMethod;
3229
import com.linecorp.armeria.common.HttpRequest;
@@ -55,10 +52,7 @@ class RequestContextCurrentTraceContextTest {
5552
@BeforeEach
5653
void setUp() {
5754
when(eventLoop.inEventLoop()).thenReturn(true);
58-
doAnswer((Answer<Void>) invocation -> {
59-
invocation.<Runnable>getArgument(0).run();
60-
return null;
61-
}).when(eventLoop).execute(any());
55+
when(eventLoop.next()).thenReturn(eventLoop);
6256

6357
ctx = ServiceRequestContext.builder(HttpRequest.of(HttpMethod.GET, "/"))
6458
.eventLoop(eventLoop)

core/src/main/java/com/linecorp/armeria/common/HttpRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,4 +810,10 @@ default HttpRequest peekError(Consumer<? super Throwable> action) {
810810
requireNonNull(action, "action");
811811
return of(headers(), HttpMessage.super.peekError(action));
812812
}
813+
814+
@Override
815+
default HttpRequest subscribeOn(EventExecutor eventExecutor) {
816+
requireNonNull(eventExecutor, "eventExecutor");
817+
return of(headers(), HttpMessage.super.subscribeOn(eventExecutor));
818+
}
813819
}

core/src/main/java/com/linecorp/armeria/common/HttpResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,4 +1178,9 @@ default <T extends Throwable> HttpResponse recover(Class<T> causeClass,
11781178
}
11791179
});
11801180
}
1181+
1182+
@Override
1183+
default HttpResponse subscribeOn(EventExecutor eventExecutor) {
1184+
return of(HttpMessage.super.subscribeOn(eventExecutor));
1185+
}
11811186
}

core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,4 +1136,22 @@ default InputStream toInputStream(Function<? super T, ? extends HttpData> httpDa
11361136
default StreamMessage<T> endWith(Function<@Nullable Throwable, ? extends @Nullable T> finalizer) {
11371137
return new SurroundingPublisher<>(null, this, finalizer);
11381138
}
1139+
1140+
/**
1141+
* Calls {@link #subscribe(Subscriber, EventExecutor)} to the upstream
1142+
* {@link StreamMessage} using the specified {@link EventExecutor} and relays the stream
1143+
* transparently downstream. This may be useful if one would like to hide an
1144+
* {@link EventExecutor} from an upstream {@link Publisher}.
1145+
*
1146+
* <p>For example:<pre>{@code
1147+
* Subscriber<Integer> mySubscriber = null;
1148+
* StreamMessage<Integer> upstream = ...; // publisher callbacks are invoked by eventLoop1
1149+
* upstream.subscribeOn(eventLoop1)
1150+
* .subscribe(mySubscriber, eventLoop2); // mySubscriber callbacks are invoked with eventLoop2
1151+
* }</pre>
1152+
*/
1153+
default StreamMessage<T> subscribeOn(EventExecutor eventExecutor) {
1154+
requireNonNull(eventExecutor, "eventExecutor");
1155+
return new SubscribeOnStreamMessage<>(this, eventExecutor);
1156+
}
11391157
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2023 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.common.stream;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
24+
import io.netty.util.concurrent.EventExecutor;
25+
26+
final class SubscribeOnStreamMessage<T> implements StreamMessage<T> {
27+
28+
private final StreamMessage<T> upstream;
29+
private final EventExecutor upstreamExecutor;
30+
31+
SubscribeOnStreamMessage(StreamMessage<T> upstream, EventExecutor upstreamExecutor) {
32+
this.upstream = upstream;
33+
this.upstreamExecutor = upstreamExecutor;
34+
}
35+
36+
@Override
37+
public boolean isOpen() {
38+
return upstream.isOpen();
39+
}
40+
41+
@Override
42+
public boolean isEmpty() {
43+
return upstream.isEmpty();
44+
}
45+
46+
@Override
47+
public long demand() {
48+
return upstream.demand();
49+
}
50+
51+
@Override
52+
public CompletableFuture<Void> whenComplete() {
53+
return upstream.whenComplete();
54+
}
55+
56+
@Override
57+
public EventExecutor defaultSubscriberExecutor() {
58+
return upstreamExecutor;
59+
}
60+
61+
@Override
62+
public void subscribe(Subscriber<? super T> subscriber, EventExecutor downstreamExecutor,
63+
SubscriptionOption... options) {
64+
final Subscriber<? super T> subscriber0;
65+
if (upstreamExecutor == downstreamExecutor) {
66+
subscriber0 = subscriber;
67+
} else {
68+
subscriber0 = new SchedulingSubscriber<>(downstreamExecutor, subscriber);
69+
}
70+
if (upstreamExecutor.inEventLoop()) {
71+
upstream.subscribe(subscriber0, downstreamExecutor, options);
72+
} else {
73+
upstreamExecutor.execute(() -> upstream.subscribe(subscriber0, upstreamExecutor, options));
74+
}
75+
}
76+
77+
@Override
78+
public void abort() {
79+
upstream.abort();
80+
}
81+
82+
@Override
83+
public void abort(Throwable cause) {
84+
upstream.abort(cause);
85+
}
86+
87+
static class SchedulingSubscriber<T> implements Subscriber<T> {
88+
89+
private final Subscriber<? super T> downstream;
90+
private final EventExecutor downstreamExecutor;
91+
92+
SchedulingSubscriber(EventExecutor downstreamExecutor, Subscriber<? super T> downstream) {
93+
this.downstream = downstream;
94+
this.downstreamExecutor = downstreamExecutor;
95+
}
96+
97+
@Override
98+
public void onSubscribe(Subscription s) {
99+
downstreamExecutor.execute(() -> downstream.onSubscribe(s));
100+
}
101+
102+
@Override
103+
public void onNext(T t) {
104+
downstreamExecutor.execute(() -> downstream.onNext(t));
105+
}
106+
107+
@Override
108+
public void onError(Throwable t) {
109+
downstreamExecutor.execute(() -> downstream.onError(t));
110+
}
111+
112+
@Override
113+
public void onComplete() {
114+
downstreamExecutor.execute(downstream::onComplete);
115+
}
116+
}
117+
}

core/src/main/java/com/linecorp/armeria/internal/server/DefaultServiceRequestContext.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import io.micrometer.core.instrument.MeterRegistry;
7373
import io.netty.buffer.ByteBufAllocator;
7474
import io.netty.channel.Channel;
75+
import io.netty.channel.EventLoop;
7576
import io.netty.util.AttributeKey;
7677

7778
/**
@@ -90,6 +91,7 @@ public final class DefaultServiceRequestContext
9091
DefaultServiceRequestContext.class, HttpHeaders.class, "additionalResponseTrailers");
9192

9293
private final Channel ch;
94+
private final EventLoop eventLoop;
9395
private final ServiceConfig cfg;
9496
private final RoutingContext routingContext;
9597
private final RoutingResult routingResult;
@@ -141,22 +143,24 @@ public final class DefaultServiceRequestContext
141143
* e.g. {@code System.currentTimeMillis() * 1000}.
142144
*/
143145
public DefaultServiceRequestContext(
144-
ServiceConfig cfg, Channel ch, MeterRegistry meterRegistry, SessionProtocol sessionProtocol,
145-
RequestId id, RoutingContext routingContext, RoutingResult routingResult, ExchangeType exchangeType,
146+
ServiceConfig cfg, Channel ch, EventLoop eventLoop, MeterRegistry meterRegistry,
147+
SessionProtocol sessionProtocol, RequestId id, RoutingContext routingContext,
148+
RoutingResult routingResult, ExchangeType exchangeType,
146149
HttpRequest req, @Nullable SSLSession sslSession, ProxiedAddresses proxiedAddresses,
147150
InetAddress clientAddress, InetSocketAddress remoteAddress, InetSocketAddress localAddress,
148151
long requestStartTimeNanos, long requestStartTimeMicros,
149152
Supplier<? extends AutoCloseable> contextHook) {
150153

151-
this(cfg, ch, meterRegistry, sessionProtocol, id, routingContext, routingResult, exchangeType,
152-
req, sslSession, proxiedAddresses, clientAddress, remoteAddress, localAddress,
154+
this(cfg, ch, eventLoop, meterRegistry, sessionProtocol, id, routingContext, routingResult,
155+
exchangeType, req, sslSession, proxiedAddresses, clientAddress, remoteAddress, localAddress,
153156
null /* requestCancellationScheduler */, requestStartTimeNanos, requestStartTimeMicros,
154157
HttpHeaders.of(), HttpHeaders.of(), contextHook);
155158
}
156159

157160
public DefaultServiceRequestContext(
158-
ServiceConfig cfg, Channel ch, MeterRegistry meterRegistry, SessionProtocol sessionProtocol,
159-
RequestId id, RoutingContext routingContext, RoutingResult routingResult, ExchangeType exchangeType,
161+
ServiceConfig cfg, Channel ch, EventLoop eventLoop, MeterRegistry meterRegistry,
162+
SessionProtocol sessionProtocol, RequestId id, RoutingContext routingContext,
163+
RoutingResult routingResult, ExchangeType exchangeType,
160164
HttpRequest req, @Nullable SSLSession sslSession, ProxiedAddresses proxiedAddresses,
161165
InetAddress clientAddress, InetSocketAddress remoteAddress, InetSocketAddress localAddress,
162166
@Nullable CancellationScheduler requestCancellationScheduler,
@@ -170,6 +174,7 @@ public DefaultServiceRequestContext(
170174
requireNonNull(req, "req"), null, null, contextHook);
171175

172176
this.ch = requireNonNull(ch, "ch");
177+
this.eventLoop = requireNonNull(eventLoop, "eventLoop");
173178
this.cfg = requireNonNull(cfg, "cfg");
174179
this.routingContext = routingContext;
175180
this.routingResult = routingResult;
@@ -178,7 +183,9 @@ public DefaultServiceRequestContext(
178183
} else {
179184
this.requestCancellationScheduler =
180185
CancellationScheduler.ofServer(TimeUnit.MILLISECONDS.toNanos(cfg.requestTimeoutMillis()));
181-
this.requestCancellationScheduler.init(eventLoop());
186+
// the cancellation scheduler uses channelEventLoop since #start is called
187+
// from the netty pipeline logic
188+
this.requestCancellationScheduler.init(ch.eventLoop());
182189
}
183190
this.sslSession = sslSession;
184191
this.proxiedAddresses = requireNonNull(proxiedAddresses, "proxiedAddresses");
@@ -301,7 +308,7 @@ public ContextAwareEventLoop eventLoop() {
301308
if (contextAwareEventLoop != null) {
302309
return contextAwareEventLoop;
303310
}
304-
return contextAwareEventLoop = ContextAwareEventLoop.of(this, ch.eventLoop());
311+
return contextAwareEventLoop = ContextAwareEventLoop.of(this, eventLoop);
305312
}
306313

307314
@Override

core/src/main/java/com/linecorp/armeria/server/AbstractAnnotatedServiceConfigSetters.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import com.linecorp.armeria.server.annotation.ResponseConverterFunction;
4848
import com.linecorp.armeria.server.logging.AccessLogWriter;
4949

50+
import io.netty.channel.EventLoopGroup;
51+
5052
@UnstableApi
5153
abstract class AbstractAnnotatedServiceConfigSetters implements AnnotatedServiceConfigSetters {
5254

@@ -280,6 +282,18 @@ public AbstractAnnotatedServiceConfigSetters multipartUploadsLocation(Path multi
280282
return this;
281283
}
282284

285+
@Override
286+
public ServiceConfigSetters serviceWorkerGroup(EventLoopGroup serviceWorkerGroup, boolean shutdownOnStop) {
287+
defaultServiceConfigSetters.serviceWorkerGroup(serviceWorkerGroup, shutdownOnStop);
288+
return this;
289+
}
290+
291+
@Override
292+
public ServiceConfigSetters serviceWorkerGroup(int numThreads) {
293+
defaultServiceConfigSetters.serviceWorkerGroup(numThreads);
294+
return this;
295+
}
296+
283297
@Override
284298
public AbstractAnnotatedServiceConfigSetters requestIdGenerator(
285299
Function<? super RoutingContext, ? extends RequestId> requestIdGenerator) {

core/src/main/java/com/linecorp/armeria/server/AbstractServiceBindingBuilder.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
3636
import com.linecorp.armeria.server.logging.AccessLogWriter;
3737

38+
import io.netty.channel.EventLoopGroup;
39+
3840
/**
3941
* A builder class for binding an {@link HttpService} fluently.
4042
*
@@ -175,6 +177,19 @@ public AbstractServiceBindingBuilder multipartUploadsLocation(Path multipartUplo
175177
return this;
176178
}
177179

180+
@Override
181+
public AbstractServiceBindingBuilder serviceWorkerGroup(EventLoopGroup serviceWorkerGroup,
182+
boolean shutdownOnStop) {
183+
defaultServiceConfigSetters.serviceWorkerGroup(serviceWorkerGroup, shutdownOnStop);
184+
return this;
185+
}
186+
187+
@Override
188+
public AbstractServiceBindingBuilder serviceWorkerGroup(int numThreads) {
189+
defaultServiceConfigSetters.serviceWorkerGroup(numThreads);
190+
return this;
191+
}
192+
178193
@Override
179194
public AbstractServiceBindingBuilder requestIdGenerator(
180195
Function<? super RoutingContext, ? extends RequestId> requestIdGenerator) {

core/src/main/java/com/linecorp/armeria/server/AggregatedHttpResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ final class AggregatedHttpResponseHandler extends AbstractHttpResponseHandler
5757

5858
@Override
5959
public Void apply(@Nullable AggregatedHttpResponse response, @Nullable Throwable cause) {
60-
final EventLoop eventLoop = reqCtx.eventLoop();
60+
final EventLoop eventLoop = ctx.channel().eventLoop();
6161
if (eventLoop.inEventLoop()) {
6262
apply0(response, cause);
6363
} else {

0 commit comments

Comments
 (0)