From b68917cf24276493e17391e0ea76e1cec05acfee Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Mon, 5 Aug 2024 20:09:40 +0900 Subject: [PATCH 1/4] Fix a leak in `HttpEncodedResponse` Motivation: An `HttpData` produced in `HttpEncodedResponse.beforeComplete()` is not collected by `CollectingSubscriberAndSubscription` and leaked. ```java Hint: {10B, pooled, } com.linecorp.armeria.common.HttpData.wrap(HttpData.java:110) com.linecorp.armeria.server.encoding.HttpEncodedResponse.beforeComplete(HttpEncodedResponse.java:163) com.linecorp.armeria.common.stream.FilteredStreamMessage.lambda$collect$0(FilteredStreamMessage.java:201) java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:950) java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2340) com.linecorp.armeria.common.stream.FilteredStreamMessage.collect(FilteredStreamMessage.java:142) ``` `CollectingSubscriberAndSubscription` was designed to only apply `filter()` of the `upstream.collect()`. I didn't consider that an object could be published via `onNext()` in `beforeComplete()`. `CollectingSubscriberAndSubscription` was added to provide an optimized code path for unary calls. it doesn't seem the code provides a trival performance improvement but the implemetation is complex and error-prone. I was able to fix the code not to leak the data but I didn't want to additional complexity to it. It might be cleaner to use the Reactive Streams API instead of keeping the custom `collect()` implemetation. There will be no change in performance for normal message sizes. Modifications: - Remove the custom `collect()` implemtation in `FilteredStreamMessage`. Result: Fix a potential leak when sending compressed responses. --- .../common/stream/FilteredStreamMessage.java | 117 ------------------ 1 file changed, 117 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java index ce3b80a494d..6430ce47d07 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java @@ -16,14 +16,11 @@ package com.linecorp.armeria.common.stream; -import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.EMPTY_OPTIONS; -import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.POOLED_OBJECTS; import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsNotifyCancellation; import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsWithPooledObjects; import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.toSubscriptionOptions; import static java.util.Objects.requireNonNull; -import java.util.List; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Subscriber; @@ -31,12 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; - import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.internal.common.stream.StreamMessageUtil; import com.linecorp.armeria.unsafe.PooledObjects; @@ -136,79 +130,6 @@ public final CompletableFuture whenComplete() { return completionFuture; } - @Override - public CompletableFuture> collect(EventExecutor executor, SubscriptionOption... options) { - final SubscriptionOption[] filterOptions = filterSupportsPooledObjects ? POOLED_OBJECTS : EMPTY_OPTIONS; - return upstream.collect(executor, filterOptions).handle((result, cause) -> { - // CollectingSubscriberAndSubscription just captures cancel(), onComplete(), and onError() signals - // from the subclass of FilteredStreamMessage. So we need to follow regular Reactive Streams - // specifications. - final CollectingSubscriberAndSubscription subscriberAndSubscription = - new CollectingSubscriberAndSubscription<>(); - beforeSubscribe(subscriberAndSubscription, subscriberAndSubscription); - if (cause != null) { - beforeError(subscriberAndSubscription, cause); - completionFuture.completeExceptionally(cause); - return Exceptions.throwUnsafely(cause); - } else { - Throwable abortCause = null; - final ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(result.size()); - final boolean withPooledObjects = containsWithPooledObjects(options); - for (T t : result) { - if (abortCause != null) { - // This StreamMessage was aborted already. However, we need to release the remaining - // objects in result. - StreamMessageUtil.closeOrAbort(t, abortCause); - continue; - } - - try { - U filtered = filter(t); - - if (subscriberAndSubscription.completed || subscriberAndSubscription.cause != null || - subscriberAndSubscription.cancelled) { - if (subscriberAndSubscription.cause != null) { - abortCause = cause; - } else { - abortCause = CancelledSubscriptionException.get(); - } - StreamMessageUtil.closeOrAbort(filtered, abortCause); - } else { - requireNonNull(filtered, "filter() returned null"); - if (!withPooledObjects) { - filtered = PooledObjects.copyAndClose(filtered); - } - builder.add(filtered); - } - } catch (Throwable ex) { - // Failed to filter the object. - StreamMessageUtil.closeOrAbort(t, abortCause); - abortCause = ex; - } - } - - final List elements = builder.build(); - if (abortCause != null && !(abortCause instanceof CancelledSubscriptionException)) { - // The stream was aborted with an unsafe exception. - for (U element : elements) { - StreamMessageUtil.closeOrAbort(element, abortCause); - } - completionFuture.completeExceptionally(abortCause); - return Exceptions.throwUnsafely(abortCause); - } - - try { - beforeComplete(subscriberAndSubscription); - completionFuture.complete(null); - } catch (Exception ex) { - completionFuture.completeExceptionally(ex); - throw ex; - } - return elements; - } - }); - } - @Override public final void subscribe(Subscriber subscriber, EventExecutor executor) { subscribe(subscriber, executor, false, false); @@ -351,42 +272,4 @@ public void onComplete() { } } } - - private static final class CollectingSubscriberAndSubscription implements Subscriber, Subscription { - - private boolean completed; - private boolean cancelled; - @Nullable - private Throwable cause; - - @Override - public void onSubscribe(Subscription s) {} - - @Override - public void onNext(T o) {} - - @Override - public void onError(Throwable t) { - if (completed) { - return; - } - cause = t; - } - - @Override - public void onComplete() { - if (cause != null) { - return; - } - completed = true; - } - - @Override - public void request(long n) {} - - @Override - public void cancel() { - cancelled = true; - } - } } From 86ea2cc4e5c044f4a7d4ca621121fbb47fb99239 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Tue, 6 Aug 2024 01:12:38 +0900 Subject: [PATCH 2/4] fix a broken test --- .../common/stream/FilteredStreamMessage.java | 2 - .../server/encoding/HttpEncodedResponse.java | 2 +- .../encoding/HttpEncodedResponseTest.java | 44 ++++++++++++++++++- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java index 6430ce47d07..261195aa00e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java @@ -31,7 +31,6 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.internal.common.stream.StreamMessageUtil; import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.util.concurrent.EventExecutor; @@ -219,7 +218,6 @@ public void onNext(T o) { try { filtered = filter(o); } catch (Throwable ex) { - StreamMessageUtil.closeOrAbort(o); // onError(ex) should be called before upstream.cancel() to deliver the cause to downstream. // upstream.cancel() and make downstream closed with CancelledSubscriptionException // before sending the actual cause. diff --git a/core/src/main/java/com/linecorp/armeria/server/encoding/HttpEncodedResponse.java b/core/src/main/java/com/linecorp/armeria/server/encoding/HttpEncodedResponse.java index 3ccc19d6061..75d25052c90 100644 --- a/core/src/main/java/com/linecorp/armeria/server/encoding/HttpEncodedResponse.java +++ b/core/src/main/java/com/linecorp/armeria/server/encoding/HttpEncodedResponse.java @@ -143,7 +143,7 @@ protected HttpObject filter(HttpObject obj) { encodedBuf.readerIndex(encodedBuf.writerIndex()); return httpData; } catch (IOException e) { - // An unreleased ByteBuf will be released by `beforeError()` + // An unreleased ByteBuf in `encodedStream` will be released by `beforeError()` throw new IllegalStateException( "Error encoding HttpData, this should not happen with byte arrays.", e); diff --git a/core/src/test/java/com/linecorp/armeria/server/encoding/HttpEncodedResponseTest.java b/core/src/test/java/com/linecorp/armeria/server/encoding/HttpEncodedResponseTest.java index 61c6016808e..567c53c8313 100644 --- a/core/src/test/java/com/linecorp/armeria/server/encoding/HttpEncodedResponseTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/encoding/HttpEncodedResponseTest.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; +import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletionException; @@ -30,6 +32,7 @@ import org.reactivestreams.Subscription; import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.AggregationOptions; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpObject; import com.linecorp.armeria.common.HttpResponse; @@ -43,9 +46,11 @@ import com.linecorp.armeria.common.stream.NoopSubscriber; import com.linecorp.armeria.common.stream.SubscriptionOption; import com.linecorp.armeria.internal.common.encoding.StreamEncoderFactories; +import com.linecorp.armeria.internal.common.encoding.StreamEncoderFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.util.concurrent.ImmediateEventExecutor; import reactor.test.StepVerifier; @@ -53,7 +58,7 @@ class HttpEncodedResponseTest { @Test - void testLeak() { + void testLeakOnSubscribe() { final ByteBuf buf = Unpooled.directBuffer(); buf.writeCharSequence("foo", StandardCharsets.UTF_8); @@ -71,6 +76,43 @@ void testLeak() { assertThat(buf.refCnt()).isZero(); } + @Test + void testLeakOnError() { + final ByteBuf buf = Unpooled.directBuffer(); + buf.writeCharSequence("foo", StandardCharsets.UTF_8); + + final HttpResponse orig = HttpResponse.of(HttpStatus.OK, + MediaType.PLAIN_TEXT_UTF_8, + HttpData.wrap(buf).withEndOfStream()); + final StreamEncoderFactory throwingEncoderFactory = new StreamEncoderFactory() { + @Override + public String encodingHeaderValue() { + return "gzip"; + } + + @Override + public OutputStream newEncoder(ByteBufOutputStream os) { + return new OutputStream() { + @Override + public void write(int b) throws IOException { + throw new IOException("Oops"); + } + }; + } + }; + final HttpEncodedResponse encoded = new HttpEncodedResponse( + orig, throwingEncoderFactory, mediaType -> true, ByteBufAllocator.DEFAULT, 1); + + assertThatThrownBy(() -> { + encoded.aggregate(AggregationOptions.usePooledObjects(ByteBufAllocator.DEFAULT)).join(); + }).isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IllegalStateException.class) + .hasRootCauseInstanceOf(IOException.class); + + // 'buf' should be released. + assertThat(buf.refCnt()).isZero(); + } + @Test void shouldReleaseEncodedStreamOnCancel() { final HttpResponse orig = From 692103472b897b926f9e6ce511a0f7e7a3ac4c68 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Tue, 6 Aug 2024 14:56:16 +0900 Subject: [PATCH 3/4] Handle corner cases --- .../common/stream/AsyncMapStreamMessage.java | 9 +++--- .../common/stream/FilteredStreamMessage.java | 10 +++++-- .../common/stream/FuseableStreamMessage.java | 2 +- .../armeria/common/stream/StreamMessage.java | 7 +++++ .../stream/StreamMessageInputStream.java | 2 +- .../common/stream/SurroundingPublisher.java | 2 +- .../stream/StreamMessageCollectingTest.java | 29 +++++++++---------- .../HttpMessageSubscriberAdapter.java | 3 +- .../web/reactive/ChannelSendOperator.java | 2 +- 9 files changed, 39 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/AsyncMapStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/AsyncMapStreamMessage.java index d18edf951e2..f1b46fed5f2 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/AsyncMapStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/AsyncMapStreamMessage.java @@ -157,11 +157,12 @@ public void onNext(T item) { } catch (Throwable ex) { StreamMessageUtil.closeOrAbort(item, ex); + // onError(ex) should be called before upstream.cancel() that may close downstream with + // CancelledSubscriptionException. + onError(ex); final Subscription upstream = this.upstream; assert upstream != null; upstream.cancel(); - - onError(ex); } } @@ -179,8 +180,8 @@ private void publishDownstream(@Nullable U item, @Nullable Throwable cause) { try { if (cause != null) { - upstream.cancel(); onError(cause); + upstream.cancel(); } else { requireNonNull(item, "function.apply()'s future completed with null"); downstream.onNext(item); @@ -205,8 +206,8 @@ private void publishDownstream(@Nullable U item, @Nullable Throwable cause) { if (item != null) { StreamMessageUtil.closeOrAbort(item, ex); } - upstream.cancel(); onError(ex); + upstream.cancel(); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java index 261195aa00e..6c7feff61f5 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java @@ -31,6 +31,7 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.internal.common.stream.StreamMessageUtil; import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.util.concurrent.EventExecutor; @@ -219,8 +220,8 @@ public void onNext(T o) { filtered = filter(o); } catch (Throwable ex) { // onError(ex) should be called before upstream.cancel() to deliver the cause to downstream. - // upstream.cancel() and make downstream closed with CancelledSubscriptionException - // before sending the actual cause. + // upstream.cancel() may close downstream with CancelledSubscriptionException before sending + // the actual cause. onError(ex); assert upstream != null; @@ -228,6 +229,11 @@ public void onNext(T o) { return; } + if (completed) { + // onError(Throwable) or onComplete() has been called in filter(). + StreamMessageUtil.closeOrAbort(filtered); + return; + } if (!subscribedWithPooledObjects) { filtered = PooledObjects.copyAndClose(filtered); } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java index 0ddbdc61d4a..5777b99be7b 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java @@ -295,8 +295,8 @@ public void onNext(Object item) { if (result != null && item != result) { StreamMessageUtil.closeOrAbort(result, ex); } - upstream.cancel(); onError(ex); + upstream.cancel(); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java index 1eae26bc655..60727d202bc 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java @@ -20,6 +20,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.linecorp.armeria.common.stream.StreamMessageUtil.createStreamMessageFrom; import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.EMPTY_OPTIONS; +import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsNotifyCancellation; import static java.util.Objects.requireNonNull; import java.io.File; @@ -43,6 +44,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.ObjectArrays; import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.HttpData; @@ -752,6 +754,11 @@ default CompletableFuture> collect(EventExecutor executor, SubscriptionO requireNonNull(executor, "executor"); requireNonNull(options, "options"); final StreamMessageCollector collector = new StreamMessageCollector<>(options); + if (!containsNotifyCancellation(options)) { + // Make the return CompletableFuture completed exceptionally if the stream is cancelled while + // collecting the elements. + options = ObjectArrays.concat(options, SubscriptionOption.NOTIFY_CANCELLATION); + } subscribe(collector, executor, options); return collector.collect(); } diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessageInputStream.java b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessageInputStream.java index 9c7b114a19f..4c9b1b80d05 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessageInputStream.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/StreamMessageInputStream.java @@ -145,10 +145,10 @@ public void onNext(T item) { byteBufsInputStream.add(result.byteBuf()); } catch (Throwable ex) { StreamMessageUtil.closeOrAbort(item, ex); + onError(ex); final Subscription upstream = this.upstream; assert upstream != null; upstream.cancel(); - onError(ex); } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/stream/SurroundingPublisher.java b/core/src/main/java/com/linecorp/armeria/internal/common/stream/SurroundingPublisher.java index 53a80e1d175..155321ea3a2 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/stream/SurroundingPublisher.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/stream/SurroundingPublisher.java @@ -445,11 +445,11 @@ private void close0(@Nullable Throwable cause) { downstream.onComplete(); completionFuture.complete(null); } else { + downstream.onError(cause); final Subscription upstream = this.upstream; if (upstream != null) { upstream.cancel(); } - downstream.onError(cause); completionFuture.completeExceptionally(cause); } release(cause); diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java index 1f108e42390..c00e58c9071 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java @@ -74,11 +74,13 @@ void emptyStreamMessage() { .hasCause(cause); } - @CsvSource({ "1, true", "1, false", - "2, true", "2, false", - "3, true", "3, false", - "4, true", "4, false", - "100, true", "100, false" }) + @CsvSource({ + "1, true", "1, false", + "2, true", "2, false", + "3, true", "3, false", + "4, true", "4, false", + "100, true", "100, false" + }) @ParameterizedTest void closeOrAbortAndCollect(int size, boolean fixedStream) { Map data = newHttpData(size); @@ -193,19 +195,16 @@ protected HttpData filter(HttpData obj) { } }; - final List collected = filtered.collect(SubscriptionOption.WITH_POOLED_OBJECTS).join(); - assertThat(collected).hasSize(2); + assertThatThrownBy(() -> { + filtered.collect(SubscriptionOption.WITH_POOLED_OBJECTS).join(); + }).isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(CancelledSubscriptionException.class); final List bufs = ImmutableList.copyOf(data.values()); - assertThat(bufs.get(0).refCnt()).isOne(); - assertThat(bufs.get(1).refCnt()).isOne(); - assertThat(bufs.get(2).refCnt()).isZero(); - assertThat(bufs.get(3).refCnt()).isZero(); - assertThat(bufs.get(4).refCnt()).isZero(); - - bufs.get(0).release(); - bufs.get(1).release(); + for (ByteBuf buf : bufs) { + assertThat(buf.refCnt()).isZero(); + } } @Test diff --git a/resteasy/src/main/java/com/linecorp/armeria/internal/common/resteasy/HttpMessageSubscriberAdapter.java b/resteasy/src/main/java/com/linecorp/armeria/internal/common/resteasy/HttpMessageSubscriberAdapter.java index 416ad38ce92..a5a523e3d13 100644 --- a/resteasy/src/main/java/com/linecorp/armeria/internal/common/resteasy/HttpMessageSubscriberAdapter.java +++ b/resteasy/src/main/java/com/linecorp/armeria/internal/common/resteasy/HttpMessageSubscriberAdapter.java @@ -64,10 +64,9 @@ public void onNext(HttpObject httpObject) { if (dataLength > 0) { final long allowedDataLength = MAX_ALLOWED_DATA_LENGTH - contentLength; if (dataLength > allowedDataLength) { - //noinspection ConstantConditions - subscription.cancel(); onError(new IllegalStateException( "content length greater than " + MAX_ALLOWED_DATA_LENGTH)); + subscription.cancel(); return; } contentLength += dataLength; diff --git a/spring/boot3-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ChannelSendOperator.java b/spring/boot3-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ChannelSendOperator.java index 77ba4b461af..aa356037dec 100644 --- a/spring/boot3-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ChannelSendOperator.java +++ b/spring/boot3-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ChannelSendOperator.java @@ -201,10 +201,10 @@ public final void onNext(T item) { } result.subscribe(writeCompletionBarrier); } else { + writeCompletionBarrier.onError(new IllegalStateException("Unexpected item.")); if (subscription != null) { subscription.cancel(); } - writeCompletionBarrier.onError(new IllegalStateException("Unexpected item.")); } } } From 5d78935112710969ca8763ce9fdefae902485f8d Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Wed, 7 Aug 2024 11:40:32 +0900 Subject: [PATCH 4/4] Fix flakyness --- .../armeria/common/stream/StreamMessageCollectingTest.java | 2 ++ .../armeria/internal/logging/ContentPreviewingUtilTest.java | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java index c00e58c9071..e8290e70211 100644 --- a/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java @@ -153,6 +153,8 @@ protected HttpData filter(HttpData obj) { if (count < 2) { return obj; } else { + // The ownership of `obj` belongs to this method. + obj.close(); return Exceptions.throwUnsafely(cause); } } diff --git a/core/src/test/java/com/linecorp/armeria/internal/logging/ContentPreviewingUtilTest.java b/core/src/test/java/com/linecorp/armeria/internal/logging/ContentPreviewingUtilTest.java index 42702ceb082..5312b2966a5 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/logging/ContentPreviewingUtilTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/logging/ContentPreviewingUtilTest.java @@ -63,7 +63,6 @@ void abortedRequestShouldAlsoBeCompleted() { ctx.logBuilder().endRequest(); ctx.logBuilder().endResponse(); - ctx.logBuilder().ensureComplete(); final RequestLog log = ctx.log().whenComplete().join(); assertThat(log.requestContentPreview()).isEmpty();