Skip to content

Commit 3a18c51

Browse files
committed
improve watch resource closing logic
1 parent a996bcc commit 3a18c51

File tree

1 file changed

+16
-26
lines changed

1 file changed

+16
-26
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ public class ReflectorRunnable<
5050

5151
private boolean isLastSyncResourceVersionUnavailable;
5252

53-
private Watchable<ApiType> watch;
54-
5553
private ListerWatcher<ApiType, ApiListType> listerWatcher;
5654

5755
private DeltaFIFO store;
@@ -91,16 +89,11 @@ public void run() {
9189
try {
9290
this.lastSyncResourceVersion = initialLoad();
9391
this.isLastSyncResourceVersionUnavailable = false;
94-
92+
Watchable<ApiType> watch = null;
9593
if (log.isDebugEnabled()) {
9694
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
9795
}
98-
while (true) {
99-
if (!isActive.get()) {
100-
closeWatch();
101-
return;
102-
}
103-
96+
while (isActive.get()) {
10497
try {
10598
if (log.isDebugEnabled()) {
10699
log.debug(
@@ -110,21 +103,14 @@ public void run() {
110103
long jitteredWatchTimeoutSeconds =
111104
Double.valueOf(REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT.getSeconds() * (1 + Math.random()))
112105
.longValue();
113-
Watchable<ApiType> newWatch =
106+
watch =
114107
listerWatcher.watch(
115108
new CallGeneratorParams(
116109
Boolean.TRUE,
117110
lastSyncResourceVersion,
118111
Long.valueOf(jitteredWatchTimeoutSeconds).intValue()));
119112

120-
synchronized (this) {
121-
if (!isActive.get()) {
122-
newWatch.close();
123-
continue;
124-
}
125-
watch = newWatch;
126-
}
127-
watchHandler(newWatch);
113+
watchHandler(watch);
128114
} catch (WatchExpiredException e) {
129115
// Watch calls were failed due to expired resource-version. Returning
130116
// to unwind the list-watch loops so that we can respawn a new round
@@ -155,7 +141,13 @@ public void run() {
155141
this.exceptionHandler.accept(apiTypeClass, t);
156142
return;
157143
} finally {
158-
closeWatch();
144+
if (watch != null) {
145+
try {
146+
watch.close();
147+
} catch (IOException e) {
148+
log.warn("{}#Error while closing watcher", this.apiTypeClass, e);
149+
}
150+
}
159151
}
160152
}
161153
} catch (ApiException e) {
@@ -181,13 +173,6 @@ public void stop() {
181173
}
182174
}
183175

184-
private synchronized void closeWatch() throws IOException {
185-
if (watch != null) {
186-
watch.close();
187-
watch = null;
188-
}
189-
}
190-
191176
private String initialLoad() throws ApiException {
192177
ApiListType list =
193178
listerWatcher.list(
@@ -284,6 +269,11 @@ private void watchHandler(Watchable<ApiType> watch) {
284269
log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion);
285270
}
286271
}
272+
try {
273+
watch.close();
274+
} catch (IOException e) {
275+
log.warn("{}#Error while closing watcher", this.apiTypeClass, e);
276+
}
287277
}
288278

289279
static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(

0 commit comments

Comments
 (0)