Skip to content

Commit

Permalink
Fixed bug where a request timeout is scheduled after receiving the en…
Browse files Browse the repository at this point in the history
…tire body (#5614)

Motivation:

If `ExchangeType` is `UNARY` or `RESPONSE_STREAMING` for a service, a timeout is scheduled after EOS is received. If it takes a long time to receive a full request body, the request will not end with 503 Service Unavailable even after the request timeout set by the user.

Modifications:

- Immediately fire a `DecodedHttpRequest` when request headers are received in `Http{1,2}RequestDecoder`
- `HttpServerHandler` uses `whenAggregated()` to wait for the unary request to be aggregated on behalf of `Http{1,2}RequestDecoder`
- Remove some methods specialized for `AggregatedDecodedHttpRequest` has been deleted

Result:

Unary and response streaming requests are now properly timed out if they fail to receive the body within the designated timeout period.
  • Loading branch information
ikhoon authored and minwoox committed Apr 18, 2024
1 parent 9879752 commit ae469b3
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public final void cancel() {
}

@Override
public final void abort() {
public void abort() {
if (isDone()) {
return;
}
Expand All @@ -160,7 +160,7 @@ public final void abort() {
}

@Override
public final void abort(Throwable cause) {
public void abort(Throwable cause) {
if (isDone()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public void run(Throwable cause) {
// A stream or connection was already closed by a client
fail(cause);
} else {
req.setShouldResetOnlyIfRemoteIsOpen(true);
req.abortResponse(cause, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.internal.common.InboundTrafficController;
import com.linecorp.armeria.internal.common.stream.AggregatingStreamMessage;

import io.netty.channel.EventLoop;
Expand All @@ -50,15 +49,15 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO

@Nullable
private ServiceRequestContext ctx;
@Nullable
private HttpHeaders trailers;
private long transferredBytes;

@Nullable
private HttpResponse response;
@Nullable
private Throwable abortResponseCause;

private final CompletableFuture<Void> aggregationFuture = new CompletableFuture<>();

AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers,
boolean keepAlive, long maxRequestLength,
RoutingContext routingCtx, ExchangeType exchangeType,
Expand All @@ -80,15 +79,6 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO
@Override
public void init(ServiceRequestContext ctx) {
this.ctx = ctx;
ctx.logBuilder().increaseRequestLength(transferredBytes);
if (trailers != null) {
ctx.logBuilder().requestTrailers(trailers);
}
}

@Override
public boolean isInitialized() {
return ctx != null;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -149,16 +139,19 @@ public EventLoop defaultSubscriberExecutor() {

@Override
public boolean tryWrite(HttpObject obj) {
assert ctx != null : "uninitialized DecodedHttpRequest must be aborted.";
final boolean published = super.tryWrite(obj);

if (obj instanceof HttpData) {
((HttpData) obj).touch(routingCtx);
final HttpData httpData = (HttpData) obj;
httpData.touch(ctx);
ctx.logBuilder().increaseRequestLength(httpData);
if (obj.isEndOfStream()) {
close();
}
}
if (obj instanceof HttpHeaders) {
trailers = (HttpHeaders) obj;
ctx.logBuilder().requestTrailers((HttpHeaders) obj);
close();
}
return published;
Expand All @@ -184,25 +177,53 @@ public void abortResponse(Throwable cause, boolean cancel) {
}
abortResponseCause = cause;

super.close(cause);
// Make sure to invoke the ServiceRequestContext.whenRequestCancelling() and whenRequestCancelled()
// by cancelling a request
if (cancel && ctx != null) {
ctx.cancel(cause);
}

// Complete aggregationFuture first to execute the aborted request with the service and decorators and
// then abort the response.
aggregationFuture.complete(null);
if (response != null && !response.isComplete()) {
response.abort(cause);
}
}

@Override
public void close() {
super.close();
aggregationFuture.complete(null);
}

@Override
public void close(Throwable cause) {
super.close(cause);
aggregationFuture.complete(null);
}

@Override
public void abort() {
super.abort();
aggregationFuture.complete(null);
}

@Override
public void abort(Throwable cause) {
super.abort(cause);
aggregationFuture.complete(null);
}

@Override
public boolean isResponseAborted() {
return abortResponseCause != null;
}

@Override
public boolean needsAggregation() {
return true;
public CompletableFuture<Void> whenAggregated() {
return aggregationFuture;
}

@Override
Expand All @@ -224,18 +245,4 @@ public long requestStartTimeMicros() {
public RequestHeaders headers() {
return headers;
}

@Override
public StreamingDecodedHttpRequest toAbortedStreaming(
InboundTrafficController inboundTrafficController,
Throwable cause, boolean shouldResetOnlyIfRemoteIsOpen) {
final StreamingDecodedHttpRequest streamingDecodedHttpRequest = new StreamingDecodedHttpRequest(
eventLoop, id, streamId, headers, keepAlive,
inboundTrafficController, maxRequestLength, routingCtx,
exchangeType, requestStartTimeNanos, requestStartTimeMicros,
false, shouldResetOnlyIfRemoteIsOpen);
abort(cause);
streamingDecodedHttpRequest.abortResponse(cause, true);
return streamingDecodedHttpRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.armeria.server;

import java.util.concurrent.CompletableFuture;

import com.linecorp.armeria.common.ExchangeType;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand Down Expand Up @@ -73,8 +75,6 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i

void init(ServiceRequestContext ctx);

boolean isInitialized();

RoutingContext routingContext();

/**
Expand Down Expand Up @@ -108,9 +108,13 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i
boolean isResponseAborted();

/**
* Returns whether the request should be fully aggregated before passed to the {@link HttpServerHandler}.
* Returns a {@link CompletableFuture} that is completed the request is fully aggregated.
* {@code null} if the request does not need to be aggregated.
*/
boolean needsAggregation();
@Nullable
default CompletableFuture<Void> whenAggregated() {
return null;
}

/**
* Returns the {@link ExchangeType} that determines whether to stream an {@link HttpRequest} or
Expand All @@ -136,17 +140,6 @@ default boolean isHttp1WebSocket() {
return false;
}

/**
* Returns a new {@link StreamingDecodedHttpRequest} whose corresponding response is aborted using the
* specified {@link Throwable}. This is called when an {@link AggregatingDecodedHttpRequest}
* needs to be passed to a service before it's closed.
*/
default StreamingDecodedHttpRequest toAbortedStreaming(
InboundTrafficController inboundTrafficController,
Throwable cause, boolean shouldResetOnlyIfRemoteIsOpen) {
throw new UnsupportedOperationException();
}

/**
* Sets whether to send an RST_STREAM after the response sending response when the peer is open state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ public void init(ServiceRequestContext ctx) {
this.ctx = ctx;
}

@Override
public boolean isInitialized() {
return ctx != null;
}

@Override
public RoutingContext routingContext() {
return routingContext;
Expand Down Expand Up @@ -233,11 +228,6 @@ public boolean isResponseAborted() {
return abortResponseCause != null;
}

@Override
public boolean needsAggregation() {
return false;
}

@Override
public ExchangeType exchangeType() {
return exchangeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
this.req = req = DecodedHttpRequest.of(endOfStream, eventLoop, id, 1, headers,
keepAlive, inboundTrafficController, routingCtx);

// An aggregating request will be fired after all objects are collected.
if (!req.needsAggregation()) {
ctx.fireChannelRead(req);
}
ctx.fireChannelRead(req);
} else {
fail(id, null, HttpStatus.BAD_REQUEST, "Invalid decoder state", null);
return;
Expand Down Expand Up @@ -335,15 +332,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// status.
final HttpStatusException httpStatusException =
HttpStatusException.of(HttpStatus.REQUEST_ENTITY_TOO_LARGE, cause);
if (!decodedReq.isInitialized()) {
assert decodedReq.needsAggregation();
final StreamingDecodedHttpRequest streamingReq = decodedReq.toAbortedStreaming(
inboundTrafficController, httpStatusException, shouldReset);
ctx.fireChannelRead(streamingReq);
} else {
decodedReq.setShouldResetOnlyIfRemoteIsOpen(shouldReset);
decodedReq.abortResponse(httpStatusException, true);
}
decodedReq.setShouldResetOnlyIfRemoteIsOpen(shouldReset);
decodedReq.abortResponse(httpStatusException, true);
return;
}

Expand All @@ -358,11 +348,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
decodedReq.write(ArmeriaHttpUtil.toArmeria(trailingHeaders));
}
decodedReq.close();
if (decodedReq.needsAggregation()) {
assert !decodedReq.isInitialized();
// An aggregated request is now ready to be fired.
ctx.fireChannelRead(decodedReq);
}
this.req = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
req = DecodedHttpRequest.of(endOfStream, eventLoop, id, streamId, headers, true,
inboundTrafficController, routingCtx);
requests.put(streamId, req);
// An aggregating request will be fired later after all objects are collected.
if (!req.needsAggregation()) {
ctx.fireChannelRead(req);
}
ctx.fireChannelRead(req);
} else {
if (!(req instanceof DecodedHttpRequestWriter)) {
// Silently ignore the following HEADERS Frames of non-DecodedHttpRequestWriter. The request
Expand All @@ -224,11 +221,6 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
try {
// Trailers is received. The decodedReq will be automatically closed.
decodedReq.write(trailers);
if (req.needsAggregation()) {
assert !req.isInitialized();
// An aggregated request can be fired now.
ctx.fireChannelRead(req);
}
} catch (Throwable t) {
decodedReq.close(t);
throw Http2Exception.streamError(streamId, INTERNAL_ERROR, t,
Expand Down Expand Up @@ -318,10 +310,6 @@ public int onDataRead(
// Received an empty DATA frame
if (endOfStream) {
req.close();
if (req.needsAggregation()) {
assert !req.isInitialized();
ctx.fireChannelRead(req);
}
}
return padding;
}
Expand All @@ -344,26 +332,12 @@ public int onDataRead(

final HttpStatusException httpStatusException =
HttpStatusException.of(HttpStatus.REQUEST_ENTITY_TOO_LARGE, cause);
if (!decodedReq.isInitialized()) {
assert decodedReq.needsAggregation();
final StreamingDecodedHttpRequest streamingReq =
decodedReq.toAbortedStreaming(inboundTrafficController,
httpStatusException, shouldReset);
requests.put(streamId, streamingReq);
ctx.fireChannelRead(streamingReq);
} else {
decodedReq.setShouldResetOnlyIfRemoteIsOpen(shouldReset);
decodedReq.abortResponse(httpStatusException, true);
}
decodedReq.setShouldResetOnlyIfRemoteIsOpen(shouldReset);
decodedReq.abortResponse(httpStatusException, true);
} else if (decodedReq.isOpen()) {
try {
// The decodedReq will be automatically closed if endOfStream is true.
decodedReq.write(HttpData.wrap(data.retain()).withEndOfStream(endOfStream));
if (endOfStream && decodedReq.needsAggregation()) {
assert !decodedReq.isInitialized();
// An aggregated request is now ready to be fired.
ctx.fireChannelRead(decodedReq);
}
} catch (Throwable t) {
decodedReq.close(t);
throw Http2Exception.streamError(streamId, INTERNAL_ERROR, t,
Expand Down Expand Up @@ -412,13 +386,7 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC

final ClosedStreamException cause =
new ClosedStreamException("received a RST_STREAM frame: " + Http2Error.valueOf(errorCode));
if (!req.isInitialized()) {
assert req.needsAggregation();
// Call fireChannelRead so that the cause is logged by LoggingService.
ctx.fireChannelRead(req.toAbortedStreaming(inboundTrafficController, cause, false));
} else {
req.abortResponse(cause, /* cancel */ true);
}
req.abortResponse(cause, /* cancel */ true);
}

@Override
Expand Down
Loading

0 comments on commit ae469b3

Please sign in to comment.