diff --git a/README.md b/README.md
index ae21703e..6a720345 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ The first step is to install the FF SDK as a dependency in your application usin
Refer to the [Harness Feature Flag Java Server SDK](https://mvnrepository.com/artifact/io.harness/ff-java-server-sdk) to identify the latest version for your build automation tool.
-This section lists dependencies for Maven and Gradle and uses the 1.1.10 version as an example:
+This section lists dependencies for Maven and Gradle and uses the 1.1.11 version as an example:
#### Maven
@@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
io.harness
ff-java-server-sdk
- 1.1.10
+ 1.1.11
```
#### Gradle
```
-implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.10'
+implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.11'
```
### Code Sample
diff --git a/examples/pom.xml b/examples/pom.xml
index 1f0d8ac1..632ca006 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -6,7 +6,7 @@
io.harness.featureflags
examples
- 1.1.10
+ 1.1.11
8
@@ -33,7 +33,7 @@
io.harness
ff-java-server-sdk
- 1.1.10
+ 1.1.11
diff --git a/pom.xml b/pom.xml
index 3896335f..f4fc01fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
io.harness
ff-java-server-sdk
- 1.1.10
+ 1.1.11
jar
Harness Feature Flag Java Server SDK
Harness Feature Flag Java Server SDK
diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java
index b23fbddf..e0ceb807 100644
--- a/src/main/java/io/harness/cf/client/api/InnerClient.java
+++ b/src/main/java/io/harness/cf/client/api/InnerClient.java
@@ -246,12 +246,13 @@ public void onDisconnected() {
pollProcessor.start();
pollerStartedAt = new Date();
} else {
- log.warn(
- "Poller was not restarted [closing={} state={} pollStartTime+interval={} now={} ]",
+ log.debug(
+ "Poller already running [closing={} state={} pollStartTime={} interval={} now={}]",
closing,
pollProcessor.state(),
- instant,
- now);
+ pollerStartedAt.toInstant(),
+ options.getPollIntervalInSeconds(),
+ now.toInstant());
}
}
diff --git a/src/main/java/io/harness/cf/client/api/UpdateProcessor.java b/src/main/java/io/harness/cf/client/api/UpdateProcessor.java
index bfd0877b..3c4f2255 100644
--- a/src/main/java/io/harness/cf/client/api/UpdateProcessor.java
+++ b/src/main/java/io/harness/cf/client/api/UpdateProcessor.java
@@ -47,6 +47,10 @@ public void start() {
running = true;
} catch (ConnectorException | InterruptedException e) {
log.error("Starting updater failed with exc: {}", e.getMessage());
+
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -61,6 +65,7 @@ public void stop() {
stream.stop();
running = false;
}
+ executor.shutdown();
boolean result = executor.awaitTermination(3, TimeUnit.SECONDS);
if (result) {
log.debug("All tasks done");
@@ -69,8 +74,8 @@ public void stop() {
}
} catch (InterruptedException e) {
log.error("Exception was raised when stopping update tasks", e);
+ Thread.currentThread().interrupt();
}
- executor.shutdown();
}
public void update(@NonNull final Message message) {
@@ -139,6 +144,7 @@ public void close() {
stream.close();
} catch (InterruptedException e) {
log.error("Exception was raised while trying to close the stream, err: {}", e.getMessage());
+ Thread.currentThread().interrupt();
}
}
log.info("UpdateProcessor closed");
diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java
index 9d83b08e..523d0778 100644
--- a/src/main/java/io/harness/cf/client/connector/EventSource.java
+++ b/src/main/java/io/harness/cf/client/connector/EventSource.java
@@ -23,7 +23,7 @@ public class EventSource implements ServerSentEvent.Listener, AutoCloseable, Ser
private final Updater updater;
private final Gson gson = new Gson();
private final Request.Builder builder;
- private int retryTime = 2_000;
+ private int retryTime;
private HttpLoggingInterceptor loggingInterceptor;
private ServerSentEvent sse;
@@ -37,7 +37,17 @@ public EventSource(
Map headers,
@NonNull Updater updater,
long sseReadTimeoutMins) {
+ this(url, headers, updater, sseReadTimeoutMins, 2_000);
+ }
+
+ EventSource(
+ @NonNull String url,
+ Map headers,
+ @NonNull Updater updater,
+ long sseReadTimeoutMins,
+ int retryDelayMs) {
this.updater = updater;
+ this.retryTime = retryDelayMs;
okSse = new OkSse(makeStreamClient(sseReadTimeoutMins));
builder = new Request.Builder().url(url);
headers.put("User-Agent", "JavaSDK " + io.harness.cf.Version.VERSION);
@@ -105,6 +115,7 @@ public boolean onRetryTime(ServerSentEvent serverSentEvent, long l) {
@Override
public boolean onRetryError(
ServerSentEvent serverSentEvent, Throwable throwable, Response response) {
+
log.warn(
"EventSource onRetryError [throwable={} message={}]",
throwable.getClass().getSimpleName(),
diff --git a/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java b/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java
index 18ba1e4d..ce87b7f1 100644
--- a/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java
+++ b/src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java
@@ -19,7 +19,7 @@ public class CannedResponses {
@Data
@AllArgsConstructor
- static class Event {
+ public static class Event {
String event, domain, identifier;
int version;
}
diff --git a/src/test/java/io/harness/cf/client/connector/CountingUpdater.java b/src/test/java/io/harness/cf/client/connector/CountingUpdater.java
new file mode 100644
index 00000000..3d6b66ce
--- /dev/null
+++ b/src/test/java/io/harness/cf/client/connector/CountingUpdater.java
@@ -0,0 +1,53 @@
+package io.harness.cf.client.connector;
+
+import io.harness.cf.client.api.testutils.PollingAtomicLong;
+import io.harness.cf.client.dto.Message;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+class CountingUpdater implements Updater {
+ @Getter @Setter private PollingAtomicLong connectCount = new PollingAtomicLong(0);
+ @Getter @Setter private PollingAtomicLong disconnectCount = new PollingAtomicLong(0);
+ @Getter @Setter private PollingAtomicLong readyCount = new PollingAtomicLong(0);
+ @Getter @Setter private PollingAtomicLong failureCount = new PollingAtomicLong(0);
+ @Getter @Setter private PollingAtomicLong updateCount = new PollingAtomicLong(0);
+ @Getter @Setter private PollingAtomicLong errorCount = new PollingAtomicLong(0);
+
+ @Override
+ public void onConnected() {
+ log.debug("onConnected");
+ connectCount.incrementAndGet();
+ }
+
+ @Override
+ public void onDisconnected() {
+ log.debug("onDisconnected");
+ disconnectCount.incrementAndGet();
+ }
+
+ @Override
+ public void onReady() {
+ log.debug("onReady");
+ readyCount.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(String message) {
+ log.debug("onFailure: " + message);
+ failureCount.incrementAndGet();
+ }
+
+ @Override
+ public void onError() {
+ log.info("onError");
+ errorCount.incrementAndGet();
+ }
+
+ @Override
+ public void update(Message message) {
+ log.debug("update: " + message);
+ updateCount.incrementAndGet();
+ }
+};
diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java
new file mode 100644
index 00000000..c03f3b70
--- /dev/null
+++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java
@@ -0,0 +1,127 @@
+package io.harness.cf.client.connector;
+
+import static io.harness.cf.client.api.dispatchers.CannedResponses.*;
+import static java.lang.System.out;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.mockwebserver.*;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+class EventSourceTest {
+
+ private final AtomicInteger version = new AtomicInteger(2);
+
+ static class StreamDispatcher extends Dispatcher {
+ private final AtomicInteger version = new AtomicInteger(2);
+ protected final AtomicInteger request = new AtomicInteger(1);
+
+ protected MockResponse makeStreamResponse() {
+ int reqNo = request.getAndIncrement();
+ if (reqNo <= 3) {
+ // Force a disconnect on the first few attempts
+ out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
+ return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
+ } else {
+ // Eventually allow a good connection attempt and send a flag
+ out.printf("ReqNo %d will be allowed\n", reqNo);
+ return makeMockStreamResponse(
+ 200,
+ makeFlagPatchEvent("simplebool", version.getAndIncrement()),
+ makeFlagPatchEvent("simplebool", version.getAndIncrement()))
+ .setSocketPolicy(SocketPolicy.KEEP_OPEN);
+ }
+ }
+
+ @Override
+ @SneakyThrows
+ @NotNull
+ public MockResponse dispatch(RecordedRequest recordedRequest) {
+ out.println("DISPATCH GOT ------> " + recordedRequest.getPath());
+
+ // recordedRequest.getHeaders().forEach(h -> out.printf(" %s: %s\n", h.component1(),
+ // h.component2()));
+
+ if (Objects.requireNonNull(recordedRequest.getPath()).equals("/api/1.0/stream?cluster=1")) {
+ return makeStreamResponse();
+ }
+ throw new UnsupportedOperationException("ERROR: url not mapped " + recordedRequest.getPath());
+ }
+ }
+
+ static class FailingStreamDispatcher extends StreamDispatcher {
+ @Override
+ protected MockResponse makeStreamResponse() {
+ int reqNo = request.getAndIncrement();
+ // Force a disconnect on all requests
+ out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
+ return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
+ }
+ }
+
+ @Test
+ void shouldNotCallErrorHandlerIfRetryEventuallyReconnectsToStreamEndpoint()
+ throws IOException, InterruptedException {
+ CountingUpdater updater = new CountingUpdater();
+
+ try (MockWebServer mockSvr = new MockWebServer();
+ EventSource eventSource =
+ new EventSource(
+ setupMockServer(mockSvr, new StreamDispatcher()), new HashMap<>(), updater, 1, 1)) {
+ eventSource.start();
+
+ TimeUnit.SECONDS.sleep(15);
+ }
+
+ // for this test, connection to the /stream endpoint will fail several times but eventually
+ // connect.
+ // There should be at least 1 connect event. Each retry calls onError
+
+ assertTrue(updater.getConnectCount().get() >= 1);
+ assertEquals(0, updater.getFailureCount().get());
+ assertEquals(5, updater.getErrorCount().get());
+ }
+
+ @Test
+ void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail()
+ throws IOException, InterruptedException {
+ CountingUpdater updater = new CountingUpdater();
+
+ try (MockWebServer mockSvr = new MockWebServer();
+ EventSource eventSource =
+ new EventSource(
+ setupMockServer(mockSvr, new FailingStreamDispatcher()),
+ new HashMap<>(),
+ updater,
+ 1,
+ 1)) {
+ eventSource.start();
+
+ TimeUnit.SECONDS.sleep(15);
+ }
+
+ // for this test, connection to the /stream endpoint will never succeed.
+ // we expect the error handler to be called, connect handler should not be called
+
+ assertEquals(0, updater.getConnectCount().get());
+ assertEquals(0, updater.getFailureCount().get());
+ assertTrue(updater.getErrorCount().get() >= 1);
+ }
+
+ @SneakyThrows
+ private String setupMockServer(MockWebServer mockSvr, Dispatcher dispatcher) {
+ mockSvr.setDispatcher(dispatcher);
+ mockSvr.start();
+ return String.format(
+ "http://%s:%s/api/1.0/stream?cluster=%d", mockSvr.getHostName(), mockSvr.getPort(), 1);
+ }
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index 59f4853c..39d9aa38 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -4,7 +4,7 @@
- %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - SDK=${SDK}, Version=${version}, flag=%X{flag}, target=%X{target}, requestID=%X{requestId} - %msg%n
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n