Skip to content

Commit

Permalink
fix(http interceptor context): clear http interceptor context on resp…
Browse files Browse the repository at this point in the history
…onse body cancel and end
  • Loading branch information
kenluluuuluuuuu committed Apr 23, 2024
1 parent 81c406b commit 3bccc6f
Showing 1 changed file with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,41 +247,47 @@ private State onLegitimateRequest(LiveHttpRequest request, ChannelHandlerContext

HttpInterceptorContext context = new HttpInterceptorContext(secure, remoteAddress(ctx), ctx.executor(), timers);
Eventual<LiveHttpResponse> responseEventual = httpPipeline.handle(v11Request, context);
responseEventual.subscribe(new BaseSubscriber<>() {
@Override
public void hookOnSubscribe(Subscription s) {
subscription = s;
s.request(1);
}

@Override
public void hookOnComplete() {
eventProcessor.submit(new ResponseObservableCompletedEvent(ctx, request.id()));
}

@Override
public void hookOnError(Throwable cause) {
eventProcessor.submit(new ResponseObservableErrorEvent(ctx, cause, request.id()));
}
responseEventual.map(response ->
response.newBuilder().body(body ->
body.doOnCancel(() -> {
timers.stopTiming(RESPONSE_PROCESSING);
timers.stopTiming(REQUEST_PROCESSING);
context.clear();
})
.doOnEnd(throwable -> {
timers.stopTiming(RESPONSE_PROCESSING);
timers.stopTiming(REQUEST_PROCESSING);
context.clear();
})
).build()
)
.subscribe(new BaseSubscriber<>() {
@Override
public void hookOnSubscribe(Subscription s) {
subscription = s;
s.request(1);
}

@Override
public void hookOnNext(LiveHttpResponse response) {
eventProcessor.submit(new ResponseReceivedEvent(response, ctx));
}
@Override
public void hookOnComplete() {
eventProcessor.submit(new ResponseObservableCompletedEvent(ctx, request.id()));
}

@Override
protected void hookOnCancel() {
eventProcessor.submit(new ResponseObservableCancelledEvent(ctx, new ResponseCancelledException(), request.id()));
}
@Override
public void hookOnError(Throwable cause) {
eventProcessor.submit(new ResponseObservableErrorEvent(ctx, cause, request.id()));
}

@Override
protected void hookFinally(SignalType type) {
timers.stopTiming(RESPONSE_PROCESSING);
timers.stopTiming(REQUEST_PROCESSING);
@Override
public void hookOnNext(LiveHttpResponse response) {
eventProcessor.submit(new ResponseReceivedEvent(response, ctx));
}

context.clear();
}
});
@Override
protected void hookOnCancel() {
eventProcessor.submit(new ResponseObservableCancelledEvent(ctx, new ResponseCancelledException(), request.id()));
}
});

return WAITING_FOR_RESPONSE;
} catch (Throwable cause) {
Expand Down

0 comments on commit 3bccc6f

Please sign in to comment.