From 5e8111c2a829c3fd3653a8e1f8aba0377e06de7a Mon Sep 17 00:00:00 2001 From: jrhee17 Date: Mon, 9 Dec 2024 18:25:19 +0900 Subject: [PATCH] minimal impl --- .../client/ClientRequestContextExtension.java | 2 + .../client/DefaultClientRequestContext.java | 7 +- .../common/CancellationScheduler.java | 6 ++ .../common/DefaultCancellationScheduler.java | 16 ++- .../common/NoopCancellationScheduler.java | 5 + .../retry/ResponseTimeoutFromStartTest.java | 101 ++++++++++++++++++ .../client/grpc/ArmeriaClientCall.java | 2 +- 7 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java index a818d4c8994..26a9081031e 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientRequestContextExtension.java @@ -73,4 +73,6 @@ public interface ClientRequestContextExtension extends ClientRequestContext, Req * with default values on every request. */ HttpHeaders internalRequestHeaders(); + + long remainingTimeoutNanos(); } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java index 6af10aad221..ed3fbfc662a 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java @@ -541,7 +541,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx, // Cancel the original timeout and create a new scheduler for the derived context. ctx.responseCancellationScheduler.cancelScheduled(); responseCancellationScheduler = - CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis())); + CancellationScheduler.ofClient(ctx.remainingTimeoutNanos()); writeTimeoutMillis = ctx.writeTimeoutMillis(); maxResponseLength = ctx.maxResponseLength(); @@ -898,6 +898,11 @@ public HttpHeaders internalRequestHeaders() { return internalRequestHeaders; } + @Override + public long remainingTimeoutNanos() { + return responseCancellationScheduler().remainingTimeoutNanos(); + } + @Override public void setAdditionalRequestHeader(CharSequence name, Object value) { requireNonNull(name, "name"); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java index a5b91af0950..7f5ff402ee6 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java @@ -108,6 +108,12 @@ default void finishNow() { */ long timeoutNanos(); + /** + * Before the scheduler has started, the configured timeout will be returned regardless of the + * {@link TimeoutMode}. If the scheduler has already started, the remaining time will be returned. + */ + long remainingTimeoutNanos(); + long startTimeNanos(); CompletableFuture whenCancelling(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java index 698aa7e523f..d8088a004ca 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java @@ -112,12 +112,14 @@ public void start() { if (state != State.INIT) { return; } - state = State.SCHEDULED; startTimeNanos = ticker.read(); if (timeoutMode == TimeoutMode.SET_FROM_NOW) { final long elapsedTimeNanos = startTimeNanos - setFromNowStartNanos; timeoutNanos = Long.max(LongMath.saturatedSubtract(timeoutNanos, elapsedTimeNanos), 0); } + + // set the state after all timeout related fields are updated + state = State.SCHEDULED; if (timeoutNanos != Long.MAX_VALUE) { scheduledFuture = eventLoop().schedule(() -> invokeTask(null), timeoutNanos, NANOSECONDS); } @@ -292,6 +294,18 @@ public long timeoutNanos() { return timeoutNanos == Long.MAX_VALUE ? 0 : timeoutNanos; } + @Override + public long remainingTimeoutNanos() { + if (timeoutNanos == Long.MAX_VALUE) { + return 0; + } + if (!isStarted()) { + return timeoutNanos; + } + final long elapsed = ticker.read() - startTimeNanos; + return Math.max(1, LongMath.saturatedSubtract(timeoutNanos, elapsed)); + } + @Override public long startTimeNanos() { return startTimeNanos; diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java index c6f6ac71b83..4bd6e94ffc9 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java @@ -90,6 +90,11 @@ public long timeoutNanos() { return 0; } + @Override + public long remainingTimeoutNanos() { + return 0; + } + @Override public long startTimeNanos() { return 0; diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java new file mode 100644 index 00000000000..55426d6fb3a --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/ResponseTimeoutFromStartTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import java.time.Duration; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.client.ResponseTimeoutMode; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.QueryParams; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +class ResponseTimeoutFromStartTest { + + private static final Logger logger = LoggerFactory.getLogger(ResponseTimeoutFromStartTest.class); + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> { + final String delayMillisStr = ctx.queryParam("delayMillis"); + assertThat(delayMillisStr).isNotNull(); + final int delayMillis = Integer.parseInt(delayMillisStr); + return HttpResponse.delayed(HttpResponse.of(500), Duration.ofMillis(delayMillis)); + }); + } + }; + + @ParameterizedTest + @CsvSource({ + "0,2500,2000", + "0,1750,2000", + "5000,1500,2000", + }) + void originalResponseTimeoutRespected(long backoffMillis, long attemptMillis, long delayMillis) { + final long timeoutSeconds = 3; + final WebClient webClient = + WebClient.builder(server.httpUri()) + .responseTimeout(Duration.ofSeconds(timeoutSeconds)) + .responseTimeoutMode(ResponseTimeoutMode.FROM_START) + .decorator((delegate, ctx, req) -> { + logger.info("ctx.responseTimeoutMillis: {}", ctx.responseTimeoutMillis()); + return delegate.execute(ctx, req); + }) + .decorator( + RetryingClient.builder(RetryRule.builder() + .onException() + .onServerErrorStatus() + .thenBackoff(Backoff.fixed(backoffMillis))) + .responseTimeoutForEachAttempt(Duration.ofMillis(attemptMillis)) + .maxTotalAttempts(Integer.MAX_VALUE) + .newDecorator()) + .build(); + + final long prev = System.nanoTime(); + final Throwable throwable = catchThrowable( + () -> webClient.get("/", QueryParams.of("delayMillis", delayMillis)).aggregate().join()); + assertThat(throwable) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(ResponseTimeoutException.class); + logger.debug("elapsed time is: {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)); + + if (backoffMillis > 0) { + assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)) + .isLessThan(TimeUnit.SECONDS.toMillis(timeoutSeconds)); + } else { + + assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev)) + .isCloseTo(TimeUnit.SECONDS.toMillis(timeoutSeconds), Percentage.withPercentage(10)); + } + } +} diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java index e996aa012b8..d00c4d572cc 100644 --- a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java +++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java @@ -241,7 +241,7 @@ public void start(Listener responseListener, Metadata metadata) { ctx.setResponseTimeout(TimeoutMode.SET_FROM_NOW, Duration.ofNanos(remainingNanos)); } } else { - remainingNanos = MILLISECONDS.toNanos(ctx.responseTimeoutMillis()); + remainingNanos = ctx.remainingTimeoutNanos(); } // Must come after handling deadline.