Skip to content

Commit b2bdbe0

Browse files
authored
FFM-6581 - Java SDK - Poller was not restarted + timeouts logged (#135)
* FFM-6581 - Java SDK - Poller was not restarted + timeouts logged What Fixed timeout exception, reduce some log levels to debug and added additional unit tests. Why We see timeout errors logged becauses the code calls awaitTermination() before shutDown() when stopping the update processor. Also we log misleading warnings about poller not being restarted when in fact the code is working as intented (though it's candidate for additional optimisation in future releases). Testing Added new unit tets for EventSource + manual testing
1 parent 21d2e9a commit b2bdbe0

File tree

10 files changed

+212
-14
lines changed

10 files changed

+212
-14
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ The first step is to install the FF SDK as a dependency in your application usin
6969

7070
Refer to the [Harness Feature Flag Java Server SDK](https://mvnrepository.com/artifact/io.harness/ff-java-server-sdk) to identify the latest version for your build automation tool.
7171

72-
This section lists dependencies for Maven and Gradle and uses the 1.1.10 version as an example:
72+
This section lists dependencies for Maven and Gradle and uses the 1.1.11 version as an example:
7373

7474
#### Maven
7575

@@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
7878
<dependency>
7979
<groupId>io.harness</groupId>
8080
<artifactId>ff-java-server-sdk</artifactId>
81-
<version>1.1.10</version>
81+
<version>1.1.11</version>
8282
</dependency>
8383
```
8484

8585
#### Gradle
8686

8787
```
88-
implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.10'
88+
implementation group: 'io.harness', name: 'ff-java-server-sdk', version: '1.1.11'
8989
```
9090

9191
### Code Sample

examples/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.harness.featureflags</groupId>
88
<artifactId>examples</artifactId>
9-
<version>1.1.10</version>
9+
<version>1.1.11</version>
1010

1111
<properties>
1212
<maven.compiler.source>8</maven.compiler.source>
@@ -33,7 +33,7 @@
3333
<dependency>
3434
<groupId>io.harness</groupId>
3535
<artifactId>ff-java-server-sdk</artifactId>
36-
<version>1.1.10</version>
36+
<version>1.1.11</version>
3737
</dependency>
3838

3939
<dependency>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.harness</groupId>
88
<artifactId>ff-java-server-sdk</artifactId>
9-
<version>1.1.10</version>
9+
<version>1.1.11</version>
1010
<packaging>jar</packaging>
1111
<name>Harness Feature Flag Java Server SDK</name>
1212
<description>Harness Feature Flag Java Server SDK</description>

src/main/java/io/harness/cf/client/api/InnerClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,13 @@ public void onDisconnected() {
246246
pollProcessor.start();
247247
pollerStartedAt = new Date();
248248
} else {
249-
log.warn(
250-
"Poller was not restarted [closing={} state={} pollStartTime+interval={} now={} ]",
249+
log.debug(
250+
"Poller already running [closing={} state={} pollStartTime={} interval={} now={}]",
251251
closing,
252252
pollProcessor.state(),
253-
instant,
254-
now);
253+
pollerStartedAt.toInstant(),
254+
options.getPollIntervalInSeconds(),
255+
now.toInstant());
255256
}
256257
}
257258

src/main/java/io/harness/cf/client/api/UpdateProcessor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public void start() {
4747
running = true;
4848
} catch (ConnectorException | InterruptedException e) {
4949
log.error("Starting updater failed with exc: {}", e.getMessage());
50+
51+
if (e instanceof InterruptedException) {
52+
Thread.currentThread().interrupt();
53+
}
5054
}
5155
}
5256

@@ -61,6 +65,7 @@ public void stop() {
6165
stream.stop();
6266
running = false;
6367
}
68+
executor.shutdown();
6469
boolean result = executor.awaitTermination(3, TimeUnit.SECONDS);
6570
if (result) {
6671
log.debug("All tasks done");
@@ -69,8 +74,8 @@ public void stop() {
6974
}
7075
} catch (InterruptedException e) {
7176
log.error("Exception was raised when stopping update tasks", e);
77+
Thread.currentThread().interrupt();
7278
}
73-
executor.shutdown();
7479
}
7580

7681
public void update(@NonNull final Message message) {
@@ -139,6 +144,7 @@ public void close() {
139144
stream.close();
140145
} catch (InterruptedException e) {
141146
log.error("Exception was raised while trying to close the stream, err: {}", e.getMessage());
147+
Thread.currentThread().interrupt();
142148
}
143149
}
144150
log.info("UpdateProcessor closed");

src/main/java/io/harness/cf/client/connector/EventSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class EventSource implements ServerSentEvent.Listener, AutoCloseable, Ser
2323
private final Updater updater;
2424
private final Gson gson = new Gson();
2525
private final Request.Builder builder;
26-
private int retryTime = 2_000;
26+
private int retryTime;
2727
private HttpLoggingInterceptor loggingInterceptor;
2828

2929
private ServerSentEvent sse;
@@ -37,7 +37,17 @@ public EventSource(
3737
Map<String, String> headers,
3838
@NonNull Updater updater,
3939
long sseReadTimeoutMins) {
40+
this(url, headers, updater, sseReadTimeoutMins, 2_000);
41+
}
42+
43+
EventSource(
44+
@NonNull String url,
45+
Map<String, String> headers,
46+
@NonNull Updater updater,
47+
long sseReadTimeoutMins,
48+
int retryDelayMs) {
4049
this.updater = updater;
50+
this.retryTime = retryDelayMs;
4151
okSse = new OkSse(makeStreamClient(sseReadTimeoutMins));
4252
builder = new Request.Builder().url(url);
4353
headers.put("User-Agent", "JavaSDK " + io.harness.cf.Version.VERSION);
@@ -105,6 +115,7 @@ public boolean onRetryTime(ServerSentEvent serverSentEvent, long l) {
105115
@Override
106116
public boolean onRetryError(
107117
ServerSentEvent serverSentEvent, Throwable throwable, Response response) {
118+
108119
log.warn(
109120
"EventSource onRetryError [throwable={} message={}]",
110121
throwable.getClass().getSimpleName(),

src/test/java/io/harness/cf/client/api/dispatchers/CannedResponses.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class CannedResponses {
1919

2020
@Data
2121
@AllArgsConstructor
22-
static class Event {
22+
public static class Event {
2323
String event, domain, identifier;
2424
int version;
2525
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.harness.cf.client.connector;
2+
3+
import io.harness.cf.client.api.testutils.PollingAtomicLong;
4+
import io.harness.cf.client.dto.Message;
5+
import lombok.Getter;
6+
import lombok.Setter;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
@Slf4j
10+
class CountingUpdater implements Updater {
11+
@Getter @Setter private PollingAtomicLong connectCount = new PollingAtomicLong(0);
12+
@Getter @Setter private PollingAtomicLong disconnectCount = new PollingAtomicLong(0);
13+
@Getter @Setter private PollingAtomicLong readyCount = new PollingAtomicLong(0);
14+
@Getter @Setter private PollingAtomicLong failureCount = new PollingAtomicLong(0);
15+
@Getter @Setter private PollingAtomicLong updateCount = new PollingAtomicLong(0);
16+
@Getter @Setter private PollingAtomicLong errorCount = new PollingAtomicLong(0);
17+
18+
@Override
19+
public void onConnected() {
20+
log.debug("onConnected");
21+
connectCount.incrementAndGet();
22+
}
23+
24+
@Override
25+
public void onDisconnected() {
26+
log.debug("onDisconnected");
27+
disconnectCount.incrementAndGet();
28+
}
29+
30+
@Override
31+
public void onReady() {
32+
log.debug("onReady");
33+
readyCount.incrementAndGet();
34+
}
35+
36+
@Override
37+
public void onFailure(String message) {
38+
log.debug("onFailure: " + message);
39+
failureCount.incrementAndGet();
40+
}
41+
42+
@Override
43+
public void onError() {
44+
log.info("onError");
45+
errorCount.incrementAndGet();
46+
}
47+
48+
@Override
49+
public void update(Message message) {
50+
log.debug("update: " + message);
51+
updateCount.incrementAndGet();
52+
}
53+
};
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package io.harness.cf.client.connector;
2+
3+
import static io.harness.cf.client.api.dispatchers.CannedResponses.*;
4+
import static java.lang.System.out;
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import java.io.IOException;
9+
import java.util.HashMap;
10+
import java.util.Objects;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import lombok.SneakyThrows;
14+
import lombok.extern.slf4j.Slf4j;
15+
import okhttp3.mockwebserver.*;
16+
import org.jetbrains.annotations.NotNull;
17+
import org.junit.jupiter.api.Test;
18+
19+
@Slf4j
20+
class EventSourceTest {
21+
22+
private final AtomicInteger version = new AtomicInteger(2);
23+
24+
static class StreamDispatcher extends Dispatcher {
25+
private final AtomicInteger version = new AtomicInteger(2);
26+
protected final AtomicInteger request = new AtomicInteger(1);
27+
28+
protected MockResponse makeStreamResponse() {
29+
int reqNo = request.getAndIncrement();
30+
if (reqNo <= 3) {
31+
// Force a disconnect on the first few attempts
32+
out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
33+
return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
34+
} else {
35+
// Eventually allow a good connection attempt and send a flag
36+
out.printf("ReqNo %d will be allowed\n", reqNo);
37+
return makeMockStreamResponse(
38+
200,
39+
makeFlagPatchEvent("simplebool", version.getAndIncrement()),
40+
makeFlagPatchEvent("simplebool", version.getAndIncrement()))
41+
.setSocketPolicy(SocketPolicy.KEEP_OPEN);
42+
}
43+
}
44+
45+
@Override
46+
@SneakyThrows
47+
@NotNull
48+
public MockResponse dispatch(RecordedRequest recordedRequest) {
49+
out.println("DISPATCH GOT ------> " + recordedRequest.getPath());
50+
51+
// recordedRequest.getHeaders().forEach(h -> out.printf(" %s: %s\n", h.component1(),
52+
// h.component2()));
53+
54+
if (Objects.requireNonNull(recordedRequest.getPath()).equals("/api/1.0/stream?cluster=1")) {
55+
return makeStreamResponse();
56+
}
57+
throw new UnsupportedOperationException("ERROR: url not mapped " + recordedRequest.getPath());
58+
}
59+
}
60+
61+
static class FailingStreamDispatcher extends StreamDispatcher {
62+
@Override
63+
protected MockResponse makeStreamResponse() {
64+
int reqNo = request.getAndIncrement();
65+
// Force a disconnect on all requests
66+
out.printf("ReqNo %d will be disconnected on purpose\n", reqNo);
67+
return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST);
68+
}
69+
}
70+
71+
@Test
72+
void shouldNotCallErrorHandlerIfRetryEventuallyReconnectsToStreamEndpoint()
73+
throws IOException, InterruptedException {
74+
CountingUpdater updater = new CountingUpdater();
75+
76+
try (MockWebServer mockSvr = new MockWebServer();
77+
EventSource eventSource =
78+
new EventSource(
79+
setupMockServer(mockSvr, new StreamDispatcher()), new HashMap<>(), updater, 1, 1)) {
80+
eventSource.start();
81+
82+
TimeUnit.SECONDS.sleep(15);
83+
}
84+
85+
// for this test, connection to the /stream endpoint will fail several times but eventually
86+
// connect.
87+
// There should be at least 1 connect event. Each retry calls onError
88+
89+
assertTrue(updater.getConnectCount().get() >= 1);
90+
assertEquals(0, updater.getFailureCount().get());
91+
assertEquals(5, updater.getErrorCount().get());
92+
}
93+
94+
@Test
95+
void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail()
96+
throws IOException, InterruptedException {
97+
CountingUpdater updater = new CountingUpdater();
98+
99+
try (MockWebServer mockSvr = new MockWebServer();
100+
EventSource eventSource =
101+
new EventSource(
102+
setupMockServer(mockSvr, new FailingStreamDispatcher()),
103+
new HashMap<>(),
104+
updater,
105+
1,
106+
1)) {
107+
eventSource.start();
108+
109+
TimeUnit.SECONDS.sleep(15);
110+
}
111+
112+
// for this test, connection to the /stream endpoint will never succeed.
113+
// we expect the error handler to be called, connect handler should not be called
114+
115+
assertEquals(0, updater.getConnectCount().get());
116+
assertEquals(0, updater.getFailureCount().get());
117+
assertTrue(updater.getErrorCount().get() >= 1);
118+
}
119+
120+
@SneakyThrows
121+
private String setupMockServer(MockWebServer mockSvr, Dispatcher dispatcher) {
122+
mockSvr.setDispatcher(dispatcher);
123+
mockSvr.start();
124+
return String.format(
125+
"http://%s:%s/api/1.0/stream?cluster=%d", mockSvr.getHostName(), mockSvr.getPort(), 1);
126+
}
127+
}

src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
66
<encoder>
7-
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - SDK=${SDK}, Version=${version}, flag=%X{flag}, target=%X{target}, requestID=%X{requestId} - %msg%n
7+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n
88
</pattern>
99
</encoder>
1010
</appender>

0 commit comments

Comments
 (0)