From b2bdbe0bda03a02d2f1fc8f52bc5a3b83275b9c3 Mon Sep 17 00:00:00 2001 From: Andrew Bell <115623869+andybharness@users.noreply.github.com> Date: Fri, 27 Jan 2023 10:58:51 +0000 Subject: [PATCH] FFM-6581 - Java SDK - Poller was not restarted + timeouts logged (#135) * FFM-6581 - Java SDK - Poller was not restarted + timeouts logged What Fixed timeout exception, reduce some log levels to debug and added additional unit tests. Why We see timeout errors logged becauses the code calls awaitTermination() before shutDown() when stopping the update processor. Also we log misleading warnings about poller not being restarted when in fact the code is working as intented (though it's candidate for additional optimisation in future releases). Testing Added new unit tets for EventSource + manual testing --- README.md | 6 +- examples/pom.xml | 4 +- pom.xml | 2 +- .../io/harness/cf/client/api/InnerClient.java | 9 +- .../cf/client/api/UpdateProcessor.java | 8 +- .../cf/client/connector/EventSource.java | 13 +- .../api/dispatchers/CannedResponses.java | 2 +- .../cf/client/connector/CountingUpdater.java | 53 ++++++++ .../cf/client/connector/EventSourceTest.java | 127 ++++++++++++++++++ src/test/resources/logback-test.xml | 2 +- 10 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 src/test/java/io/harness/cf/client/connector/CountingUpdater.java create mode 100644 src/test/java/io/harness/cf/client/connector/EventSourceTest.java diff --git a/README.md b/README.md index ae21703e..6a720345 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ The first step is to install the FF SDK as a dependency in your application usin Refer to the [Harness Feature Flag Java Server SDK](https://mvnrepository.com/artifact/io.harness/ff-java-server-sdk) to identify the latest version for your build automation tool. -This section lists dependencies for Maven and Gradle and uses the 1.1.10 version as an example: +This section lists dependencies for Maven and Gradle and uses the 1.1.11 version as an example: #### Maven @@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file: io.harness ff-java-server-sdk - 1.1.10 + 1.1.11 ``` #### Gradle ``` -implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.10' +implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.11' ``` ### Code Sample diff --git a/examples/pom.xml b/examples/pom.xml index 1f0d8ac1..632ca006 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -6,7 +6,7 @@ io.harness.featureflags examples - 1.1.10 + 1.1.11 8 @@ -33,7 +33,7 @@ io.harness ff-java-server-sdk - 1.1.10 + 1.1.11 diff --git a/pom.xml b/pom.xml index 3896335f..f4fc01fd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ io.harness ff-java-server-sdk - 1.1.10 + 1.1.11 jar Harness Feature Flag Java Server SDK Harness Feature Flag Java Server SDK diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index b23fbddf..e0ceb807 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -246,12 +246,13 @@ public void onDisconnected() { pollProcessor.start(); pollerStartedAt = new Date(); } else { - log.warn( - "Poller was not restarted [closing={} state={} pollStartTime+interval={} now={} ]", + log.debug( + "Poller already running [closing={} state={} pollStartTime={} interval={} now={}]", closing, pollProcessor.state(), - instant, - now); + pollerStartedAt.toInstant(), + options.getPollIntervalInSeconds(), + now.toInstant()); } } diff --git a/src/main/java/io/harness/cf/client/api/UpdateProcessor.java b/src/main/java/io/harness/cf/client/api/UpdateProcessor.java index bfd0877b..3c4f2255 100644 --- a/src/main/java/io/harness/cf/client/api/UpdateProcessor.java +++ b/src/main/java/io/harness/cf/client/api/UpdateProcessor.java @@ -47,6 +47,10 @@ public void start() { running = true; } catch (ConnectorException | InterruptedException e) { log.error("Starting updater failed with exc: {}", e.getMessage()); + + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } @@ -61,6 +65,7 @@ public void stop() { stream.stop(); running = false; } + executor.shutdown(); boolean result = executor.awaitTermination(3, TimeUnit.SECONDS); if (result) { log.debug("All tasks done"); @@ -69,8 +74,8 @@ public void stop() { } } catch (InterruptedException e) { log.error("Exception was raised when stopping update tasks", e); + Thread.currentThread().interrupt(); } - executor.shutdown(); } public void update(@NonNull final Message message) { @@ -139,6 +144,7 @@ public void close() { stream.close(); } catch (InterruptedException e) { log.error("Exception was raised while trying to close the stream, err: {}", e.getMessage()); + Thread.currentThread().interrupt(); } } log.info("UpdateProcessor closed"); diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java index 9d83b08e..523d0778 100644 --- a/src/main/java/io/harness/cf/client/connector/EventSource.java +++ b/src/main/java/io/harness/cf/client/connector/EventSource.java @@ -23,7 +23,7 @@ public class EventSource implements ServerSentEvent.Listener, AutoCloseable, Ser private final Updater updater; private final Gson gson = new Gson(); private final Request.Builder builder; - private int retryTime = 2_000; + private int retryTime; private HttpLoggingInterceptor loggingInterceptor; private ServerSentEvent sse; @@ -37,7 +37,17 @@ public EventSource( Map headers, @NonNull Updater updater, long sseReadTimeoutMins) { + this(url, headers, updater, sseReadTimeoutMins, 2_000); + } + + EventSource( + @NonNull String url, + Map headers, + @NonNull Updater updater, + long sseReadTimeoutMins, + int retryDelayMs) { this.updater = updater; + this.retryTime = retryDelayMs; okSse = new OkSse(makeStreamClient(sseReadTimeoutMins)); builder = new Request.Builder().url(url); headers.put("User-Agent", "JavaSDK " + io.harness.cf.Version.VERSION); @@ -105,6 +115,7 @@ public boolean onRetryTime(ServerSentEvent serverSentEvent, long l) { @Override public boolean onRetryError( ServerSentEvent serverSentEvent, Throwable throwable, Response response) { + log.warn( "EventSource onRetryError [throwable={} message={}]", throwable.getClass().getSimpleName(), diff --git a/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java b/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java index 18ba1e4d..ce87b7f1 100644 --- a/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java +++ b/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java @@ -19,7 +19,7 @@ public class CannedResponses { @Data @AllArgsConstructor - static class Event { + public static class Event { String event, domain, identifier; int version; } diff --git a/src/test/java/io/harness/cf/client/connector/CountingUpdater.java b/src/test/java/io/harness/cf/client/connector/CountingUpdater.java new file mode 100644 index 00000000..3d6b66ce --- /dev/null +++ b/src/test/java/io/harness/cf/client/connector/CountingUpdater.java @@ -0,0 +1,53 @@ +package io.harness.cf.client.connector; + +import io.harness.cf.client.api.testutils.PollingAtomicLong; +import io.harness.cf.client.dto.Message; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class CountingUpdater implements Updater { + @Getter @Setter private PollingAtomicLong connectCount = new PollingAtomicLong(0); + @Getter @Setter private PollingAtomicLong disconnectCount = new PollingAtomicLong(0); + @Getter @Setter private PollingAtomicLong readyCount = new PollingAtomicLong(0); + @Getter @Setter private PollingAtomicLong failureCount = new PollingAtomicLong(0); + @Getter @Setter private PollingAtomicLong updateCount = new PollingAtomicLong(0); + @Getter @Setter private PollingAtomicLong errorCount = new PollingAtomicLong(0); + + @Override + public void onConnected() { + log.debug("onConnected"); + connectCount.incrementAndGet(); + } + + @Override + public void onDisconnected() { + log.debug("onDisconnected"); + disconnectCount.incrementAndGet(); + } + + @Override + public void onReady() { + log.debug("onReady"); + readyCount.incrementAndGet(); + } + + @Override + public void onFailure(String message) { + log.debug("onFailure: " + message); + failureCount.incrementAndGet(); + } + + @Override + public void onError() { + log.info("onError"); + errorCount.incrementAndGet(); + } + + @Override + public void update(Message message) { + log.debug("update: " + message); + updateCount.incrementAndGet(); + } +}; diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java new file mode 100644 index 00000000..c03f3b70 --- /dev/null +++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java @@ -0,0 +1,127 @@ +package io.harness.cf.client.connector; + +import static io.harness.cf.client.api.dispatchers.CannedResponses.*; +import static java.lang.System.out; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import okhttp3.mockwebserver.*; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +@Slf4j +class EventSourceTest { + + private final AtomicInteger version = new AtomicInteger(2); + + static class StreamDispatcher extends Dispatcher { + private final AtomicInteger version = new AtomicInteger(2); + protected final AtomicInteger request = new AtomicInteger(1); + + protected MockResponse makeStreamResponse() { + int reqNo = request.getAndIncrement(); + if (reqNo <= 3) { + // Force a disconnect on the first few attempts + out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); + return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); + } else { + // Eventually allow a good connection attempt and send a flag + out.printf("ReqNo %d will be allowed\n", reqNo); + return makeMockStreamResponse( + 200, + makeFlagPatchEvent("simplebool", version.getAndIncrement()), + makeFlagPatchEvent("simplebool", version.getAndIncrement())) + .setSocketPolicy(SocketPolicy.KEEP_OPEN); + } + } + + @Override + @SneakyThrows + @NotNull + public MockResponse dispatch(RecordedRequest recordedRequest) { + out.println("DISPATCH GOT ------> " + recordedRequest.getPath()); + + // recordedRequest.getHeaders().forEach(h -> out.printf(" %s: %s\n", h.component1(), + // h.component2())); + + if (Objects.requireNonNull(recordedRequest.getPath()).equals("/api/1.0/stream?cluster=1")) { + return makeStreamResponse(); + } + throw new UnsupportedOperationException("ERROR: url not mapped " + recordedRequest.getPath()); + } + } + + static class FailingStreamDispatcher extends StreamDispatcher { + @Override + protected MockResponse makeStreamResponse() { + int reqNo = request.getAndIncrement(); + // Force a disconnect on all requests + out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); + return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); + } + } + + @Test + void shouldNotCallErrorHandlerIfRetryEventuallyReconnectsToStreamEndpoint() + throws IOException, InterruptedException { + CountingUpdater updater = new CountingUpdater(); + + try (MockWebServer mockSvr = new MockWebServer(); + EventSource eventSource = + new EventSource( + setupMockServer(mockSvr, new StreamDispatcher()), new HashMap<>(), updater, 1, 1)) { + eventSource.start(); + + TimeUnit.SECONDS.sleep(15); + } + + // for this test, connection to the /stream endpoint will fail several times but eventually + // connect. + // There should be at least 1 connect event. Each retry calls onError + + assertTrue(updater.getConnectCount().get() >= 1); + assertEquals(0, updater.getFailureCount().get()); + assertEquals(5, updater.getErrorCount().get()); + } + + @Test + void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail() + throws IOException, InterruptedException { + CountingUpdater updater = new CountingUpdater(); + + try (MockWebServer mockSvr = new MockWebServer(); + EventSource eventSource = + new EventSource( + setupMockServer(mockSvr, new FailingStreamDispatcher()), + new HashMap<>(), + updater, + 1, + 1)) { + eventSource.start(); + + TimeUnit.SECONDS.sleep(15); + } + + // for this test, connection to the /stream endpoint will never succeed. + // we expect the error handler to be called, connect handler should not be called + + assertEquals(0, updater.getConnectCount().get()); + assertEquals(0, updater.getFailureCount().get()); + assertTrue(updater.getErrorCount().get() >= 1); + } + + @SneakyThrows + private String setupMockServer(MockWebServer mockSvr, Dispatcher dispatcher) { + mockSvr.setDispatcher(dispatcher); + mockSvr.start(); + return String.format( + "http://%s:%s/api/1.0/stream?cluster=%d", mockSvr.getHostName(), mockSvr.getPort(), 1); + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 59f4853c..39d9aa38 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -4,7 +4,7 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - SDK=${SDK}, Version=${version}, flag=%X{flag}, target=%X{target}, requestID=%X{requestId} - %msg%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n