-
Notifications
You must be signed in to change notification settings - Fork 928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a leak in HttpEncodedResponse
#5858
Conversation
Motivation: An `HttpData` produced in `HttpEncodedResponse.beforeComplete()` is not collected by `CollectingSubscriberAndSubscription` and leaked. ```java Hint: {10B, pooled, <unknown>} com.linecorp.armeria.common.HttpData.wrap(HttpData.java:110) com.linecorp.armeria.server.encoding.HttpEncodedResponse.beforeComplete(HttpEncodedResponse.java:163) com.linecorp.armeria.common.stream.FilteredStreamMessage.lambda$collect$0(FilteredStreamMessage.java:201) java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:950) java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2340) com.linecorp.armeria.common.stream.FilteredStreamMessage.collect(FilteredStreamMessage.java:142) ``` `CollectingSubscriberAndSubscription` was designed to only apply `filter()` of the `upstream.collect()`. I didn't consider that an object could be published via `onNext()` in `beforeComplete()`. `CollectingSubscriberAndSubscription` was added to provide an optimized code path for unary calls. it doesn't seem the code provides a trival performance improvement but the implemetation is complex and error-prone. I was able to fix the code not to leak the data but I didn't want to additional complexity to it. It might be cleaner to use the Reactive Streams API instead of keeping the custom `collect()` implemetation. There will be no change in performance for normal message sizes. Modifications: - Remove the custom `collect()` implemtation in `FilteredStreamMessage`. Result: Fix a potential leak when sending compressed responses.
@@ -752,6 +754,11 @@ default CompletableFuture<List<T>> collect(EventExecutor executor, SubscriptionO | |||
requireNonNull(executor, "executor"); | |||
requireNonNull(options, "options"); | |||
final StreamMessageCollector<T> collector = new StreamMessageCollector<>(options); | |||
if (!containsNotifyCancellation(options)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before setting this PR as ready for review, it would be easier to review if you explain why this change is needed in the PR description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamMessageCollector
collects the published elements until onComplete()
or onError()
is called. It generally worked but I found a corner case where Subscription.cancel()
made collector.collect()
incomplete forever.
You can reproduce it by disabling this block and then running it below.
armeria/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java
Line 168 in a870edf
void filteredStreamMessage_cancel() { |
I believe it would make more sense to specify NOTIFY_CANCELLATION
for collect()
method because it is used to fully consume the upstream data instead of partial ones.
CancelledSubscriptionException
will make the collecting future completed exceptionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For context, previously, the custom collect()
implementation in FilteredStreamMessage
hooked a Subscription.cancel()
event and completed the collecting future with the accumulated items so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I understood the issue now. Agree with the approach 👍
onError(ex); | ||
upstream.cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in general, do we always cancel the upstream before propagating the error downstream now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we cancel the upstream first, cancel()
may recursively call downstream.onError(CancelledSubscriptionException)
which could prevent the error from being propagated to downstream
.
So I think onError(ex)
should be invoked first to propagate the cause to the downstream that will be eventually passed to ServerErrorHandler
and RequestLog.*Cause()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this change is unrelated to the leak, am I understanding correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I found the bug while fixing broken tests after removing CollectingSubscriberAndSubscription
and related code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other changes look good, left a question about the early return in onComplete
@@ -752,6 +754,11 @@ default CompletableFuture<List<T>> collect(EventExecutor executor, SubscriptionO | |||
requireNonNull(executor, "executor"); | |||
requireNonNull(options, "options"); | |||
final StreamMessageCollector<T> collector = new StreamMessageCollector<>(options); | |||
if (!containsNotifyCancellation(options)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I understood the issue now. Agree with the approach 👍
onError(ex); | ||
upstream.cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this change is unrelated to the leak, am I understanding correctly?
if (completed) { | ||
// onError(Throwable) or onComplete() has been called in filter(). | ||
StreamMessageUtil.closeOrAbort(filtered); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't understånd this change 😅 Shouldn't the last filtered object be also passed downstream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not publish items if after calling onError()
or onComplete()`.
https://github.com/reactive-streams/reactive-streams-jvm#1.7
Otherwise, did you mean something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagined FilteredStreamMessage#beforeComplete
publishing an item via:
armeria/core/src/main/java/com/linecorp/armeria/common/stream/FilteredStreamMessage.java
Lines 268 to 270 in 6fae642
completed = true; | |
try { | |
beforeComplete(delegate); |
where completed=true
is set but onNext
is still called in the following scenario:
armeria/core/src/main/java/com/linecorp/armeria/server/encoding/HttpEncodedResponse.java
Line 163 in 6fae642
subscriber.onNext(HttpData.wrap(buf)); |
I thought this was the scenario this PR was trying to address
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed this to pass StreamMessageCollectingTest.filteredStreamMessage_cancel()
.
armeria/core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageCollectingTest.java
Lines 185 to 190 in c7aca10
protected HttpData filter(HttpData obj) { | |
count++; | |
if (count < 3) { | |
return obj; | |
} else { | |
subscription.cancel(); |
subscription.cancel()
made upstream signal subscriber.onError(CancelledSubscriptionException)
. Afterward, subscribe.onNext()
was called.
It's impossible to know what all subclasses do. So we may need to prevent onNext()
from being called by beforeComplete()
or beforeOnError()
.
@Override
public void onNext(T o) {
if (complete) {
return;
}
U filtered;
try {
filtered = filter(o);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, only the Subscription
API of FilteringSubscriber
is exposed to the subclasses. It seems FilteringSubscriber.onNext()
couldn't be called by beforeCompete()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, only the Subscription API of FilteringSubscriber is exposed to the subclasses.
I just realized that the downstream Subscriber#onNext
is called instead of FilteringSubscriber#onNext
. I guess the current implementation should be fine then 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not nice that a Subscriber
is exposed through beforeComplete(). We may need to redesign the API later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 👍 👍
@@ -298,17 +219,21 @@ public void onNext(T o) { | |||
try { | |||
filtered = filter(o); | |||
} catch (Throwable ex) { | |||
StreamMessageUtil.closeOrAbort(o); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to call StreamMessageUtil.closeOrAbort(filtered);
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't filtered
null when we reach here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For context, I found a case where o
was double-released when a maximum length was exceeded. Calling StreamMessageUtil.closeOrAbort(o)
didn't make sense since the ownership has been transferred to filter()
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct. 👍 The previous logic was wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 👍 👍
Motivation:
An
HttpData
produced inHttpEncodedResponse.beforeComplete()
is not collected byCollectingSubscriberAndSubscription
but is leaked.CollectingSubscriberAndSubscription
was designed to only applyfilter()
to theupstream.collect()
. I didn't consider that an object could be published viaonNext()
inbeforeComplete()
. The purpose ofCollectingSubscriberAndSubscription
was to provide an optimized code path for unary calls. it didn't seem the code provides a trivial performance improvement but the implementation was complex and error-prone.I was able to fix the code not to leak the data but I didn't want to additional complexity to it. It might be cleaner to use the Reactive Streams API instead of keeping the custom
collect()
implementation. There will be no change in performance for normal message sizes.Modifications:
collect()
implemtation inFilteredStreamMessage
.Result:
Fix a potential leak when sending compressed responses.