From 1983bed728269cd503347b10d2265b145c701cd9 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 27 Jan 2025 13:02:01 +0000 Subject: [PATCH 01/10] Added retry config to common module --- .../tech/ydb/common/retry/ErrorPolicy.java | 30 --- .../common/retry/ExponentialBackoffRetry.java | 10 +- .../tech/ydb/common/retry/RetryConfig.java | 98 +++++++++ .../ydb/common/retry/YdbRetryBuilder.java | 69 ++++++ .../tech/ydb/common/retry/YdbRetryConfig.java | 62 ++++++ .../ydb/common/retry/RetryConfigTest.java | 198 ++++++++++++++++++ .../ydb/common/retry/RetryPoliciesTest.java | 20 +- .../impl/YdbTransactionImplTest.java | 35 ++++ 8 files changed, 486 insertions(+), 36 deletions(-) delete mode 100644 common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java create mode 100644 common/src/main/java/tech/ydb/common/retry/RetryConfig.java create mode 100644 common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java create mode 100644 common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java create mode 100644 common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java create mode 100644 common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java diff --git a/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java b/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java deleted file mode 100644 index daaad9a97..000000000 --- a/common/src/main/java/tech/ydb/common/retry/ErrorPolicy.java +++ /dev/null @@ -1,30 +0,0 @@ -package tech.ydb.common.retry; - -/** - * Recipes should use the configured error policy to decide how to retry - * errors like unsuccessful {@link tech.ydb.core.StatusCode}. - * - * @author Aleksandr Gorshenin - * @param Type of errors to check - */ -public interface ErrorPolicy { - - /** - * Returns true if the given value should be retried - * - * @param value value to check - * @return true if value is retryable - */ - boolean isRetryable(T value); - - /** - * Returns true if the given exception should be retried - * Usually exceptions are never retried, but some policies can implement more difficult logic - * - * @param ex exception to check - * @return true if exception is retryable - */ - default boolean isRetryable(Exception ex) { - return false; - } -} diff --git a/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java b/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java index e6d14ba47..392783efd 100644 --- a/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java +++ b/common/src/main/java/tech/ydb/common/retry/ExponentialBackoffRetry.java @@ -7,11 +7,11 @@ * * @author Aleksandr Gorshenin */ -public abstract class ExponentialBackoffRetry implements RetryPolicy { +public class ExponentialBackoffRetry implements RetryPolicy { private final long backoffMs; private final int backoffCeiling; - protected ExponentialBackoffRetry(long backoffMs, int backoffCeiling) { + public ExponentialBackoffRetry(long backoffMs, int backoffCeiling) { this.backoffMs = backoffMs; this.backoffCeiling = backoffCeiling; } @@ -22,6 +22,11 @@ protected long backoffTimeMillis(int retryNumber) { return delay + ThreadLocalRandom.current().nextLong(delay); } + @Override + public long nextRetryMs(int retryCount, long elapsedTimeMs) { + return backoffTimeMillis(retryCount); + } + /** * Return current base of backoff delays * @return backoff base duration in milliseconds @@ -37,4 +42,5 @@ public long getBackoffMillis() { public int getBackoffCeiling() { return backoffCeiling; } + } diff --git a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java new file mode 100644 index 000000000..f3c3fbd15 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java @@ -0,0 +1,98 @@ +package tech.ydb.common.retry; + +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; + +/** + * Recipes should use the retry configuration to decide how to retry + * errors like unsuccessful {@link tech.ydb.core.StatusCode}. + * + * @author Aleksandr Gorshenin + */ +@FunctionalInterface +public interface RetryConfig { + + /** + * Returns retry policy for the given status code and {@code null} if that status code is not retryable + * + * @param code status code to check + * @return policy of retries or {@code null} if the status code is not retryable + */ + RetryPolicy isStatusRetryable(StatusCode code); + + /** + * Returns retry policy for the given exception and {@code null} if that exception is not retryable + * + * @param th exception to check + * @return policy of retries or {@code null} if the exception is not retryable + */ + default RetryPolicy isThrowableRetryable(Throwable th) { + for (Throwable ex = th; ex != null; ex = ex.getCause()) { + if (ex instanceof UnexpectedResultException) { + return isStatusRetryable(((UnexpectedResultException) ex).getStatus().getCode()); + } + } + return null; + } + + /** + * Retries a non idempotent operation forever with default exponential delay + * @return retry configuration object + */ + static RetryConfig retryForever() { + return newConfig().retryForever(); + } + + /** + * Retries a non idempotent operation with default exponential until the specified elapsed milliseconds expire + * @param maxElapsedMs maximum timeout for retries + * @return retry configuration object + */ + static RetryConfig retryUntilElapsed(long maxElapsedMs) { + return newConfig().retryUntilElapsed(maxElapsedMs); + } + + /** + * Retries an idempotent operation forever with default exponential delay + * @return retry configuration object + */ + static RetryConfig idempotentRetryForever() { + return newConfig().retryIdempotent(true).retryForever(); + } + + /** + * Retries an idempotent operation with default exponential until the specified elapsed milliseconds expire + * @param maxElapsedMs maximum timeout for retries + * @return retry configuration object + */ + static RetryConfig idempotentRetryUntilElapsed(long maxElapsedMs) { + return newConfig().retryIdempotent(true).retryUntilElapsed(maxElapsedMs); + } + + /** + * Disabled retries configuration. Any error is considered as non retryable + * @return retry configuration object + */ + static RetryConfig noRetries() { + return (StatusCode code) -> null; + } + + /** + * Create a new custom configuration of retries + * @return retry configuration builder + */ + static Builder newConfig() { + return new YdbRetryBuilder(); + } + + interface Builder { + Builder retryIdempotent(boolean retry); + Builder retryNotFound(boolean retry); + Builder withSlowBackoff(long backoff, int ceiling); + Builder withFastBackoff(long backoff, int ceiling); + + RetryConfig retryForever(); + RetryConfig retryNTimes(int maxRetries); + RetryConfig retryUntilElapsed(long maxElapsedMs); + } +} diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java new file mode 100644 index 000000000..2ea0f4c97 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java @@ -0,0 +1,69 @@ +package tech.ydb.common.retry; + +/** + * + * @author Aleksandr Gorshenin + */ +class YdbRetryBuilder implements RetryConfig.Builder { + private boolean idempotent = false; + private boolean retryNotFound = false; + + private long fastBackoff = 5; + private int fastCeiling = 10; + + private long slowBackoff = 500; + private int slowCeiling = 6; + + @Override + public YdbRetryBuilder retryIdempotent(boolean retry) { + this.idempotent = retry; + return this; + } + + @Override + public YdbRetryBuilder retryNotFound(boolean retry) { + this.retryNotFound = retry; + return this; + } + + @Override + public YdbRetryBuilder withSlowBackoff(long backoff, int ceiling) { + this.slowBackoff = backoff; + this.slowCeiling = ceiling; + return this; + } + + @Override + public YdbRetryBuilder withFastBackoff(long backoff, int ceiling) { + this.fastBackoff = backoff; + this.fastCeiling = ceiling; + return this; + } + + @Override + public RetryConfig retryForever() { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> 0, + new ExponentialBackoffRetry(fastBackoff, fastCeiling), + new ExponentialBackoffRetry(slowBackoff, slowCeiling) + ); + } + + @Override + public RetryConfig retryNTimes(int maxRetries) { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> retryCount >= maxRetries ? -1 : 0, + new RetryNTimes(maxRetries, fastBackoff, fastCeiling), + new RetryNTimes(maxRetries, slowBackoff, slowCeiling) + ); + } + + @Override + public RetryConfig retryUntilElapsed(long maxElapsedMs) { + return new YdbRetryConfig(idempotent, retryNotFound, + (int retryCount, long elapsedTimeMs) -> elapsedTimeMs > maxElapsedMs ? -1 : 0, + new RetryUntilElapsed(maxElapsedMs, fastBackoff, fastCeiling), + new RetryUntilElapsed(maxElapsedMs, slowBackoff, slowCeiling) + ); + } +} diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java new file mode 100644 index 000000000..a6600f4e4 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java @@ -0,0 +1,62 @@ +package tech.ydb.common.retry; + +import tech.ydb.core.StatusCode; + +/** + * + * @author Aleksandr Gorshenin + */ +class YdbRetryConfig implements RetryConfig { + private final boolean idempotent; + private final boolean retryNotFound; + private final RetryPolicy immediatelly; + private final RetryPolicy fast; + private final RetryPolicy slow; + + YdbRetryConfig(boolean idempotent, boolean notFound, RetryPolicy immediatelly, RetryPolicy fast, RetryPolicy slow) { + this.idempotent = idempotent; + this.retryNotFound = notFound; + this.immediatelly = immediatelly; + this.fast = fast; + this.slow = slow; + } + + @Override + public RetryPolicy isStatusRetryable(StatusCode code) { + if (code == null) { + return null; + } + + switch (code) { + // Instant retry + case BAD_SESSION: + case SESSION_BUSY: + return immediatelly; + + // Fast backoff + case ABORTED: + case UNDETERMINED: + return fast; + + // Slow backoff + case OVERLOADED: + case CLIENT_RESOURCE_EXHAUSTED: + return slow; + + // Conditionally retry + case CLIENT_CANCELLED: + case CLIENT_INTERNAL_ERROR: + case TRANSPORT_UNAVAILABLE: + case UNAVAILABLE: + return idempotent ? fast : null; + + // Not found retry + case NOT_FOUND: + return retryNotFound ? fast : null; + + // All other codes are not retryable + default: + return null; + } + } +} diff --git a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java new file mode 100644 index 000000000..e8f132830 --- /dev/null +++ b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java @@ -0,0 +1,198 @@ +package tech.ydb.common.retry; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; + +/** + * + * @author Aleksandr Gorshenin + */ +public class RetryConfigTest { + private void assertDuration(long from, long to, long ms) { + Assert.assertTrue("time " + ms + " must be great than " + from, from <= ms); + Assert.assertTrue("time " + ms + " must be lower than " + to, to >= ms); + } + + @Test + public void nullStatusesTest() { + RetryConfig config = RetryConfig.retryForever(); + + Assert.assertNull(config.isThrowableRetryable(null)); + Assert.assertNull(config.isStatusRetryable(null)); + } + + @Test + public void throwableRetriesTest() { + RetryConfig config = RetryConfig.retryUntilElapsed(1000); + + Assert.assertNull(config.isThrowableRetryable(new RuntimeException("test message"))); + Assert.assertNull(config.isThrowableRetryable(new Exception("1", new RuntimeException("2")))); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + RetryPolicy fast = config.isStatusRetryable(StatusCode.NOT_FOUND); + + Assert.assertEquals(immediatelly, config.isThrowableRetryable( + new UnexpectedResultException("base", Status.of(StatusCode.BAD_SESSION))) + ); + Assert.assertEquals(immediatelly, config.isThrowableRetryable(new Exception("base", + new UnexpectedResultException("cause", Status.of(StatusCode.SESSION_BUSY))) + )); + Assert.assertEquals(fast, config.isThrowableRetryable(new Exception("base", + new UnexpectedResultException("cause", Status.of(StatusCode.NOT_FOUND))) + )); + } + + @Test + public void noRetryPolicyTest() { + RetryConfig config = RetryConfig.noRetries(); + // unretrayable + for (StatusCode code: StatusCode.values()) { + Assert.assertNull(config.isStatusRetryable(code)); + } + } + + @Test + public void nonIdempotentRetryPolicyTest() { + RetryConfig config = RetryConfig.retryForever(); + + // unretrayable + Assert.assertNull(config.isStatusRetryable(StatusCode.SCHEME_ERROR)); + Assert.assertNull(config.isStatusRetryable(StatusCode.ALREADY_EXISTS)); + Assert.assertNull(config.isStatusRetryable(StatusCode.UNAUTHORIZED)); + Assert.assertNull(config.isStatusRetryable(StatusCode.UNAVAILABLE)); + Assert.assertNull(config.isStatusRetryable(StatusCode.TRANSPORT_UNAVAILABLE)); + Assert.assertNull(config.isStatusRetryable(StatusCode.CLIENT_CANCELLED)); + Assert.assertNull(config.isStatusRetryable(StatusCode.CLIENT_INTERNAL_ERROR)); + Assert.assertNull(config.isStatusRetryable(StatusCode.NOT_FOUND)); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertNotNull(immediatelly); + Assert.assertEquals(immediatelly, config.isStatusRetryable(StatusCode.SESSION_BUSY)); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + Assert.assertNotNull(fast); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNDETERMINED)); + + RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + Assert.assertNotNull(slow); + Assert.assertEquals(slow, config.isStatusRetryable(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + } + + @Test + public void idempotentRetryPolicyTest() { + RetryConfig config = RetryConfig.idempotentRetryForever(); + + // unretrayable + Assert.assertNull(config.isStatusRetryable(StatusCode.SCHEME_ERROR)); + Assert.assertNull(config.isStatusRetryable(StatusCode.ALREADY_EXISTS)); + Assert.assertNull(config.isStatusRetryable(StatusCode.UNAUTHORIZED)); + Assert.assertNull(config.isStatusRetryable(StatusCode.NOT_FOUND)); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertNotNull(immediatelly); + Assert.assertEquals(immediatelly, config.isStatusRetryable(StatusCode.SESSION_BUSY)); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + Assert.assertNotNull(fast); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNDETERMINED)); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNAVAILABLE)); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.TRANSPORT_UNAVAILABLE)); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.CLIENT_CANCELLED)); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.CLIENT_INTERNAL_ERROR)); + + RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + Assert.assertNotNull(slow); + Assert.assertEquals(slow, config.isStatusRetryable(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + } + + @Test + public void notFoundRetryPolicyTest() { + RetryConfig config = RetryConfig.newConfig().retryNotFound(true).retryForever(); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.NOT_FOUND)); + } + + @Test + public void foreverRetryTest() { + RetryConfig config = RetryConfig.newConfig().withSlowBackoff(100, 5).withFastBackoff(10, 10).retryForever(); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + assertDuration(10, 20, fast.nextRetryMs(0, 0)); + assertDuration(10, 20, fast.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + + RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + assertDuration(100, 200, slow.nextRetryMs(0, 0)); + assertDuration(100, 200, slow.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + + @Test + public void untilElapsedRetryTest() { + RetryConfig config = RetryConfig.idempotentRetryUntilElapsed(5000); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 5000)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5000)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(0, 5001)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5001)); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + assertDuration(5, 10, fast.nextRetryMs(0, 0)); + Assert.assertEquals(3, fast.nextRetryMs(0, 4997)); + Assert.assertEquals(5000, fast.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(1, fast.nextRetryMs(Integer.MAX_VALUE, 4999)); + Assert.assertEquals(-1, fast.nextRetryMs(Integer.MAX_VALUE, 5000)); + + RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + assertDuration(500, 1000, slow.nextRetryMs(0, 0)); + Assert.assertEquals(3, slow.nextRetryMs(0, 4997)); + Assert.assertEquals(5000, slow.nextRetryMs(Integer.MAX_VALUE, 0)); + Assert.assertEquals(1, slow.nextRetryMs(Integer.MAX_VALUE, 4999)); + Assert.assertEquals(-1, slow.nextRetryMs(Integer.MAX_VALUE, 5000)); + } + + @Test + public void nTimesRetryTest() { + RetryConfig config = RetryConfig.newConfig().retryNTimes(8); + + RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); + Assert.assertEquals(0, immediatelly.nextRetryMs(7, 0)); + Assert.assertEquals(0, immediatelly.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(8, 0)); + Assert.assertEquals(-1, immediatelly.nextRetryMs(8, Integer.MAX_VALUE)); + + RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + assertDuration(5, 10, fast.nextRetryMs(0, 0)); + assertDuration(5, 10, fast.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, 0)); + assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, fast.nextRetryMs(8, 0)); + Assert.assertEquals(-1, fast.nextRetryMs(8, Integer.MAX_VALUE)); + + RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + assertDuration(500, 1000, slow.nextRetryMs(0, 0)); + assertDuration(500, 1000, slow.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, 0)); + assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, Integer.MAX_VALUE)); + Assert.assertEquals(-1, slow.nextRetryMs(8, 0)); + Assert.assertEquals(-1, slow.nextRetryMs(8, Integer.MAX_VALUE)); + } +} diff --git a/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java b/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java index bf5baa0a8..cf127a259 100644 --- a/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java +++ b/common/src/test/java/tech/ydb/common/retry/RetryPoliciesTest.java @@ -1,9 +1,5 @@ package tech.ydb.common.retry; -import tech.ydb.common.retry.RetryNTimes; -import tech.ydb.common.retry.RetryUntilElapsed; -import tech.ydb.common.retry.RetryForever; - import org.junit.Assert; import org.junit.Test; @@ -99,6 +95,22 @@ public void untilElapsedTest() { Assert.assertEquals(-1, policy.nextRetryMs(7, 2500)); } + @Test + public void foreverElapsedTest() { + ExponentialBackoffRetry policy = new ExponentialBackoffRetry(50, 3); + + assertDuration(50, 100, policy.nextRetryMs(0, 0)); + assertDuration(50, 100, policy.nextRetryMs(0, Integer.MAX_VALUE)); + assertDuration(100, 200, policy.nextRetryMs(1, 75)); + assertDuration(200, 400, policy.nextRetryMs(2, 225)); + assertDuration(400, 800, policy.nextRetryMs(3, 525)); + assertDuration(400, 800, policy.nextRetryMs(4, 1125)); + assertDuration(400, 800, policy.nextRetryMs(5, 1725)); + assertDuration(400, 800, policy.nextRetryMs(Integer.MAX_VALUE, 0)); + assertDuration(400, 800, policy.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + + @Test public void updateElapsedTest() { RetryUntilElapsed policy = new RetryUntilElapsed(2500, 50, 3); diff --git a/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java new file mode 100644 index 000000000..9f25e0255 --- /dev/null +++ b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java @@ -0,0 +1,35 @@ +package tech.ydb.common.transaction.impl; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.common.transaction.TxMode; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbTransactionImplTest { + private class MockTx extends YdbTransactionImpl { + + public MockTx(TxMode txMode, String txId) { + super(txMode, txId); + } + + @Override + public String getSessionId() { + return "MOCK"; + } + } + + @Test + public void baseTest() { + MockTx tx = new MockTx(TxMode.SNAPSHOT_RO, "test-id"); + + Assert.assertEquals("test-id", tx.getId()); + Assert.assertEquals("MOCK", tx.getSessionId()); + Assert.assertEquals(TxMode.SNAPSHOT_RO, tx.getTxMode()); + Assert.assertTrue(tx.isActive()); + Assert.assertFalse(tx.getStatusFuture().isDone()); + } +} From 173ad53d6531893e72bcdf08e1cd0d9b49f92093 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 23 Jan 2025 17:46:55 +0000 Subject: [PATCH 02/10] Added RetryConfig to configure retries behavior --- .../ydb/topic/impl/GrpcStreamRetrier.java | 12 ++--- .../tech/ydb/topic/read/impl/ReaderImpl.java | 2 +- .../ydb/topic/settings/ReaderSettings.java | 48 +++++++++++++++++-- .../ydb/topic/settings/WriterSettings.java | 47 ++++++++++++++++-- .../tech/ydb/topic/write/impl/WriterImpl.java | 2 +- 5 files changed, 91 insertions(+), 20 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index d57352869..38d68ed5c 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -8,10 +8,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import org.slf4j.Logger; +import tech.ydb.common.retry.RetryConfig; import tech.ydb.core.Status; /** @@ -27,18 +27,18 @@ public abstract class GrpcStreamRetrier { private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" .toCharArray(); + private final RetryConfig retryConfig; protected final String id; protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); protected final AtomicBoolean isStopped = new AtomicBoolean(false); protected final AtomicInteger reconnectCounter = new AtomicInteger(0); private final ScheduledExecutorService scheduler; - private final BiConsumer errorsHandler; - protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer errorsHandler) { + protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) { + this.retryConfig = retryConfig; this.scheduler = scheduler; this.id = generateRandomId(ID_LENGTH); - this.errorsHandler = errorsHandler; } protected abstract Logger getLogger(); @@ -131,10 +131,6 @@ protected void onSessionClosed(Status status, Throwable th) { } } - if (errorsHandler != null) { - errorsHandler.accept(status, th); - } - if (!isStopped.get()) { tryScheduleReconnect(); } else { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 984134e06..3a7688c42 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { private final String consumerName; public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { - super(topicRpc.getScheduler(), settings.getErrorsHandler()); + super(settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new ReadSessionImpl(); diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index f27b719ed..fe477cceb 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -9,7 +9,10 @@ import com.google.common.collect.ImmutableList; +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; /** * @author Nikolay Perfilov @@ -20,17 +23,17 @@ public class ReaderSettings { private final String consumerName; private final String readerName; private final List topics; + private final RetryConfig retryConfig; private final long maxMemoryUsageBytes; private final Executor decompressionExecutor; - private final BiConsumer errorsHandler; private ReaderSettings(Builder builder) { this.consumerName = builder.consumerName; this.readerName = builder.readerName; this.topics = ImmutableList.copyOf(builder.topics); + this.retryConfig = builder.retryConfig; this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; this.decompressionExecutor = builder.decompressionExecutor; - this.errorsHandler = builder.errorsHandler; } public String getConsumerName() { @@ -42,12 +45,17 @@ public String getReaderName() { return readerName; } + public RetryConfig getRetryConfig() { + return retryConfig; + } + public List getTopics() { return topics; } + @Deprecated public BiConsumer getErrorsHandler() { - return errorsHandler; + return null; } public long getMaxMemoryUsageBytes() { @@ -70,9 +78,9 @@ public static class Builder { private boolean readWithoutConsumer = false; private String readerName = null; private List topics = new ArrayList<>(); + private RetryConfig retryConfig = RetryConfig.idempotentRetryForever(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; private Executor decompressionExecutor = null; - private BiConsumer errorsHandler = null; public Builder setConsumerName(String consumerName) { this.consumerName = consumerName; @@ -91,6 +99,7 @@ public Builder withoutConsumer() { /** * Set reader name for debug purposes + * @param readerName name of reader * @return settings builder */ public Builder setReaderName(String readerName) { @@ -108,13 +117,42 @@ public Builder setTopics(List topics) { return this; } + /** + * Set {@link RetryConfig} to define behavior of the stream internal retries + * @param config retry mode + * @return settings builder + */ + public Builder setRetryConfig(RetryConfig config) { + this.retryConfig = config; + return this; + } + public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { this.maxMemoryUsageBytes = maxMemoryUsageBytes; return this; } + /** + * @param handler + * @return builder + * @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead + */ + @Deprecated public Builder setErrorsHandler(BiConsumer handler) { - this.errorsHandler = handler; + final RetryConfig currentConfig = retryConfig; + retryConfig = new RetryConfig() { + @Override + public RetryPolicy isStatusRetryable(StatusCode code) { + handler.accept(Status.of(code), null); + return currentConfig.isStatusRetryable(code); + } + + @Override + public RetryPolicy isThrowableRetryable(Throwable th) { + handler.accept(null, th); + return currentConfig.isThrowableRetryable(th); + } + }; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 7f160c4d6..0636d7652 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -2,7 +2,10 @@ import java.util.function.BiConsumer; +import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; import tech.ydb.topic.description.Codec; /** @@ -17,9 +20,9 @@ public class WriterSettings { private final String messageGroupId; private final Long partitionId; private final Codec codec; + private final RetryConfig retryConfig; private final long maxSendBufferMemorySize; private final int maxSendBufferMessagesCount; - private final BiConsumer errorsHandler; private WriterSettings(Builder builder) { this.topicPath = builder.topicPath; @@ -27,9 +30,9 @@ private WriterSettings(Builder builder) { this.messageGroupId = builder.messageGroupId; this.partitionId = builder.partitionId; this.codec = builder.codec; + this.retryConfig = builder.retryConfig; this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize; this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount; - this.errorsHandler = builder.errorsHandler; } public static Builder newBuilder() { @@ -48,8 +51,9 @@ public String getMessageGroupId() { return messageGroupId; } + @Deprecated public BiConsumer getErrorsHandler() { - return errorsHandler; + return null; } public Long getPartitionId() { @@ -60,6 +64,10 @@ public Codec getCodec() { return codec; } + public RetryConfig getRetryConfig() { + return retryConfig; + } + public long getMaxSendBufferMemorySize() { return maxSendBufferMemorySize; } @@ -77,9 +85,9 @@ public static class Builder { private String messageGroupId = null; private Long partitionId = null; private Codec codec = Codec.GZIP; + private RetryConfig retryConfig = RetryConfig.idempotentRetryForever(); private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT; private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT; - private BiConsumer errorsHandler = null; /** * Set path to a topic to write to @@ -135,6 +143,16 @@ public Builder setCodec(Codec codec) { return this; } + /** + * Set {@link RetryConfig} to define behavior of the stream internal retries + * @param config retry mode + * @return settings builder + */ + public Builder setRetryConfig(RetryConfig config) { + this.retryConfig = config; + return this; + } + /** * Set memory usage limit for send buffer. * Writer will not accept new messages if memory usage exceeds this limit. @@ -158,8 +176,27 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) { return this; } + /** + * @param handler + * @return builder + * @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead + */ + @Deprecated public Builder setErrorsHandler(BiConsumer handler) { - this.errorsHandler = handler; + final RetryConfig currentConfig = retryConfig; + retryConfig = new RetryConfig() { + @Override + public RetryPolicy isStatusRetryable(StatusCode code) { + handler.accept(Status.of(code), null); + return currentConfig.isStatusRetryable(code); + } + + @Override + public RetryPolicy isThrowableRetryable(Throwable th) { + handler.accept(null, th); + return currentConfig.isThrowableRetryable(th); + } + }; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 49a7e7ebd..253ee11d8 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private CompletableFuture lastAcceptedMessageFuture; public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(topicRpc.getScheduler(), settings.getErrorsHandler()); + super(settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new WriteSessionImpl(); From 9e5b678879d852d98499e79203e3c002443b0e45 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 23 Jan 2025 18:33:42 +0000 Subject: [PATCH 03/10] Updated implementation of GrpcStreamRetrier --- .../ydb/topic/impl/GrpcStreamRetrier.java | 102 ++++++++++-------- .../tech/ydb/topic/read/impl/ReaderImpl.java | 2 +- .../tech/ydb/topic/write/impl/WriterImpl.java | 2 +- 3 files changed, 57 insertions(+), 49 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 38d68ed5c..09b908145 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -4,36 +4,31 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import tech.ydb.common.retry.RetryConfig; +import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; /** * @author Nikolay Perfilov */ public abstract class GrpcStreamRetrier { - // TODO: add retry policy - private static final int MAX_RECONNECT_COUNT = 0; // Inf - private static final int EXP_BACKOFF_BASE_MS = 256; - private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec) - private static final int EXP_BACKOFF_MAX_POWER = 7; private static final int ID_LENGTH = 6; private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" .toCharArray(); - private final RetryConfig retryConfig; protected final String id; protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); protected final AtomicBoolean isStopped = new AtomicBoolean(false); - protected final AtomicInteger reconnectCounter = new AtomicInteger(0); private final ScheduledExecutorService scheduler; + private final RetryConfig retryConfig; + private volatile int retryCount; + private volatile long retryStartedAt; protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) { this.retryConfig = retryConfig; @@ -54,45 +49,31 @@ protected static String generateRandomId(int length) { .toString(); } - private void tryScheduleReconnect() { - int currentReconnectCounter = reconnectCounter.get() + 1; - if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) { - if (isStopped.compareAndSet(false, true)) { - String errorMessage = "[" + id + "] Maximum retry count (" + MAX_RECONNECT_COUNT - + ") exceeded. Shutting down " + getStreamName(); - getLogger().error(errorMessage); - shutdownImpl(errorMessage); - return; - } else { - getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " + - "shut down.", id, MAX_RECONNECT_COUNT, getStreamName()); - } - } - if (isReconnecting.compareAndSet(false, true)) { - reconnectCounter.set(currentReconnectCounter); - int delayMs = currentReconnectCounter <= EXP_BACKOFF_MAX_POWER - ? EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter) - : EXP_BACKOFF_CEILING_MS; - // Add jitter - delayMs = delayMs + ThreadLocalRandom.current().nextInt(delayMs); - getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, currentReconnectCounter, - getStreamName(), delayMs); - try { - scheduler.schedule(this::reconnect, delayMs, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException exception) { - String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " + - "Shutting down " + getStreamName(); - getLogger().error(errorMessage); - shutdownImpl(errorMessage); - } - } else { + private void tryReconnect(long delay) { + if (!isReconnecting.compareAndSet(false, true)) { getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id, getStreamName()); + return; } + + getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryCount, getStreamName(), delay); + try { + scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException exception) { + String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " + + "Shutting down " + getStreamName(); + getLogger().error(errorMessage); + shutdownImpl(errorMessage); + } + } + + protected void resetRetries() { + retryStartedAt = -1; + retryCount = 0; } void reconnect() { - getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), reconnectCounter.get()); + getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), retryCount); if (!isReconnecting.compareAndSet(true, false)) { getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen"); } @@ -115,26 +96,53 @@ protected CompletableFuture shutdownImpl(String reason) { protected void onSessionClosed(Status status, Throwable th) { getLogger().info("[{}] onSessionClosed called", id); + RetryPolicy retryPolicy = null; if (th != null) { getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th); + retryPolicy = retryConfig.isThrowableRetryable(th); } else { if (status.isSuccess()) { if (isStopped.get()) { getLogger().info("[{}] {} stream session closed successfully", id, getStreamName()); return; } else { - getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName(), - getStreamName()); + getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName()); } } else { getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); + retryPolicy = retryConfig.isStatusRetryable(status.getCode()); } } - if (!isStopped.get()) { - tryScheduleReconnect(); - } else { + if (isStopped.get()) { getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName()); + return; + } + + if (retryPolicy != null) { + if (retryCount < 1) { + retryStartedAt = System.currentTimeMillis(); + } + long delay = retryPolicy.nextRetryMs(retryCount + 1, System.currentTimeMillis() - retryStartedAt); + if (delay >= 0) { + retryCount++; + tryReconnect(delay); + return; + } } + + long elapsedMs = retryStartedAt > 0 ? System.currentTimeMillis() - retryStartedAt : 0; + if (!isStopped.compareAndSet(false, true)) { + getLogger().warn("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down.", + id, retryCount, elapsedMs, getStreamName()); + return; + } + + String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs + + " ms elapsed. Shutting down " + getStreamName(); + getLogger().error(errorMessage); + shutdownImpl(errorMessage); } + + } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 3a7688c42..43b420bac 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -515,7 +515,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { } logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { - reconnectCounter.set(0); + resetRetries(); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 253ee11d8..dde80321f 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -479,7 +479,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) { logger.debug("[{}] processMessage called", streamId); if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { - reconnectCounter.set(0); + resetRetries(); } else { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList())); From 0f8a74b849b83f2666c9a23a87bd801bb99aff97 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 23 Jan 2025 15:45:48 +0000 Subject: [PATCH 04/10] Added base mock implementation for unit tests --- topic/pom.xml | 7 + .../ydb/topic/impl/GrpcStreamRetrier.java | 39 ++- .../tech/ydb/topic/read/impl/ReaderImpl.java | 7 +- .../tech/ydb/topic/write/impl/WriterImpl.java | 7 +- .../tech/ydb/topic/impl/BaseMockedTest.java | 229 ++++++++++++++++++ topic/src/test/resources/log4j2.xml | 2 +- 6 files changed, 258 insertions(+), 33 deletions(-) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java diff --git a/topic/pom.xml b/topic/pom.xml index 98652283c..46d8ae4f1 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -40,14 +40,21 @@ zstd-jni 1.5.2-5 + junit junit test + + org.mockito + mockito-inline + test + tech.ydb.test ydb-junit4-support + test org.apache.logging.log4j diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 09b908145..010b7dc3b 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -25,18 +25,19 @@ public abstract class GrpcStreamRetrier { protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); protected final AtomicBoolean isStopped = new AtomicBoolean(false); + private final Logger logger; private final ScheduledExecutorService scheduler; private final RetryConfig retryConfig; private volatile int retryCount; private volatile long retryStartedAt; - protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) { + protected GrpcStreamRetrier(Logger logger, RetryConfig retryConfig, ScheduledExecutorService scheduler) { + this.logger = logger; this.retryConfig = retryConfig; this.scheduler = scheduler; this.id = generateRandomId(ID_LENGTH); } - protected abstract Logger getLogger(); protected abstract String getStreamName(); protected abstract void onStreamReconnect(); protected abstract void onShutdown(String reason); @@ -49,20 +50,20 @@ protected static String generateRandomId(int length) { .toString(); } - private void tryReconnect(long delay) { + private void tryScheduleReconnect(long delay) { if (!isReconnecting.compareAndSet(false, true)) { - getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id, + logger.info("[{}] should reconnect {} stream, but reconnect is already in progress", id, getStreamName()); return; } - getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryCount, getStreamName(), delay); + logger.warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryCount, getStreamName(), delay); try { scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException exception) { String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " + "Shutting down " + getStreamName(); - getLogger().error(errorMessage); + logger.error(errorMessage); shutdownImpl(errorMessage); } } @@ -73,9 +74,9 @@ protected void resetRetries() { } void reconnect() { - getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), retryCount); + logger.info("[{}] {} reconnect #{} started", id, getStreamName(), retryCount); if (!isReconnecting.compareAndSet(true, false)) { - getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen"); + logger.warn("Couldn't reset reconnect flag. Shouldn't happen"); } onStreamReconnect(); } @@ -85,7 +86,7 @@ protected CompletableFuture shutdownImpl() { } protected CompletableFuture shutdownImpl(String reason) { - getLogger().info("[{}] Shutting down {}" + logger.info("[{}] Shutting down {}" + (reason == null || reason.isEmpty() ? "" : " with reason: " + reason), id, getStreamName()); isStopped.set(true); return CompletableFuture.runAsync(() -> { @@ -94,28 +95,28 @@ protected CompletableFuture shutdownImpl(String reason) { } protected void onSessionClosed(Status status, Throwable th) { - getLogger().info("[{}] onSessionClosed called", id); + logger.info("[{}] onSessionClosed called", id); RetryPolicy retryPolicy = null; if (th != null) { - getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th); + logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th); retryPolicy = retryConfig.isThrowableRetryable(th); } else { if (status.isSuccess()) { if (isStopped.get()) { - getLogger().info("[{}] {} stream session closed successfully", id, getStreamName()); + logger.info("[{}] {} stream session closed successfully", id, getStreamName()); return; } else { - getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName()); + logger.warn("[{}] {} stream session was closed on working {}", id, getStreamName()); } } else { - getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); + logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); retryPolicy = retryConfig.isStatusRetryable(status.getCode()); } } if (isStopped.get()) { - getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName()); + logger.info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName()); return; } @@ -126,23 +127,21 @@ protected void onSessionClosed(Status status, Throwable th) { long delay = retryPolicy.nextRetryMs(retryCount + 1, System.currentTimeMillis() - retryStartedAt); if (delay >= 0) { retryCount++; - tryReconnect(delay); + tryScheduleReconnect(delay); return; } } long elapsedMs = retryStartedAt > 0 ? System.currentTimeMillis() - retryStartedAt : 0; if (!isStopped.compareAndSet(false, true)) { - getLogger().warn("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down.", + logger.warn("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down.", id, retryCount, elapsedMs, getStreamName()); return; } String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs + " ms elapsed. Shutting down " + getStreamName(); - getLogger().error(errorMessage); + logger.error(errorMessage); shutdownImpl(errorMessage); } - - } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 43b420bac..a0694e5c7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { private final String consumerName; public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { - super(settings.getRetryConfig(), topicRpc.getScheduler()); + super(logger, settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new ReadSessionImpl(); @@ -88,11 +88,6 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { logger.info(message.toString()); } - @Override - protected Logger getLogger() { - return logger; - } - @Override protected String getStreamName() { return "Reader"; diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index dde80321f..41529d97e 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private CompletableFuture lastAcceptedMessageFuture; public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(settings.getRetryConfig(), topicRpc.getScheduler()); + super(logger, settings.getRetryConfig(), topicRpc.getScheduler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new WriteSessionImpl(); @@ -81,11 +81,6 @@ public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressi logger.info(message); } - @Override - protected Logger getLogger() { - return logger; - } - @Override protected String getStreamName() { return "Writer"; diff --git a/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java new file mode 100644 index 000000000..9deeb4f12 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java @@ -0,0 +1,229 @@ +package tech.ydb.topic.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.grpc.GrpcReadWriteStream; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.StatusCodesProtos; +import tech.ydb.proto.topic.YdbTopic; +import tech.ydb.proto.topic.v1.TopicServiceGrpc; +import tech.ydb.topic.TopicClient; + +/** + * + * @author Aleksandr Gorshenin + */ +public class BaseMockedTest { + private static final Logger logger = LoggerFactory.getLogger(BaseMockedTest.class); + + private interface WriteStream extends + GrpcReadWriteStream { + } + + private final GrpcTransport transport = Mockito.mock(GrpcTransport.class); + private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class); + private final ScheduledFuture emptyFuture = Mockito.mock(ScheduledFuture.class); + private final WriteStream writeStream = Mockito.mock(WriteStream.class); + private final SchedulerAssert schedulerHelper = new SchedulerAssert(); + + protected final TopicClient client = TopicClient.newClient(transport) + .setCompressionExecutor(Runnable::run) // Disable compression in separate executors + .build(); + + private volatile MockedWriteStream streamMock = null; + + @Before + public void beforeEach() { + streamMock = null; + + Mockito.when(transport.getScheduler()).thenReturn(scheduler); + Mockito.when(transport.readWriteStreamCall(Mockito.eq(TopicServiceGrpc.getStreamWriteMethod()), Mockito.any())) + .thenReturn(writeStream); + + // Every writeStream.start updates mockedWriteStream + Mockito.when(writeStream.start(Mockito.any())).thenAnswer(defaultStreamMockAnswer()); + + // Every writeStream.senbNext add message from client to mockedWriteStream.sent list + Mockito.doAnswer((Answer) (InvocationOnMock iom) -> { + streamMock.sent.add(iom.getArgument(0, YdbTopic.StreamWriteMessage.FromClient.class)); + return null; + }).when(writeStream).sendNext(Mockito.any()); + + Mockito.when(scheduler.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any())) + .thenAnswer((InvocationOnMock iom) -> { + logger.debug("mock scheduled task"); + schedulerHelper.tasks.add(iom.getArgument(0, Runnable.class)); + return emptyFuture; + }); + } + + protected MockedWriteStream currentStream() { + return streamMock; + } + + protected SchedulerAssert getScheduler() { + return schedulerHelper; + } + + protected OngoingStubbing> mockStreams() { + return Mockito.when(writeStream.start(Mockito.any())); + } + + protected Answer> defaultStreamMockAnswer() { + return (InvocationOnMock iom) -> { + streamMock = new MockedWriteStream(iom.getArgument(0)); + return streamMock.streamFuture; + }; + } + + protected Answer> errorStreamMockAnswer(StatusCode code) { + return (iom) -> { + streamMock = null; + return CompletableFuture.completedFuture(Status.of(code)); + }; + } + + protected static class SchedulerAssert { + private final Queue tasks = new ConcurrentLinkedQueue<>(); + + public SchedulerAssert hasNoTasks() { + Assert.assertTrue(tasks.isEmpty()); + return this; + } + + public SchedulerAssert hasTasks(int count) { + Assert.assertEquals(count, tasks.size()); + return this; + } + + public SchedulerAssert executeNextTasks(int count) { + Assert.assertTrue(count <= tasks.size()); + + CompletableFuture.runAsync(() -> { + logger.debug("execute {} scheduled tasks", count); + for (int idx = 0; idx < count; idx++) { + tasks.poll().run(); + } + }).join(); + return this; + } + } + + protected static class MockedWriteStream { + private final GrpcReadWriteStream.Observer observer; + private final CompletableFuture streamFuture = new CompletableFuture<>(); + private final List sent = new ArrayList<>(); + private volatile int sentIdx = 0; + + public MockedWriteStream(GrpcReadStream.Observer observer) { + this.observer = observer; + } + + public void complete(Status status) { + streamFuture.complete(status); + } + + public void complete(Throwable th) { + streamFuture.completeExceptionally(th); + } + + public void hasNoNewMessages() { + Assert.assertTrue(sentIdx >= sent.size()); + } + + public Checker nextMsg() { + Assert.assertTrue(sentIdx < sent.size()); + return new Checker(sent.get(sentIdx++)); + } + + public void responseErrorBadRequest() { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.BAD_REQUEST) + .build(); + observer.onNext(msg); + } + + public void responseInit(long lastSeqNo) { + responseInit(lastSeqNo, 123, "mocked", new int[] { 0, 1, 2}); + } + + public void responseInit(long lastSeqNo, long partitionId, String sessionId, int[] codecs) { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setInitResponse(YdbTopic.StreamWriteMessage.InitResponse.newBuilder() + .setLastSeqNo(lastSeqNo) + .setPartitionId(partitionId) + .setSessionId(sessionId) + .setSupportedCodecs(YdbTopic.SupportedCodecs.newBuilder() + .addAllCodecs(IntStream.of(codecs).boxed().collect(Collectors.toList()))) + ).build(); + observer.onNext(msg); + } + + public void responseWriteWritten(long firstSeqNo, int messagesCount) { + List acks = LongStream + .range(firstSeqNo, firstSeqNo + messagesCount) + .mapToObj(seqNo -> YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.newBuilder() + .setSeqNo(seqNo) + .setWritten(YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Written.newBuilder()) + .build()) + .collect(Collectors.toList()); + + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse.newBuilder().addAllAcks(acks)) + .build(); + observer.onNext(msg); + } + + protected class Checker { + private final YdbTopic.StreamWriteMessage.FromClient msg; + + public Checker(YdbTopic.StreamWriteMessage.FromClient msg) { + this.msg = msg; + } + + public Checker isInit() { + Assert.assertTrue(msg.hasInitRequest()); + return this; + } + + public Checker hasInitPath(String path) { + Assert.assertEquals(path, msg.getInitRequest().getPath()); + return this; + } + + public Checker isWrite() { + Assert.assertTrue(msg.hasWriteRequest()); + return this; + } + + public Checker hasWrite(int codec, int messagesCount) { + Assert.assertEquals(codec, msg.getWriteRequest().getCodec()); + Assert.assertEquals(messagesCount, msg.getWriteRequest().getMessagesCount()); + return this; + } + } + } +} diff --git a/topic/src/test/resources/log4j2.xml b/topic/src/test/resources/log4j2.xml index c799da309..c59b5d2aa 100644 --- a/topic/src/test/resources/log4j2.xml +++ b/topic/src/test/resources/log4j2.xml @@ -2,7 +2,7 @@ - + From 44bc7de2d19808f09c7610d64885307ab0f46abb Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 23 Jan 2025 18:34:03 +0000 Subject: [PATCH 05/10] Tests for RetryMode --- .../tech/ydb/topic/impl/RetryModeTest.java | 233 ++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java diff --git a/topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java b/topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java new file mode 100644 index 000000000..f39c5162a --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java @@ -0,0 +1,233 @@ +package tech.ydb.topic.impl; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.topic.settings.RetryMode; +import tech.ydb.topic.settings.WriterSettings; +import tech.ydb.topic.write.Message; +import tech.ydb.topic.write.SyncWriter; + +/** + * + * @author Aleksandr Gorshenin + */ +public class RetryModeTest extends BaseMockedTest { + + @Test + public void alwaysRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.ALWAYS) + .build()); + writer.init(); + + // Retry #1 - TRANSPORT_UNAVAILABLE + Assert.assertNull(currentStream()); + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + writer.send(Message.of("test-message".getBytes())); + stream1.nextMsg().isWrite().hasWrite(2, 1); + stream1.responseWriteWritten(1, 1); + + stream1.complete(Status.SUCCESS); + + // Retry #2 - Stream is closed by server + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #3 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseErrorBadRequest(); + + // Retry #4 - Stream send bad request + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream3 = currentStream(); + Assert.assertNotEquals(stream2, stream3); + + stream3.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream3.hasNoNewMessages(); + stream3.responseInit(1); + + writer.send(Message.of("other-message".getBytes())); + stream3.nextMsg().isWrite().hasWrite(2, 1); + stream3.responseWriteWritten(2, 1); + + writer.flush(); + writer.shutdown(1, TimeUnit.SECONDS); + stream3.complete(Status.SUCCESS); + } + + @Test + public void disabledRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.NONE) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + // No stream and no retries in scheduler + Assert.assertNull(currentStream()); + getScheduler().hasNoTasks(); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.NONE) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + // Even successful completing closes writer + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.NONE) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + stream1.responseErrorBadRequest(); + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void recoverRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.RECOVER) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + // No stream and no retries in scheduler + Assert.assertNull(currentStream()); + getScheduler().hasNoTasks(); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void recoverRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryMode(RetryMode.RECOVER) + .build()); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + writer.send(Message.of("test-message".getBytes())); + stream1.nextMsg().isWrite().hasWrite(2, 1); + stream1.responseWriteWritten(1, 1); + + stream1.complete(new RuntimeException("io exception")); + + // Retry #1 - Stream is by runtime exception + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #2 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #3 - TRANSPORT_UNAVAILABLE + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #4 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(1); + + writer.send(Message.of("other-message".getBytes())); + stream2.nextMsg().isWrite().hasWrite(2, 1); + stream2.responseWriteWritten(2, 1); + + writer.flush(); + writer.shutdown(1, TimeUnit.SECONDS); + stream2.complete(Status.SUCCESS); + } +} From 16fd5e0357eaee912432dcbccc258e048c096252 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 27 Jan 2025 13:55:54 +0000 Subject: [PATCH 06/10] Revert old behavior with infinite retries by default --- .../ydb/topic/impl/GrpcStreamRetrier.java | 20 ++++++++++++-- .../ydb/topic/settings/ReaderSettings.java | 3 ++- .../ydb/topic/settings/WriterSettings.java | 3 ++- ...tryModeTest.java => TopicRetriesTest.java} | 26 ++++++++++--------- 4 files changed, 36 insertions(+), 16 deletions(-) rename topic/src/test/java/tech/ydb/topic/impl/{RetryModeTest.java => TopicRetriesTest.java} (90%) diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 010b7dc3b..f8d22def3 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -9,14 +9,30 @@ import org.slf4j.Logger; +import tech.ydb.common.retry.ExponentialBackoffRetry; import tech.ydb.common.retry.RetryConfig; import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; /** * @author Nikolay Perfilov */ public abstract class GrpcStreamRetrier { + public static final RetryConfig RETRY_ALL = new RetryConfig() { + @Override + public RetryPolicy isStatusRetryable(StatusCode code) { + return RETRY_ALL_POLICY; + } + + @Override + public RetryPolicy isThrowableRetryable(Throwable th) { + return RETRY_ALL_POLICY; + } + }; + + private static final RetryPolicy RETRY_ALL_POLICY = new ExponentialBackoffRetry(256, 7); + private static final int ID_LENGTH = 6; private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890" .toCharArray(); @@ -97,7 +113,7 @@ protected CompletableFuture shutdownImpl(String reason) { protected void onSessionClosed(Status status, Throwable th) { logger.info("[{}] onSessionClosed called", id); - RetryPolicy retryPolicy = null; + RetryPolicy retryPolicy; if (th != null) { logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th); retryPolicy = retryConfig.isThrowableRetryable(th); @@ -111,8 +127,8 @@ protected void onSessionClosed(Status status, Throwable th) { } } else { logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); - retryPolicy = retryConfig.isStatusRetryable(status.getCode()); } + retryPolicy = retryConfig.isStatusRetryable(status.getCode()); } if (isStopped.get()) { diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index fe477cceb..af2c324fb 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -13,6 +13,7 @@ import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.topic.impl.GrpcStreamRetrier; /** * @author Nikolay Perfilov @@ -78,7 +79,7 @@ public static class Builder { private boolean readWithoutConsumer = false; private String readerName = null; private List topics = new ArrayList<>(); - private RetryConfig retryConfig = RetryConfig.idempotentRetryForever(); + private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL; private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; private Executor decompressionExecutor = null; diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 0636d7652..4de103438 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -7,6 +7,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.impl.GrpcStreamRetrier; /** * @author Nikolay Perfilov @@ -85,7 +86,7 @@ public static class Builder { private String messageGroupId = null; private Long partitionId = null; private Codec codec = Codec.GZIP; - private RetryConfig retryConfig = RetryConfig.idempotentRetryForever(); + private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL; private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT; private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT; diff --git a/topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java similarity index 90% rename from topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java rename to topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java index f39c5162a..ebd260bd3 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java @@ -7,9 +7,10 @@ import org.junit.Assert; import org.junit.Test; +import tech.ydb.common.retry.RetryConfig; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; -import tech.ydb.topic.settings.RetryMode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.Message; import tech.ydb.topic.write.SyncWriter; @@ -18,10 +19,10 @@ * * @author Aleksandr Gorshenin */ -public class RetryModeTest extends BaseMockedTest { +public class TopicRetriesTest extends BaseMockedTest { @Test - public void alwaysRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + public void defaultRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { mockStreams() .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) .then(defaultStreamMockAnswer()) @@ -30,7 +31,6 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.ALWAYS) .build()); writer.init(); @@ -47,7 +47,7 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept stream1.nextMsg().isWrite().hasWrite(2, 1); stream1.responseWriteWritten(1, 1); - stream1.complete(Status.SUCCESS); + stream1.complete(Status.of(StatusCode.SUCCESS)); // Retry #2 - Stream is closed by server getScheduler().hasTasks(1).executeNextTasks(1); @@ -88,7 +88,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.NONE) + .setRetryConfig(RetryConfig.noRetries()) .build(); SyncWriter writer = client.createSyncWriter(settings); @@ -109,7 +109,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.NONE) + .setRetryConfig(RetryConfig.noRetries()) .build(); SyncWriter writer = client.createSyncWriter(settings); @@ -134,7 +134,7 @@ public void disabledRetryStreamCloseTest() throws InterruptedException, Executio public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.NONE) + .setRetryConfig(RetryConfig.noRetries()) .build(); SyncWriter writer = client.createSyncWriter(settings); @@ -162,7 +162,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.RECOVER) + .setRetryConfig(RetryConfig.noRetries()) .build(); SyncWriter writer = client.createSyncWriter(settings); @@ -180,7 +180,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio } @Test - public void recoverRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + public void idempotentRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { mockStreams() .then(defaultStreamMockAnswer()) .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) @@ -190,7 +190,7 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() .setTopicPath("/mocked_topic") - .setRetryMode(RetryMode.RECOVER) + .setRetryConfig(RetryConfig.idempotentRetryForever()) .build()); writer.init(); @@ -203,7 +203,9 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep stream1.nextMsg().isWrite().hasWrite(2, 1); stream1.responseWriteWritten(1, 1); - stream1.complete(new RuntimeException("io exception")); + stream1.complete(new RuntimeException("io exception", + new UnexpectedResultException("inner", Status.of(StatusCode.CLIENT_INTERNAL_ERROR))) + ); // Retry #1 - Stream is by runtime exception getScheduler().hasTasks(1).executeNextTasks(1); From a80cd8864316b45c3346addfab1d3fc76a8784b7 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 28 Jan 2025 11:26:12 +0000 Subject: [PATCH 07/10] Update methods names and javadoc --- .../tech/ydb/common/retry/RetryConfig.java | 25 +++-- .../ydb/common/retry/YdbRetryBuilder.java | 2 +- .../tech/ydb/common/retry/YdbRetryConfig.java | 16 +-- .../ydb/common/retry/RetryConfigTest.java | 100 +++++++++--------- .../ydb/topic/impl/GrpcStreamRetrier.java | 8 +- 5 files changed, 78 insertions(+), 73 deletions(-) diff --git a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java index f3c3fbd15..6774afc2e 100644 --- a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java +++ b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java @@ -18,7 +18,7 @@ public interface RetryConfig { * @param code status code to check * @return policy of retries or {@code null} if the status code is not retryable */ - RetryPolicy isStatusRetryable(StatusCode code); + RetryPolicy getStatusCodeRetryPolicy(StatusCode code); /** * Returns retry policy for the given exception and {@code null} if that exception is not retryable @@ -26,17 +26,19 @@ public interface RetryConfig { * @param th exception to check * @return policy of retries or {@code null} if the exception is not retryable */ - default RetryPolicy isThrowableRetryable(Throwable th) { + default RetryPolicy getThrowableRetryPolicy(Throwable th) { for (Throwable ex = th; ex != null; ex = ex.getCause()) { if (ex instanceof UnexpectedResultException) { - return isStatusRetryable(((UnexpectedResultException) ex).getStatus().getCode()); + return getStatusCodeRetryPolicy(((UnexpectedResultException) ex).getStatus().getCode()); } } return null; } /** - * Retries a non idempotent operation forever with default exponential delay + * Infinity retries with default exponential delay.
That policy does not retries conditionally + * retryable errors so it can be used for both as idempotent and non idempotent operations + * * @return retry configuration object */ static RetryConfig retryForever() { @@ -44,7 +46,8 @@ static RetryConfig retryForever() { } /** - * Retries a non idempotent operation with default exponential until the specified elapsed milliseconds expire + * Retries until the specified elapsed milliseconds expire.
That policy does not retries + * conditionally retryable errors so it can be used for both as idempotent and non idempotent operations * @param maxElapsedMs maximum timeout for retries * @return retry configuration object */ @@ -53,20 +56,22 @@ static RetryConfig retryUntilElapsed(long maxElapsedMs) { } /** - * Retries an idempotent operation forever with default exponential delay + * Infinity retries with default exponential delay.
That policy does retries conditionally + * retryable errors so it can be used ONLY for idempotent operations * @return retry configuration object */ static RetryConfig idempotentRetryForever() { - return newConfig().retryIdempotent(true).retryForever(); + return newConfig().retryConditionallyRetryableErrors(true).retryForever(); } /** - * Retries an idempotent operation with default exponential until the specified elapsed milliseconds expire + * Retries until the specified elapsed milliseconds expire.
That policy does retries + * conditionally retryable errors so it can be used ONLY for idempotent operations * @param maxElapsedMs maximum timeout for retries * @return retry configuration object */ static RetryConfig idempotentRetryUntilElapsed(long maxElapsedMs) { - return newConfig().retryIdempotent(true).retryUntilElapsed(maxElapsedMs); + return newConfig().retryConditionallyRetryableErrors(true).retryUntilElapsed(maxElapsedMs); } /** @@ -86,7 +91,7 @@ static Builder newConfig() { } interface Builder { - Builder retryIdempotent(boolean retry); + Builder retryConditionallyRetryableErrors(boolean retry); Builder retryNotFound(boolean retry); Builder withSlowBackoff(long backoff, int ceiling); Builder withFastBackoff(long backoff, int ceiling); diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java index 2ea0f4c97..dffedf382 100644 --- a/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryBuilder.java @@ -15,7 +15,7 @@ class YdbRetryBuilder implements RetryConfig.Builder { private int slowCeiling = 6; @Override - public YdbRetryBuilder retryIdempotent(boolean retry) { + public YdbRetryBuilder retryConditionallyRetryableErrors(boolean retry) { this.idempotent = retry; return this; } diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java index a6600f4e4..e68fda2ba 100644 --- a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java @@ -7,22 +7,22 @@ * @author Aleksandr Gorshenin */ class YdbRetryConfig implements RetryConfig { - private final boolean idempotent; + private final boolean retryConditionally; private final boolean retryNotFound; private final RetryPolicy immediatelly; private final RetryPolicy fast; private final RetryPolicy slow; - YdbRetryConfig(boolean idempotent, boolean notFound, RetryPolicy immediatelly, RetryPolicy fast, RetryPolicy slow) { - this.idempotent = idempotent; + YdbRetryConfig(boolean conditionally, boolean notFound, RetryPolicy instant, RetryPolicy fast, RetryPolicy slow) { + this.retryConditionally = conditionally; this.retryNotFound = notFound; - this.immediatelly = immediatelly; + this.immediatelly = instant; this.fast = fast; this.slow = slow; } @Override - public RetryPolicy isStatusRetryable(StatusCode code) { + public RetryPolicy getStatusCodeRetryPolicy(StatusCode code) { if (code == null) { return null; } @@ -43,14 +43,14 @@ public RetryPolicy isStatusRetryable(StatusCode code) { case CLIENT_RESOURCE_EXHAUSTED: return slow; - // Conditionally retry + // Conditionally retryable statuses case CLIENT_CANCELLED: case CLIENT_INTERNAL_ERROR: case TRANSPORT_UNAVAILABLE: case UNAVAILABLE: - return idempotent ? fast : null; + return retryConditionally ? fast : null; - // Not found retry + // Not found has special flag for retries case NOT_FOUND: return retryNotFound ? fast : null; diff --git a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java index e8f132830..3c8d4b6a7 100644 --- a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java +++ b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java @@ -21,27 +21,27 @@ private void assertDuration(long from, long to, long ms) { public void nullStatusesTest() { RetryConfig config = RetryConfig.retryForever(); - Assert.assertNull(config.isThrowableRetryable(null)); - Assert.assertNull(config.isStatusRetryable(null)); + Assert.assertNull(config.getThrowableRetryPolicy(null)); + Assert.assertNull(config.getStatusCodeRetryPolicy(null)); } @Test public void throwableRetriesTest() { RetryConfig config = RetryConfig.retryUntilElapsed(1000); - Assert.assertNull(config.isThrowableRetryable(new RuntimeException("test message"))); - Assert.assertNull(config.isThrowableRetryable(new Exception("1", new RuntimeException("2")))); + Assert.assertNull(config.getThrowableRetryPolicy(new RuntimeException("test message"))); + Assert.assertNull(config.getThrowableRetryPolicy(new Exception("1", new RuntimeException("2")))); - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); - RetryPolicy fast = config.isStatusRetryable(StatusCode.NOT_FOUND); + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND); - Assert.assertEquals(immediatelly, config.isThrowableRetryable( + Assert.assertEquals(immediatelly, config.getThrowableRetryPolicy( new UnexpectedResultException("base", Status.of(StatusCode.BAD_SESSION))) ); - Assert.assertEquals(immediatelly, config.isThrowableRetryable(new Exception("base", + Assert.assertEquals(immediatelly, config.getThrowableRetryPolicy(new Exception("base", new UnexpectedResultException("cause", Status.of(StatusCode.SESSION_BUSY))) )); - Assert.assertEquals(fast, config.isThrowableRetryable(new Exception("base", + Assert.assertEquals(fast, config.getThrowableRetryPolicy(new Exception("base", new UnexpectedResultException("cause", Status.of(StatusCode.NOT_FOUND))) )); } @@ -51,7 +51,7 @@ public void noRetryPolicyTest() { RetryConfig config = RetryConfig.noRetries(); // unretrayable for (StatusCode code: StatusCode.values()) { - Assert.assertNull(config.isStatusRetryable(code)); + Assert.assertNull(config.getStatusCodeRetryPolicy(code)); } } @@ -60,26 +60,26 @@ public void nonIdempotentRetryPolicyTest() { RetryConfig config = RetryConfig.retryForever(); // unretrayable - Assert.assertNull(config.isStatusRetryable(StatusCode.SCHEME_ERROR)); - Assert.assertNull(config.isStatusRetryable(StatusCode.ALREADY_EXISTS)); - Assert.assertNull(config.isStatusRetryable(StatusCode.UNAUTHORIZED)); - Assert.assertNull(config.isStatusRetryable(StatusCode.UNAVAILABLE)); - Assert.assertNull(config.isStatusRetryable(StatusCode.TRANSPORT_UNAVAILABLE)); - Assert.assertNull(config.isStatusRetryable(StatusCode.CLIENT_CANCELLED)); - Assert.assertNull(config.isStatusRetryable(StatusCode.CLIENT_INTERNAL_ERROR)); - Assert.assertNull(config.isStatusRetryable(StatusCode.NOT_FOUND)); - - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.SCHEME_ERROR)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.ALREADY_EXISTS)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAUTHORIZED)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAVAILABLE)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.TRANSPORT_UNAVAILABLE)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.CLIENT_CANCELLED)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.CLIENT_INTERNAL_ERROR)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); + + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); Assert.assertNotNull(immediatelly); - Assert.assertEquals(immediatelly, config.isStatusRetryable(StatusCode.SESSION_BUSY)); + Assert.assertEquals(immediatelly, config.getStatusCodeRetryPolicy(StatusCode.SESSION_BUSY)); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); Assert.assertNotNull(fast); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNDETERMINED)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNDETERMINED)); - RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); Assert.assertNotNull(slow); - Assert.assertEquals(slow, config.isStatusRetryable(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + Assert.assertEquals(slow, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); } @Test @@ -87,53 +87,53 @@ public void idempotentRetryPolicyTest() { RetryConfig config = RetryConfig.idempotentRetryForever(); // unretrayable - Assert.assertNull(config.isStatusRetryable(StatusCode.SCHEME_ERROR)); - Assert.assertNull(config.isStatusRetryable(StatusCode.ALREADY_EXISTS)); - Assert.assertNull(config.isStatusRetryable(StatusCode.UNAUTHORIZED)); - Assert.assertNull(config.isStatusRetryable(StatusCode.NOT_FOUND)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.SCHEME_ERROR)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.ALREADY_EXISTS)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAUTHORIZED)); + Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); Assert.assertNotNull(immediatelly); - Assert.assertEquals(immediatelly, config.isStatusRetryable(StatusCode.SESSION_BUSY)); + Assert.assertEquals(immediatelly, config.getStatusCodeRetryPolicy(StatusCode.SESSION_BUSY)); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); Assert.assertNotNull(fast); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNDETERMINED)); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.UNAVAILABLE)); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.TRANSPORT_UNAVAILABLE)); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.CLIENT_CANCELLED)); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.CLIENT_INTERNAL_ERROR)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNDETERMINED)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNAVAILABLE)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.TRANSPORT_UNAVAILABLE)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_CANCELLED)); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_INTERNAL_ERROR)); - RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); Assert.assertNotNull(slow); - Assert.assertEquals(slow, config.isStatusRetryable(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + Assert.assertEquals(slow, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); } @Test public void notFoundRetryPolicyTest() { RetryConfig config = RetryConfig.newConfig().retryNotFound(true).retryForever(); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); - Assert.assertEquals(fast, config.isStatusRetryable(StatusCode.NOT_FOUND)); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); } @Test public void foreverRetryTest() { RetryConfig config = RetryConfig.newConfig().withSlowBackoff(100, 5).withFastBackoff(10, 10).retryForever(); - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); assertDuration(10, 20, fast.nextRetryMs(0, 0)); assertDuration(10, 20, fast.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, 0)); assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); - RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); assertDuration(100, 200, slow.nextRetryMs(0, 0)); assertDuration(100, 200, slow.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -144,7 +144,7 @@ public void foreverRetryTest() { public void untilElapsedRetryTest() { RetryConfig config = RetryConfig.idempotentRetryUntilElapsed(5000); - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 5000)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -152,14 +152,14 @@ public void untilElapsedRetryTest() { Assert.assertEquals(-1, immediatelly.nextRetryMs(0, 5001)); Assert.assertEquals(-1, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5001)); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); assertDuration(5, 10, fast.nextRetryMs(0, 0)); Assert.assertEquals(3, fast.nextRetryMs(0, 4997)); Assert.assertEquals(5000, fast.nextRetryMs(Integer.MAX_VALUE, 0)); Assert.assertEquals(1, fast.nextRetryMs(Integer.MAX_VALUE, 4999)); Assert.assertEquals(-1, fast.nextRetryMs(Integer.MAX_VALUE, 5000)); - RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); assertDuration(500, 1000, slow.nextRetryMs(0, 0)); Assert.assertEquals(3, slow.nextRetryMs(0, 4997)); Assert.assertEquals(5000, slow.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -171,7 +171,7 @@ public void untilElapsedRetryTest() { public void nTimesRetryTest() { RetryConfig config = RetryConfig.newConfig().retryNTimes(8); - RetryPolicy immediatelly = config.isStatusRetryable(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); Assert.assertEquals(0, immediatelly.nextRetryMs(7, 0)); @@ -179,7 +179,7 @@ public void nTimesRetryTest() { Assert.assertEquals(-1, immediatelly.nextRetryMs(8, 0)); Assert.assertEquals(-1, immediatelly.nextRetryMs(8, Integer.MAX_VALUE)); - RetryPolicy fast = config.isStatusRetryable(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); assertDuration(5, 10, fast.nextRetryMs(0, 0)); assertDuration(5, 10, fast.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, 0)); @@ -187,7 +187,7 @@ public void nTimesRetryTest() { Assert.assertEquals(-1, fast.nextRetryMs(8, 0)); Assert.assertEquals(-1, fast.nextRetryMs(8, Integer.MAX_VALUE)); - RetryPolicy slow = config.isStatusRetryable(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); assertDuration(500, 1000, slow.nextRetryMs(0, 0)); assertDuration(500, 1000, slow.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, 0)); diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index f8d22def3..6f75cb62f 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -21,12 +21,12 @@ public abstract class GrpcStreamRetrier { public static final RetryConfig RETRY_ALL = new RetryConfig() { @Override - public RetryPolicy isStatusRetryable(StatusCode code) { + public RetryPolicy getStatusCodeRetryPolicy(StatusCode code) { return RETRY_ALL_POLICY; } @Override - public RetryPolicy isThrowableRetryable(Throwable th) { + public RetryPolicy getThrowableRetryPolicy(Throwable th) { return RETRY_ALL_POLICY; } }; @@ -116,7 +116,7 @@ protected void onSessionClosed(Status status, Throwable th) { RetryPolicy retryPolicy; if (th != null) { logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th); - retryPolicy = retryConfig.isThrowableRetryable(th); + retryPolicy = retryConfig.getThrowableRetryPolicy(th); } else { if (status.isSuccess()) { if (isStopped.get()) { @@ -128,7 +128,7 @@ protected void onSessionClosed(Status status, Throwable th) { } else { logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); } - retryPolicy = retryConfig.isStatusRetryable(status.getCode()); + retryPolicy = retryConfig.getStatusCodeRetryPolicy(status.getCode()); } if (isStopped.get()) { From ca70f87d4f8ad370535891ffb301a8132ea19156 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 29 Jan 2025 09:16:14 +0000 Subject: [PATCH 08/10] Update retry policy to use Status insted of StatusCode --- .../tech/ydb/common/retry/RetryConfig.java | 16 ++-- .../tech/ydb/common/retry/RetryPolicy.java | 1 + .../tech/ydb/common/retry/YdbRetryConfig.java | 8 +- .../ydb/common/retry/RetryConfigTest.java | 88 +++++++++---------- .../ydb/topic/impl/GrpcStreamRetrier.java | 5 +- 5 files changed, 59 insertions(+), 59 deletions(-) diff --git a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java index 6774afc2e..e42e8a856 100644 --- a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java +++ b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java @@ -1,11 +1,11 @@ package tech.ydb.common.retry; -import tech.ydb.core.StatusCode; +import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; /** * Recipes should use the retry configuration to decide how to retry - * errors like unsuccessful {@link tech.ydb.core.StatusCode}. + * errors like unsuccessful {@link tech.ydb.core.Status}. * * @author Aleksandr Gorshenin */ @@ -13,12 +13,12 @@ public interface RetryConfig { /** - * Returns retry policy for the given status code and {@code null} if that status code is not retryable + * Returns retry policy for the given {@link Status} and {@code null} if that status is not retryable * - * @param code status code to check - * @return policy of retries or {@code null} if the status code is not retryable + * @param code status to check + * @return policy of retries or {@code null} if the status is not retryable */ - RetryPolicy getStatusCodeRetryPolicy(StatusCode code); + RetryPolicy getStatusRetryPolicy(Status code); /** * Returns retry policy for the given exception and {@code null} if that exception is not retryable @@ -29,7 +29,7 @@ public interface RetryConfig { default RetryPolicy getThrowableRetryPolicy(Throwable th) { for (Throwable ex = th; ex != null; ex = ex.getCause()) { if (ex instanceof UnexpectedResultException) { - return getStatusCodeRetryPolicy(((UnexpectedResultException) ex).getStatus().getCode()); + return getStatusRetryPolicy(((UnexpectedResultException) ex).getStatus()); } } return null; @@ -79,7 +79,7 @@ static RetryConfig idempotentRetryUntilElapsed(long maxElapsedMs) { * @return retry configuration object */ static RetryConfig noRetries() { - return (StatusCode code) -> null; + return (Status status) -> null; } /** diff --git a/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java b/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java index 83a3010e2..9095482a2 100644 --- a/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java +++ b/common/src/main/java/tech/ydb/common/retry/RetryPolicy.java @@ -5,6 +5,7 @@ * * @author Aleksandr Gorshenin */ +@FunctionalInterface public interface RetryPolicy { /** * Called when an operation is failed for some reason to determine if it should be retried. diff --git a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java index e68fda2ba..754a2e32b 100644 --- a/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java +++ b/common/src/main/java/tech/ydb/common/retry/YdbRetryConfig.java @@ -1,6 +1,6 @@ package tech.ydb.common.retry; -import tech.ydb.core.StatusCode; +import tech.ydb.core.Status; /** * @@ -22,12 +22,12 @@ class YdbRetryConfig implements RetryConfig { } @Override - public RetryPolicy getStatusCodeRetryPolicy(StatusCode code) { - if (code == null) { + public RetryPolicy getStatusRetryPolicy(Status status) { + if (status == null) { return null; } - switch (code) { + switch (status.getCode()) { // Instant retry case BAD_SESSION: case SESSION_BUSY: diff --git a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java index 3c8d4b6a7..93774eef6 100644 --- a/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java +++ b/common/src/test/java/tech/ydb/common/retry/RetryConfigTest.java @@ -22,7 +22,7 @@ public void nullStatusesTest() { RetryConfig config = RetryConfig.retryForever(); Assert.assertNull(config.getThrowableRetryPolicy(null)); - Assert.assertNull(config.getStatusCodeRetryPolicy(null)); + Assert.assertNull(config.getStatusRetryPolicy(null)); } @Test @@ -32,8 +32,8 @@ public void throwableRetriesTest() { Assert.assertNull(config.getThrowableRetryPolicy(new RuntimeException("test message"))); Assert.assertNull(config.getThrowableRetryPolicy(new Exception("1", new RuntimeException("2")))); - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND); + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND)); Assert.assertEquals(immediatelly, config.getThrowableRetryPolicy( new UnexpectedResultException("base", Status.of(StatusCode.BAD_SESSION))) @@ -51,7 +51,7 @@ public void noRetryPolicyTest() { RetryConfig config = RetryConfig.noRetries(); // unretrayable for (StatusCode code: StatusCode.values()) { - Assert.assertNull(config.getStatusCodeRetryPolicy(code)); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(code))); } } @@ -60,26 +60,26 @@ public void nonIdempotentRetryPolicyTest() { RetryConfig config = RetryConfig.retryForever(); // unretrayable - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.SCHEME_ERROR)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.ALREADY_EXISTS)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAUTHORIZED)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAVAILABLE)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.TRANSPORT_UNAVAILABLE)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.CLIENT_CANCELLED)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.CLIENT_INTERNAL_ERROR)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); - - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.SCHEME_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.ALREADY_EXISTS))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAUTHORIZED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAVAILABLE))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.TRANSPORT_UNAVAILABLE))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_CANCELLED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_INTERNAL_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); + + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); Assert.assertNotNull(immediatelly); - Assert.assertEquals(immediatelly, config.getStatusCodeRetryPolicy(StatusCode.SESSION_BUSY)); + Assert.assertEquals(immediatelly, config.getStatusRetryPolicy(Status.of(StatusCode.SESSION_BUSY))); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); Assert.assertNotNull(fast); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNDETERMINED)); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNDETERMINED))); - RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); Assert.assertNotNull(slow); - Assert.assertEquals(slow, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + Assert.assertEquals(slow, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED))); } @Test @@ -87,53 +87,53 @@ public void idempotentRetryPolicyTest() { RetryConfig config = RetryConfig.idempotentRetryForever(); // unretrayable - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.SCHEME_ERROR)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.ALREADY_EXISTS)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.UNAUTHORIZED)); - Assert.assertNull(config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.SCHEME_ERROR))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.ALREADY_EXISTS))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.UNAUTHORIZED))); + Assert.assertNull(config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); Assert.assertNotNull(immediatelly); - Assert.assertEquals(immediatelly, config.getStatusCodeRetryPolicy(StatusCode.SESSION_BUSY)); + Assert.assertEquals(immediatelly, config.getStatusRetryPolicy(Status.of(StatusCode.SESSION_BUSY))); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); Assert.assertNotNull(fast); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNDETERMINED)); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.UNAVAILABLE)); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.TRANSPORT_UNAVAILABLE)); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_CANCELLED)); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_INTERNAL_ERROR)); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNDETERMINED))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.UNAVAILABLE))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.TRANSPORT_UNAVAILABLE))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_CANCELLED))); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_INTERNAL_ERROR))); - RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); Assert.assertNotNull(slow); - Assert.assertEquals(slow, config.getStatusCodeRetryPolicy(StatusCode.CLIENT_RESOURCE_EXHAUSTED)); + Assert.assertEquals(slow, config.getStatusRetryPolicy(Status.of(StatusCode.CLIENT_RESOURCE_EXHAUSTED))); } @Test public void notFoundRetryPolicyTest() { RetryConfig config = RetryConfig.newConfig().retryNotFound(true).retryForever(); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); - Assert.assertEquals(fast, config.getStatusCodeRetryPolicy(StatusCode.NOT_FOUND)); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); + Assert.assertEquals(fast, config.getStatusRetryPolicy(Status.of(StatusCode.NOT_FOUND))); } @Test public void foreverRetryTest() { RetryConfig config = RetryConfig.newConfig().withSlowBackoff(100, 5).withFastBackoff(10, 10).retryForever(); - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); assertDuration(10, 20, fast.nextRetryMs(0, 0)); assertDuration(10, 20, fast.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, 0)); assertDuration(10240, 20480, fast.nextRetryMs(Integer.MAX_VALUE, Integer.MAX_VALUE)); - RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); assertDuration(100, 200, slow.nextRetryMs(0, 0)); assertDuration(100, 200, slow.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(3200, 6400, slow.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -144,7 +144,7 @@ public void foreverRetryTest() { public void untilElapsedRetryTest() { RetryConfig config = RetryConfig.idempotentRetryUntilElapsed(5000); - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 5000)); Assert.assertEquals(0, immediatelly.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -152,14 +152,14 @@ public void untilElapsedRetryTest() { Assert.assertEquals(-1, immediatelly.nextRetryMs(0, 5001)); Assert.assertEquals(-1, immediatelly.nextRetryMs(Integer.MAX_VALUE, 5001)); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); assertDuration(5, 10, fast.nextRetryMs(0, 0)); Assert.assertEquals(3, fast.nextRetryMs(0, 4997)); Assert.assertEquals(5000, fast.nextRetryMs(Integer.MAX_VALUE, 0)); Assert.assertEquals(1, fast.nextRetryMs(Integer.MAX_VALUE, 4999)); Assert.assertEquals(-1, fast.nextRetryMs(Integer.MAX_VALUE, 5000)); - RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); assertDuration(500, 1000, slow.nextRetryMs(0, 0)); Assert.assertEquals(3, slow.nextRetryMs(0, 4997)); Assert.assertEquals(5000, slow.nextRetryMs(Integer.MAX_VALUE, 0)); @@ -171,7 +171,7 @@ public void untilElapsedRetryTest() { public void nTimesRetryTest() { RetryConfig config = RetryConfig.newConfig().retryNTimes(8); - RetryPolicy immediatelly = config.getStatusCodeRetryPolicy(StatusCode.BAD_SESSION); + RetryPolicy immediatelly = config.getStatusRetryPolicy(Status.of(StatusCode.BAD_SESSION)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, 0)); Assert.assertEquals(0, immediatelly.nextRetryMs(0, Integer.MAX_VALUE)); Assert.assertEquals(0, immediatelly.nextRetryMs(7, 0)); @@ -179,7 +179,7 @@ public void nTimesRetryTest() { Assert.assertEquals(-1, immediatelly.nextRetryMs(8, 0)); Assert.assertEquals(-1, immediatelly.nextRetryMs(8, Integer.MAX_VALUE)); - RetryPolicy fast = config.getStatusCodeRetryPolicy(StatusCode.ABORTED); + RetryPolicy fast = config.getStatusRetryPolicy(Status.of(StatusCode.ABORTED)); assertDuration(5, 10, fast.nextRetryMs(0, 0)); assertDuration(5, 10, fast.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(5 * 128, 5 * 256, fast.nextRetryMs(7, 0)); @@ -187,7 +187,7 @@ public void nTimesRetryTest() { Assert.assertEquals(-1, fast.nextRetryMs(8, 0)); Assert.assertEquals(-1, fast.nextRetryMs(8, Integer.MAX_VALUE)); - RetryPolicy slow = config.getStatusCodeRetryPolicy(StatusCode.OVERLOADED); + RetryPolicy slow = config.getStatusRetryPolicy(Status.of(StatusCode.OVERLOADED)); assertDuration(500, 1000, slow.nextRetryMs(0, 0)); assertDuration(500, 1000, slow.nextRetryMs(0, Integer.MAX_VALUE)); assertDuration(500 * 64, 500 * 128, slow.nextRetryMs(7, 0)); diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 6f75cb62f..7cf1e980d 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -13,7 +13,6 @@ import tech.ydb.common.retry.RetryConfig; import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; /** * @author Nikolay Perfilov @@ -21,7 +20,7 @@ public abstract class GrpcStreamRetrier { public static final RetryConfig RETRY_ALL = new RetryConfig() { @Override - public RetryPolicy getStatusCodeRetryPolicy(StatusCode code) { + public RetryPolicy getStatusRetryPolicy(Status status) { return RETRY_ALL_POLICY; } @@ -128,7 +127,7 @@ protected void onSessionClosed(Status status, Throwable th) { } else { logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status); } - retryPolicy = retryConfig.getStatusCodeRetryPolicy(status.getCode()); + retryPolicy = retryConfig.getStatusRetryPolicy(status); } if (isStopped.get()) { From 9859e4f7277608dc719ee9f25b50ef6fcc55ce51 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 7 Feb 2025 10:18:19 +0000 Subject: [PATCH 09/10] New tests --- .../tech/ydb/common/retry/RetryConfig.java | 13 +- .../impl/YdbTransactionImplTest.java | 11 + .../tech/ydb/topic/impl/BaseMockedTest.java | 23 +- .../tech/ydb/topic/impl/TopicRetriesTest.java | 292 +++++++++++++++--- 4 files changed, 291 insertions(+), 48 deletions(-) diff --git a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java index e42e8a856..bb80fc8c0 100644 --- a/common/src/main/java/tech/ydb/common/retry/RetryConfig.java +++ b/common/src/main/java/tech/ydb/common/retry/RetryConfig.java @@ -1,5 +1,6 @@ package tech.ydb.common.retry; + import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; @@ -15,10 +16,10 @@ public interface RetryConfig { /** * Returns retry policy for the given {@link Status} and {@code null} if that status is not retryable * - * @param code status to check + * @param status status to check * @return policy of retries or {@code null} if the status is not retryable */ - RetryPolicy getStatusRetryPolicy(Status code); + RetryPolicy getStatusRetryPolicy(Status status); /** * Returns retry policy for the given exception and {@code null} if that exception is not retryable @@ -36,7 +37,7 @@ default RetryPolicy getThrowableRetryPolicy(Throwable th) { } /** - * Infinity retries with default exponential delay.
That policy does not retries conditionally + * Infinity retries with default exponential delay.
This policy does not retries conditionally * retryable errors so it can be used for both as idempotent and non idempotent operations * * @return retry configuration object @@ -46,7 +47,7 @@ static RetryConfig retryForever() { } /** - * Retries until the specified elapsed milliseconds expire.
That policy does not retries + * Retries until the specified elapsed milliseconds expire.
This policy does not retries * conditionally retryable errors so it can be used for both as idempotent and non idempotent operations * @param maxElapsedMs maximum timeout for retries * @return retry configuration object @@ -56,7 +57,7 @@ static RetryConfig retryUntilElapsed(long maxElapsedMs) { } /** - * Infinity retries with default exponential delay.
That policy does retries conditionally + * Infinity retries with default exponential delay.
This policy does retries conditionally * retryable errors so it can be used ONLY for idempotent operations * @return retry configuration object */ @@ -65,7 +66,7 @@ static RetryConfig idempotentRetryForever() { } /** - * Retries until the specified elapsed milliseconds expire.
That policy does retries + * Retries until the specified elapsed milliseconds expire.
This policy does retries * conditionally retryable errors so it can be used ONLY for idempotent operations * @param maxElapsedMs maximum timeout for retries * @return retry configuration object diff --git a/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java index 9f25e0255..37cd1a1dd 100644 --- a/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java +++ b/common/src/test/java/tech/ydb/common/transaction/impl/YdbTransactionImplTest.java @@ -32,4 +32,15 @@ public void baseTest() { Assert.assertTrue(tx.isActive()); Assert.assertFalse(tx.getStatusFuture().isDone()); } + + @Test + public void nullTest() { + MockTx tx = new MockTx(TxMode.NONE, null); + + Assert.assertNull(tx.getId()); + Assert.assertEquals("MOCK", tx.getSessionId()); + Assert.assertEquals(TxMode.NONE, tx.getTxMode()); + Assert.assertFalse(tx.isActive()); + Assert.assertFalse(tx.getStatusFuture().isDone()); + } } diff --git a/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java index 9deeb4f12..8358aafd2 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/BaseMockedTest.java @@ -164,6 +164,13 @@ public void responseErrorBadRequest() { observer.onNext(msg); } + public void responseErrorSchemeError() { + YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SCHEME_ERROR) + .build(); + observer.onNext(msg); + } + public void responseInit(long lastSeqNo) { responseInit(lastSeqNo, 123, "mocked", new int[] { 0, 1, 2}); } @@ -205,23 +212,27 @@ public Checker(YdbTopic.StreamWriteMessage.FromClient msg) { } public Checker isInit() { - Assert.assertTrue(msg.hasInitRequest()); + Assert.assertTrue("next msg must be init request", msg.hasInitRequest()); return this; } public Checker hasInitPath(String path) { - Assert.assertEquals(path, msg.getInitRequest().getPath()); + Assert.assertEquals("invalid init request path", path, msg.getInitRequest().getPath()); return this; } public Checker isWrite() { - Assert.assertTrue(msg.hasWriteRequest()); + Assert.assertTrue("next msg must be write request", msg.hasWriteRequest()); return this; } - public Checker hasWrite(int codec, int messagesCount) { - Assert.assertEquals(codec, msg.getWriteRequest().getCodec()); - Assert.assertEquals(messagesCount, msg.getWriteRequest().getMessagesCount()); + public Checker hasWrite(int codec, long... seqnums) { + Assert.assertEquals("invalid write codec", codec, msg.getWriteRequest().getCodec()); + Assert.assertEquals("invalid messages count", seqnums.length, msg.getWriteRequest().getMessagesCount()); + for (int idx = 0; idx < seqnums.length; idx++) { + Assert.assertEquals("invalid msg seqNo " + idx, seqnums[idx], + msg.getWriteRequest().getMessages(idx).getSeqNo()); + } return this; } } diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java index ebd260bd3..10c105e82 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java @@ -1,5 +1,6 @@ package tech.ydb.topic.impl; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -12,8 +13,12 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.topic.settings.WriterSettings; +import tech.ydb.topic.write.AsyncWriter; +import tech.ydb.topic.write.InitResult; import tech.ydb.topic.write.Message; +import tech.ydb.topic.write.QueueOverflowException; import tech.ydb.topic.write.SyncWriter; +import tech.ydb.topic.write.WriteAck; /** * @@ -22,7 +27,7 @@ public class TopicRetriesTest extends BaseMockedTest { @Test - public void defaultRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + public void writerDefaultRetryTest() throws InterruptedException, ExecutionException, TimeoutException { mockStreams() .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) .then(defaultStreamMockAnswer()) @@ -73,7 +78,7 @@ public void defaultRetryWriterTest() throws InterruptedException, ExecutionExcep stream3.responseInit(1); writer.send(Message.of("other-message".getBytes())); - stream3.nextMsg().isWrite().hasWrite(2, 1); + stream3.nextMsg().isWrite().hasWrite(2, 2); stream3.responseWriteWritten(2, 1); writer.flush(); @@ -82,7 +87,7 @@ public void defaultRetryWriterTest() throws InterruptedException, ExecutionExcep } @Test - public void disabledRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + public void writerNoRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { mockStreams() .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); @@ -106,7 +111,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi } @Test - public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException { + public void writerNoRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") .setRetryConfig(RetryConfig.noRetries()) @@ -131,7 +136,7 @@ public void disabledRetryStreamCloseTest() throws InterruptedException, Executio } @Test - public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + public void writerNoRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() .setTopicPath("/mocked_topic") .setRetryConfig(RetryConfig.noRetries()) @@ -156,31 +161,7 @@ public void disabledRetryStreamErrorTest() throws InterruptedException, Executio } @Test - public void recoverRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { - mockStreams() - .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); - - WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath("/mocked_topic") - .setRetryConfig(RetryConfig.noRetries()) - .build(); - - SyncWriter writer = client.createSyncWriter(settings); - writer.init(); - - // No stream and no retries in scheduler - Assert.assertNull(currentStream()); - getScheduler().hasNoTasks(); - - RuntimeException ex = Assert.assertThrows(RuntimeException.class, - () -> writer.send(Message.of("test-message".getBytes()))); - Assert.assertEquals("Writer is already stopped", ex.getMessage()); - - writer.shutdown(1, TimeUnit.SECONDS); - } - - @Test - public void idempotentRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException { + public void writerIdempotentRetryTest() throws InterruptedException, ExecutionException, TimeoutException { mockStreams() .then(defaultStreamMockAnswer()) .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) @@ -190,6 +171,7 @@ public void idempotentRetryWriterTest() throws InterruptedException, ExecutionEx SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder() .setTopicPath("/mocked_topic") + .setProducerId("test-id") .setRetryConfig(RetryConfig.idempotentRetryForever()) .build()); writer.init(); @@ -197,11 +179,11 @@ public void idempotentRetryWriterTest() throws InterruptedException, ExecutionEx MockedWriteStream stream1 = currentStream(); stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); stream1.hasNoNewMessages(); - stream1.responseInit(0); + stream1.responseInit(10); writer.send(Message.of("test-message".getBytes())); - stream1.nextMsg().isWrite().hasWrite(2, 1); - stream1.responseWriteWritten(1, 1); + stream1.nextMsg().isWrite().hasWrite(2, 11); + stream1.responseWriteWritten(11, 1); stream1.complete(new RuntimeException("io exception", new UnexpectedResultException("inner", Status.of(StatusCode.CLIENT_INTERNAL_ERROR))) @@ -222,14 +204,252 @@ public void idempotentRetryWriterTest() throws InterruptedException, ExecutionEx stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); stream2.hasNoNewMessages(); - stream2.responseInit(1); + stream2.responseInit(12); writer.send(Message.of("other-message".getBytes())); - stream2.nextMsg().isWrite().hasWrite(2, 1); - stream2.responseWriteWritten(2, 1); + stream2.nextMsg().isWrite().hasWrite(2, 13); + stream2.responseWriteWritten(13, 1); writer.flush(); writer.shutdown(1, TimeUnit.SECONDS); stream2.complete(Status.SUCCESS); } + + @Test + public void asyncWriterDefaultRetryTest() throws QueueOverflowException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + AsyncWriter writer = client.createAsyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setProducerId("producer-id") + .build()); + CompletableFuture initFuture = writer.init(); + + // Retry #1 - TRANSPORT_UNAVAILABLE + Assert.assertNull(currentStream()); + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(3); + + Assert.assertTrue(initFuture.isDone()); + Assert.assertEquals(3, initFuture.join().getSeqNo()); + + CompletableFuture m1Future = writer.send(Message.of("test-message".getBytes())); + CompletableFuture m2Future = writer.send(Message.of("test-message2".getBytes())); + + stream1.nextMsg().isWrite().hasWrite(2, 4); // m1 + stream1.nextMsg().isWrite().hasWrite(2, 5); // m2 + + Assert.assertFalse(m1Future.isDone()); + Assert.assertFalse(m2Future.isDone()); + + stream1.responseWriteWritten(4, 1); // ack for m1 + + Assert.assertTrue(m1Future.isDone()); + Assert.assertEquals(4, m1Future.join().getSeqNo()); + Assert.assertFalse(m2Future.isDone()); + + stream1.complete(Status.of(StatusCode.BAD_SESSION)); + + // Retry #2 - Stream is closed by server + getScheduler().hasTasks(1).executeNextTasks(1); + + // Retry #3 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + Assert.assertNotEquals(stream1, stream2); + + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(4); + + stream2.nextMsg().isWrite().hasWrite(2, 5); // m2 + + CompletableFuture m3Future = writer.send(Message.of("other-message3".getBytes())); + + stream2.responseErrorSchemeError(); + + // Retry #4 - Stream send bad request + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream3 = currentStream(); + Assert.assertNotEquals(stream2, stream3); + + stream3.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream3.hasNoNewMessages(); + stream3.responseInit(4); + + stream3.nextMsg().isWrite().hasWrite(2, 5, 6); // m2 & m3 + + Assert.assertFalse(m2Future.isDone()); + Assert.assertFalse(m3Future.isDone()); + + stream3.responseWriteWritten(5, 2); + + Assert.assertTrue(m2Future.isDone()); + Assert.assertEquals(5, m2Future.join().getSeqNo()); + Assert.assertTrue(m3Future.isDone()); + Assert.assertEquals(6, m3Future.join().getSeqNo()); + + writer.shutdown(); + stream3.complete(Status.SUCCESS); + } + + @Test + public void asyncWriterIdempotentRetryTest() throws QueueOverflowException { + mockStreams() + .then(defaultStreamMockAnswer()) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(errorStreamMockAnswer(StatusCode.OVERLOADED)) + .then(defaultStreamMockAnswer()); // and repeat + + AsyncWriter writer = client.createAsyncWriter(WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.idempotentRetryForever()) + .build()); + + CompletableFuture initFuture = writer.init(); + CompletableFuture m1Future = writer.send(Message.of("test-message".getBytes())); + CompletableFuture m2Future = writer.send(Message.of("test-message2".getBytes())); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(30); + + Assert.assertTrue(initFuture.isDone()); + Assert.assertEquals(30, initFuture.join().getSeqNo()); + + stream1.nextMsg().isWrite().hasWrite(2, 31, 32); // m1 & m2 + + Assert.assertFalse(m1Future.isDone()); + Assert.assertFalse(m2Future.isDone()); + + stream1.responseWriteWritten(31, 1); + + Assert.assertTrue(m1Future.isDone()); + Assert.assertEquals(31, m1Future.join().getSeqNo()); + Assert.assertFalse(m2Future.isDone()); + + stream1.complete(Status.of(StatusCode.ABORTED)); + + // Retry #1 - ABORTED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #2 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #3 - TRANSPORT_UNAVAILABLE + getScheduler().hasTasks(1).executeNextTasks(1); + // Retry #4 - OVERLOADED + getScheduler().hasTasks(1).executeNextTasks(1); + + MockedWriteStream stream2 = currentStream(); + stream2.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream2.hasNoNewMessages(); + stream2.responseInit(31); + + stream2.nextMsg().isWrite().hasWrite(2, 32); // m2 + + CompletableFuture m3Future = writer.send(Message.of("other-message".getBytes())); + + stream2.nextMsg().isWrite().hasWrite(2, 33); + stream2.responseWriteWritten(32, 2); + + Assert.assertTrue(m2Future.isDone()); + Assert.assertTrue(m3Future.isDone()); + Assert.assertEquals(32, m2Future.join().getSeqNo()); + Assert.assertEquals(33, m3Future.join().getSeqNo()); + + writer.shutdown().join(); + stream2.complete(Status.SUCCESS); + } + + @Test + public void asyncDisabledRetryStreamCloseTest() throws QueueOverflowException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + AsyncWriter writer = client.createAsyncWriter(settings); + CompletableFuture initFuture = writer.init(); + CompletableFuture messageFuture = writer.send(Message.of("test".getBytes())); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + + Assert.assertFalse(initFuture.isDone()); + Assert.assertFalse(messageFuture.isDone()); + + // Even successful completing closes writer + stream1.complete(Status.SUCCESS); + + Assert.assertTrue(initFuture.isCompletedExceptionally()); + Assert.assertTrue(messageFuture.isCompletedExceptionally()); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown().join(); + } + + @Test + public void asyncDisabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); + + MockedWriteStream stream1 = currentStream(); + stream1.nextMsg().isInit().hasInitPath("/mocked_topic"); + stream1.hasNoNewMessages(); + stream1.responseInit(0); + + stream1.responseErrorBadRequest(); + stream1.complete(Status.SUCCESS); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } + + @Test + public void asyncDisabledRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException { + mockStreams() + .then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE)); + + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath("/mocked_topic") + .setRetryConfig(RetryConfig.noRetries()) + .build(); + + SyncWriter writer = client.createSyncWriter(settings); + writer.init(); +// writer. + + // No stream and no retries in scheduler + Assert.assertNull(currentStream()); + getScheduler().hasNoTasks(); + + RuntimeException ex = Assert.assertThrows(RuntimeException.class, + () -> writer.send(Message.of("test-message".getBytes()))); + Assert.assertEquals("Writer is already stopped", ex.getMessage()); + + writer.shutdown(1, TimeUnit.SECONDS); + } } From f531139efb067ca79efc8895e4909dbe312e01aa Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 10 Mar 2025 14:36:30 +0000 Subject: [PATCH 10/10] Fixed mockito --- topic/pom.xml | 24 ++++++++++++++++++- .../ydb/topic/settings/ReaderSettings.java | 11 ++++----- .../ydb/topic/settings/WriterSettings.java | 11 ++++----- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/topic/pom.xml b/topic/pom.xml index 46d8ae4f1..5c6acd117 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -48,7 +48,7 @@
org.mockito - mockito-inline + mockito-core test @@ -62,4 +62,26 @@ test + + + + jdk8-build + + 1.8 + + + + + 4.11.0 + + + + + org.mockito + mockito-inline + ${mockito.version} + + + + diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index af2c324fb..88843cb65 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -12,7 +12,6 @@ import tech.ydb.common.retry.RetryConfig; import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; import tech.ydb.topic.impl.GrpcStreamRetrier; /** @@ -143,15 +142,15 @@ public Builder setErrorsHandler(BiConsumer handler) { final RetryConfig currentConfig = retryConfig; retryConfig = new RetryConfig() { @Override - public RetryPolicy isStatusRetryable(StatusCode code) { - handler.accept(Status.of(code), null); - return currentConfig.isStatusRetryable(code); + public RetryPolicy getStatusRetryPolicy(Status status) { + handler.accept(status, null); + return currentConfig.getStatusRetryPolicy(status); } @Override - public RetryPolicy isThrowableRetryable(Throwable th) { + public RetryPolicy getThrowableRetryPolicy(Throwable th) { handler.accept(null, th); - return currentConfig.isThrowableRetryable(th); + return currentConfig.getThrowableRetryPolicy(th); } }; return this; diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 4de103438..6a59468d0 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -5,7 +5,6 @@ import tech.ydb.common.retry.RetryConfig; import tech.ydb.common.retry.RetryPolicy; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; import tech.ydb.topic.description.Codec; import tech.ydb.topic.impl.GrpcStreamRetrier; @@ -187,15 +186,15 @@ public Builder setErrorsHandler(BiConsumer handler) { final RetryConfig currentConfig = retryConfig; retryConfig = new RetryConfig() { @Override - public RetryPolicy isStatusRetryable(StatusCode code) { - handler.accept(Status.of(code), null); - return currentConfig.isStatusRetryable(code); + public RetryPolicy getStatusRetryPolicy(Status status) { + handler.accept(status, null); + return currentConfig.getStatusRetryPolicy(status); } @Override - public RetryPolicy isThrowableRetryable(Throwable th) { + public RetryPolicy getThrowableRetryPolicy(Throwable th) { handler.accept(null, th); - return currentConfig.isThrowableRetryable(th); + return currentConfig.getThrowableRetryPolicy(th); } }; return this;