diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java b/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java index dfd3a91fb6a..695557a68cc 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpResponseWrapper.java @@ -25,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpObject; @@ -58,6 +60,8 @@ class HttpResponseWrapper implements StreamWriter { private final EventLoop eventLoop; private final ClientRequestContext ctx; private final long maxContentLength; + @VisibleForTesting + static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request"; private boolean responseStarted; private long contentLengthHeaderValue = -1; @@ -232,14 +236,16 @@ void close(@Nullable Throwable cause, boolean cancel) { requestAutoAbortDelayMillis, TimeUnit.MILLISECONDS); } - private void closeAction(@Nullable Throwable cause) { + private boolean closeAction(@Nullable Throwable cause) { + final boolean closed; if (cause != null) { - delegate.close(cause); + closed = delegate.tryClose(cause); ctx.logBuilder().endResponse(cause); } else { - delegate.close(); + closed = delegate.tryClose(); ctx.logBuilder().endResponse(); } + return closed; } private void cancelAction(@Nullable Throwable cause) { @@ -262,8 +268,10 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) { cancelAction(cause); return; } - if (delegate.isOpen()) { - closeAction(cause); + + // don't log if the cause will be exposed via the response/log + if (delegate.isOpen() && closeAction(cause)) { + return; } // the context has been cancelled either by timeout or by user invocation @@ -275,7 +283,7 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) { return; } - final StringBuilder logMsg = new StringBuilder("Unexpected exception while closing a request"); + final StringBuilder logMsg = new StringBuilder(UNEXPECTED_EXCEPTION_MSG); final HttpRequest request = ctx.request(); assert request != null; final String authority = request.authority(); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessage.java index 856e593c18c..58ae76a7d38 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessage.java @@ -441,28 +441,41 @@ private void handleCloseEvent(SubscriptionImpl subscription, CloseEvent o) { @Override public void close() { - if (setState(State.OPEN, State.CLOSED)) { - addObjectOrEvent(SUCCESSFUL_CLOSE); - } + tryClose(); } @Override public final void close(Throwable cause) { requireNonNull(cause, "cause"); - if (cause instanceof CancelledSubscriptionException) { - throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())"); - } tryClose(cause); } + /** + * Tries to close the stream. + * + * @return {@code true} if the stream has been closed by this method call. + * {@code false} if the stream has been closed already by another party. + */ + @UnstableApi + public final boolean tryClose() { + if (setState(State.OPEN, State.CLOSED)) { + addObjectOrEvent(SUCCESSFUL_CLOSE); + return true; + } + return false; + } + /** * Tries to close the stream with the specified {@code cause}. * * @return {@code true} if the stream has been closed by this method call. - * {@code false} if the stream has been closed already by other party. + * {@code false} if the stream has been closed already by another party. */ public final boolean tryClose(Throwable cause) { + if (cause instanceof CancelledSubscriptionException) { + throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())"); + } if (setState(State.OPEN, State.CLOSED)) { addObjectOrEvent(new CloseEvent(cause)); return true; diff --git a/core/src/test/java/com/linecorp/armeria/client/Http2GoAwayTest.java b/core/src/test/java/com/linecorp/armeria/client/Http2GoAwayTest.java index 287f505c1af..dad61f85650 100644 --- a/core/src/test/java/com/linecorp/armeria/client/Http2GoAwayTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/Http2GoAwayTest.java @@ -15,12 +15,13 @@ */ package com.linecorp.armeria.client; -import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange; +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory; +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.BufferedOutputStream; -import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; @@ -31,14 +32,9 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import com.google.common.io.ByteStreams; - import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2FrameTypes; @Timeout(10) @@ -53,7 +49,7 @@ class Http2GoAwayTest { @Test void streamEndsBeforeGoAway() throws Exception { try (ServerSocket ss = new ServerSocket(0); - ClientFactory clientFactory = newClientFactory()) { + ClientFactory clientFactory = newClientFactory(eventLoop.get())) { final int port = ss.getLocalPort(); @@ -101,7 +97,7 @@ void streamEndsBeforeGoAway() throws Exception { @Test void streamEndsAfterGoAway() throws Exception { try (ServerSocket ss = new ServerSocket(0); - ClientFactory clientFactory = newClientFactory()) { + ClientFactory clientFactory = newClientFactory(eventLoop.get())) { final int port = ss.getLocalPort(); @@ -150,7 +146,7 @@ void streamEndsAfterGoAway() throws Exception { @Test void streamGreaterThanLastStreamId() throws Exception { try (ServerSocket ss = new ServerSocket(0); - ClientFactory clientFactory = newClientFactory()) { + ClientFactory clientFactory = newClientFactory(eventLoop.get())) { final int port = ss.getLocalPort(); @@ -208,55 +204,4 @@ void streamGreaterThanLastStreamId() throws Exception { } } } - - private static ClientFactory newClientFactory() { - return ClientFactory.builder() - .useHttp2Preface(true) - // Set the window size to the HTTP/2 default values to simplify the traffic. - .http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE) - .http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE) - .workerGroup(eventLoop.get(), false) - .build(); - } - - private static void handleInitialExchange(InputStream in, BufferedOutputStream out) throws IOException { - // Read the connection preface and discard it. - readBytes(in, connectionPrefaceBuf().readableBytes()); - - // Read a SETTINGS frame. - assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS); - - // Send a SETTINGS frame and the ack for the received SETTINGS frame. - sendEmptySettingsAndAckFrame(out); - - // Read a SETTINGS ack frame. - assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS); - } - - private static byte[] readBytes(InputStream in, int length) throws IOException { - final byte[] buf = new byte[length]; - ByteStreams.readFully(in, buf); - return buf; - } - - private static void sendEmptySettingsAndAckFrame(BufferedOutputStream bos) throws IOException { - // Send an empty SETTINGS frame. - bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00 }); - // Send a SETTINGS_ACK frame. - bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 }); - bos.flush(); - } - - private static int payloadLength(byte[] buf) { - return (buf[0] & 0xff) << 16 | (buf[1] & 0xff) << 8 | (buf[2] & 0xff); - } - - private static ByteBuf readFrame(InputStream in) throws IOException { - final byte[] frameBuf = readBytes(in, 9); - final int payloadLength = payloadLength(frameBuf); - final ByteBuf buffer = Unpooled.buffer(9 + payloadLength); - buffer.writeBytes(frameBuf); - buffer.writeBytes(in, payloadLength); - return buffer; - } } diff --git a/it/internal-logging/src/test/java/com/linecorp/armeria/client/HttpResponseWrapperLogTest.java b/it/internal-logging/src/test/java/com/linecorp/armeria/client/HttpResponseWrapperLogTest.java new file mode 100644 index 00000000000..39ac9085805 --- /dev/null +++ b/it/internal-logging/src/test/java/com/linecorp/armeria/client/HttpResponseWrapperLogTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.client; + +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange; +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory; +import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.ClosedSessionException; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import io.netty.handler.codec.http2.Http2FrameTypes; + +class HttpResponseWrapperLogTest { + + @RegisterExtension + static final EventLoopExtension eventLoop = new EventLoopExtension(); + + private static final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + private static final Logger logger = + (Logger) LoggerFactory.getLogger(HttpResponseWrapper.class); + private static final ListAppender appender = new ListAppender<>(); + + @BeforeEach + void beforeEach() { + appender.setContext(context); + appender.start(); + logger.addAppender(appender); + } + + @AfterEach + void afterEach() { + appender.stop(); + logger.detachAppender(appender); + } + + @Test + void goAwayNotLogged() throws Exception { + try (ServerSocket ss = new ServerSocket(0); + ClientFactory clientFactory = newClientFactory(eventLoop.get())) { + + final int port = ss.getLocalPort(); + + final WebClient client = WebClient.builder("h2c://127.0.0.1:" + port) + .factory(clientFactory) + .build(); + final HttpRequest req = HttpRequest.streaming(HttpMethod.GET, "/"); + final CompletableFuture resFuture = client.execute(req).aggregate(); + try (Socket s = ss.accept()) { + + final InputStream in = s.getInputStream(); + final BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream()); + handleInitialExchange(in, bos); + + // Read a HEADERS frame. + assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.HEADERS); + + // Send a GOAWAY frame. + bos.write(new byte[] { + 0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x03, // lastStreamId = 3 + 0x00, 0x00, 0x00, 0x00 // errorCode = 0 + }); + bos.flush(); + + // The second request should fail with UnprocessedRequestException + // which has a cause of GoAwayReceivedException. + await().untilAsserted(resFuture::isCompletedExceptionally); + assertThatThrownBy(resFuture::join).isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(ClosedSessionException.class); + + // Read a GOAWAY frame. + assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.GO_AWAY); + + assertThat(in.read()).isEqualTo(-1); + } + } + assertThat(appender.list).allSatisfy(event -> { + assertThat(event.getMessage()) + .doesNotContain(HttpResponseWrapper.UNEXPECTED_EXCEPTION_MSG); + }); + } +} diff --git a/settings.gradle b/settings.gradle index 64a00b3b1af..537258a8287 100644 --- a/settings.gradle +++ b/settings.gradle @@ -207,6 +207,7 @@ includeWithFlags ':it:builders', 'java' includeWithFlags ':it:context-storage', 'java' includeWithFlags ':it:dgs', 'java17' includeWithFlags ':it:flags-cyclic-dep', 'java' +includeWithFlags ':it:flags-provider', 'java', 'relocate' includeWithFlags ':it:graphql-multipart', 'java17' includeWithFlags ':it:grpcweb', 'java', 'akka-grpc_2.13' includeWithFlags ':it:grpc:java', 'java' @@ -214,7 +215,7 @@ includeWithFlags ':it:grpc:kotlin', 'java', 'relocate includeWithFlags ':it:grpc:kotlin-coroutine-context-provider', 'java', 'relocate', 'kotlin-grpc', 'kotlin' includeWithFlags ':it:grpc:scala', 'java', 'relocate', 'scala-grpc_2.13', 'scala_2.13' includeWithFlags ':it:grpc:reactor', 'java', 'relocate', 'reactor-grpc' -includeWithFlags ':it:flags-provider', 'java', 'relocate' +includeWithFlags ':it:internal-logging', 'java', 'relocate' includeWithFlags ':it:jackson-provider', 'java', 'relocate' includeWithFlags ':it:kotlin', 'java', 'relocate', 'kotlin' includeWithFlags ':it:kubernetes-chaos-tests', 'java', 'relocate' diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/Http2ByteUtil.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/Http2ByteUtil.java new file mode 100644 index 00000000000..22a3658ff23 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/Http2ByteUtil.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.google.common.io.ByteStreams; + +import com.linecorp.armeria.client.ClientFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2FrameTypes; + +public final class Http2ByteUtil { + + public static ClientFactory newClientFactory(EventLoop eventLoop) { + return ClientFactory.builder() + .useHttp2Preface(true) + // Set the window size to the HTTP/2 default values to simplify the traffic. + .http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE) + .http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE) + .workerGroup(eventLoop, false) + .build(); + } + + public static void handleInitialExchange(InputStream in, BufferedOutputStream out) throws IOException { + // Read the connection preface and discard it. + readBytes(in, connectionPrefaceBuf().readableBytes()); + + // Read a SETTINGS frame. + assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS); + + // Send a SETTINGS frame and the ack for the received SETTINGS frame. + sendEmptySettingsAndAckFrame(out); + + // Read a SETTINGS ack frame. + assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS); + } + + public static byte[] readBytes(InputStream in, int length) throws IOException { + final byte[] buf = new byte[length]; + ByteStreams.readFully(in, buf); + return buf; + } + + public static void sendEmptySettingsAndAckFrame(BufferedOutputStream bos) throws IOException { + // Send an empty SETTINGS frame. + bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00 }); + // Send a SETTINGS_ACK frame. + bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 }); + bos.flush(); + } + + public static int payloadLength(byte[] buf) { + return (buf[0] & 0xff) << 16 | (buf[1] & 0xff) << 8 | (buf[2] & 0xff); + } + + public static ByteBuf readFrame(InputStream in) throws IOException { + final byte[] frameBuf = readBytes(in, 9); + final int payloadLength = payloadLength(frameBuf); + final ByteBuf buffer = Unpooled.buffer(9 + payloadLength); + buffer.writeBytes(frameBuf); + buffer.writeBytes(in, payloadLength); + return buffer; + } + + private Http2ByteUtil() {} +}