Skip to content

Commit 1c9a22d

Browse files
authored
Automatically reconnect Kubernetes watcher when closed exceptionally (#6023)
Motivation: A watcher in `KubernetesEndpointGroup` automatically reconnects when it fails to connect to the remote peer. However, it does not reconnect when a `WatcherException` is raised. ``` io.fabric8.kubernetes.client.WatcherException: too old resource version: 573375490 (573377297) at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.onStatus(AbstractWatchManager.java:401) at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.onMessage(AbstractWatchManager.java:369) at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onMessage(WatcherWebSocketListener.java:52) at com.linecorp.armeria.client.kubernetes.ArmeriaWebSocket.onNext(ArmeriaWebSocket.java:106) at com.linecorp.armeria.client.kubernetes.ArmeriaWebSocket.onNext(ArmeriaWebSocket.java:37) at com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriberWithElements(DefaultStreamMessage.java:412) ... Caused by: io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 573375490 (573377297) ... 62 common frames omitted ``` I don't know why `too old resource version` was raised but an important thing is that watchers should not be stopped until `KubernetesEndpointGroup` is closed. Modifications: - Refactor `KuberntesEndpointGroup` to start watchers asynchronously. - Automatically restart `Watcher`s when `onClose(WatcherException)` is invoked. - Add more logs that I think might be useful. - Also make the log formats consistent - Debounce the update of endpoints to prevent `EndpointGroup.whenReady()` from completing with a small number of endpoints. - The purpose is to prevent a few endpoints from receiving too much traffic when a watcher is newly created. Result: `KubernetesEndpointGroup` automatically reconnects a `Watcher` when `WatcherException` is raised.
1 parent 244e5cb commit 1c9a22d

File tree

6 files changed

+397
-50
lines changed

6 files changed

+397
-50
lines changed

Diff for: kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClientFactory.java

+9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.linecorp.armeria.client.kubernetes;
1818

1919
import com.linecorp.armeria.client.WebClientBuilder;
20+
import com.linecorp.armeria.client.websocket.WebSocketClient;
21+
import com.linecorp.armeria.client.websocket.WebSocketClientBuilder;
2022

2123
import io.fabric8.kubernetes.client.http.HttpClient;
2224

@@ -36,4 +38,11 @@ public HttpClient.Builder newBuilder() {
3638
protected void additionalConfig(WebClientBuilder builder) {
3739
// no default implementation
3840
}
41+
42+
/**
43+
* Subclasses may use this to apply additional configuration for {@link WebSocketClient}.
44+
*/
45+
protected void additionalWebSocketConfig(WebSocketClientBuilder builder) {
46+
// no default implementation
47+
}
3948
}

Diff for: kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaWebSocketClient.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.linecorp.armeria.client.RequestOptions;
3131
import com.linecorp.armeria.client.websocket.WebSocketClient;
32+
import com.linecorp.armeria.client.websocket.WebSocketClientBuilder;
3233
import com.linecorp.armeria.client.websocket.WebSocketClientHandshakeException;
3334
import com.linecorp.armeria.client.websocket.WebSocketSession;
3435
import com.linecorp.armeria.common.HttpHeaderNames;
@@ -69,10 +70,12 @@ private WebSocketClient webSocketClient() {
6970
if (webSocketClient0 != null) {
7071
return webSocketClient0;
7172
}
72-
webSocketClient0 = WebSocketClient.builder()
73-
.factory(armeriaHttpClientBuilder.clientFactory(true))
74-
.aggregateContinuation(true)
75-
.build();
73+
final WebSocketClientBuilder webSocketClientBuilder =
74+
WebSocketClient.builder()
75+
.factory(armeriaHttpClientBuilder.clientFactory(true))
76+
.aggregateContinuation(true);
77+
armeriaHttpClientBuilder.getClientFactory().additionalWebSocketConfig(webSocketClientBuilder);
78+
webSocketClient0 = webSocketClientBuilder.build();
7679
this.webSocketClient = webSocketClient0;
7780
return webSocketClient0;
7881
} finally {

0 commit comments

Comments
 (0)