Skip to content

Commit 9bb3f07

Browse files
authored
feat: enable configuring a handler to listen to informers stopping (#1493)
1 parent 58b2e63 commit 9bb3f07

File tree

12 files changed

+211
-46
lines changed

12 files changed

+211
-46
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+3
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
148148
return Optional.empty();
149149
}
150150

151+
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
152+
return Optional.empty();
153+
}
151154
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+12
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
2626
private ExecutorService executorService;
2727
private ExecutorService workflowExecutorService;
2828
private LeaderElectionConfiguration leaderElectionConfiguration;
29+
private InformerStoppedHandler informerStoppedHandler;
2930

3031
ConfigurationServiceOverrider(ConfigurationService original) {
3132
this.original = original;
@@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
9394
return this;
9495
}
9596

97+
public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
98+
this.informerStoppedHandler = handler;
99+
return this;
100+
}
101+
96102
public ConfigurationService build() {
97103
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
98104
@Override
@@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
159165
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
160166
: original.getLeaderElectionConfiguration();
161167
}
168+
169+
@Override
170+
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
171+
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
172+
: original.getInformerStoppedHandler();
173+
}
162174
};
163175
}
164176

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
4+
5+
public interface InformerStoppedHandler {
6+
7+
@SuppressWarnings("rawtypes")
8+
void onStop(SharedIndexInformer informer, Throwable ex);
9+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

+22
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.fabric8.kubernetes.client.informers.cache.Cache;
1414
import io.javaoperatorsdk.operator.OperatorException;
1515
import io.javaoperatorsdk.operator.ReconcilerUtils;
16+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1617
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1718
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1819
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
@@ -32,6 +33,27 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
3233
public void start() throws OperatorException {
3334
try {
3435
informer.run();
36+
37+
// register stopped handler if we have one defined
38+
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
39+
.ifPresent(ish -> {
40+
final var stopped = informer.stopped();
41+
if (stopped != null) {
42+
stopped.handle((res, ex) -> {
43+
ish.onStop(informer, ex);
44+
return null;
45+
});
46+
} else {
47+
final var apiTypeClass = informer.getApiTypeClass();
48+
final var fullResourceName =
49+
HasMetadata.getFullResourceName(apiTypeClass);
50+
final var version = HasMetadata.getVersion(apiTypeClass);
51+
throw new IllegalStateException(
52+
"Cannot retrieve 'stopped' callback to listen to informer stopping for informer for "
53+
+ fullResourceName + "/" + version);
54+
}
55+
});
56+
3557
} catch (Exception e) {
3658
ReconcilerUtils.handleKubernetesClientException(e,
3759
HasMetadata.getFullResourceName(informer.getApiTypeClass()));

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.javaoperatorsdk.operator;
22

3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.function.Consumer;
5+
36
import io.fabric8.kubernetes.api.model.HasMetadata;
47
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
58
import io.fabric8.kubernetes.client.GenericKubernetesClient;
@@ -14,12 +17,18 @@
1417
import static org.mockito.ArgumentMatchers.anyLong;
1518
import static org.mockito.ArgumentMatchers.anyString;
1619
import static org.mockito.ArgumentMatchers.nullable;
20+
import static org.mockito.Mockito.doAnswer;
1721
import static org.mockito.Mockito.mock;
1822
import static org.mockito.Mockito.when;
1923

2024
public class MockKubernetesClient {
2125

2226
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
27+
return client(clazz, null);
28+
}
29+
30+
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
31+
Consumer<Void> informerRunBehavior) {
2332
final var client = mock(GenericKubernetesClient.class);
2433
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
2534
mock(MixedOperation.class);
@@ -34,6 +43,19 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
3443
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
3544
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
3645
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
46+
CompletableFuture<Void> stopped = new CompletableFuture<>();
47+
when(informer.stopped()).thenReturn(stopped);
48+
if (informerRunBehavior != null) {
49+
doAnswer(invocation -> {
50+
try {
51+
informerRunBehavior.accept(null);
52+
} catch (Exception e) {
53+
stopped.completeExceptionally(e);
54+
}
55+
return null;
56+
}).when(informer).run();
57+
}
58+
doAnswer(invocation -> null).when(informer).stop();
3759
Indexer mockIndexer = mock(Indexer.class);
3860
when(informer.getIndexer()).thenReturn(mockIndexer);
3961
when(filterable.runnableInformer(anyLong())).thenReturn(informer);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
112112
.withTerminationTimeoutSeconds(100)
113113
.withMetrics(new Metrics() {})
114114
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
115+
.withInformerStoppedHandler((informer, ex) -> {
116+
})
115117
.build();
116118

117119
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
128130
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
129131
assertNotEquals(config.getLeaderElectionConfiguration(),
130132
overridden.getLeaderElectionConfiguration());
133+
assertNotEquals(config.getInformerStoppedHandler(),
134+
overridden.getLeaderElectionConfiguration());
131135
}
132136
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+25-19
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
import io.fabric8.kubernetes.api.model.ObjectMeta;
1010
import io.fabric8.kubernetes.api.model.apps.Deployment;
1111
import io.fabric8.kubernetes.client.KubernetesClient;
12-
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
13-
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
14-
import io.fabric8.kubernetes.client.dsl.MixedOperation;
15-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
16-
import io.fabric8.kubernetes.client.informers.cache.Indexer;
12+
import io.javaoperatorsdk.operator.MockKubernetesClient;
13+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
14+
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
1715
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1816
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1917
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -22,6 +20,8 @@
2220

2321
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
2422
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.eq;
24+
import static org.mockito.Mockito.atLeastOnce;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.never;
2727
import static org.mockito.Mockito.times;
@@ -36,28 +36,15 @@ class InformerEventSourceTest {
3636
private static final String NEXT_RESOURCE_VERSION = "2";
3737

3838
private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
39-
private final KubernetesClient clientMock = mock(KubernetesClient.class);
39+
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
4040
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
4141
mock(TemporaryResourceCache.class);
4242
private final EventHandler eventHandlerMock = mock(EventHandler.class);
43-
private final MixedOperation crClientMock = mock(MixedOperation.class);
44-
private final FilterWatchListMultiDeletable specificResourceClientMock =
45-
mock(FilterWatchListMultiDeletable.class);
46-
private final FilterWatchListDeletable labeledResourceClientMock =
47-
mock(FilterWatchListDeletable.class);
48-
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
4943
private final InformerConfiguration<Deployment> informerConfiguration =
5044
mock(InformerConfiguration.class);
5145

5246
@BeforeEach
5347
void setup() {
54-
when(clientMock.resources(any())).thenReturn(crClientMock);
55-
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
56-
when(specificResourceClientMock.withLabelSelector((String) null))
57-
.thenReturn(labeledResourceClientMock);
58-
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
59-
when(informer.getIndexer()).thenReturn(mock(Indexer.class));
60-
6148
when(informerConfiguration.getEffectiveNamespaces())
6249
.thenReturn(DEFAULT_NAMESPACES_SET);
6350
when(informerConfiguration.getSecondaryToPrimaryMapper())
@@ -256,6 +243,25 @@ void filtersOnDeleteEvents() {
256243
verify(eventHandlerMock, never()).handleEvent(any());
257244
}
258245

246+
@Test
247+
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
248+
try {
249+
final var exception = new RuntimeException("Informer stopped exceptionally!");
250+
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
251+
ConfigurationServiceProvider
252+
.overrideCurrent(
253+
overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
254+
informerEventSource = new InformerEventSource<>(informerConfiguration,
255+
MockKubernetesClient.client(Deployment.class, unused -> {
256+
throw exception;
257+
}));
258+
informerEventSource.start();
259+
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
260+
} finally {
261+
ConfigurationServiceProvider.reset();
262+
}
263+
}
264+
259265
Deployment testDeployment() {
260266
Deployment deployment = new Deployment();
261267
deployment.setMetadata(new ObjectMeta());

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Locale;
88
import java.util.UUID;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Consumer;
1011

1112
import org.awaitility.Awaitility;
1213
import org.junit.jupiter.api.extension.*;
@@ -22,7 +23,7 @@
2223
import io.fabric8.kubernetes.client.dsl.Resource;
2324
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
2425
import io.fabric8.kubernetes.client.utils.Utils;
25-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
26+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
2627
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
2728

2829
public abstract class AbstractOperatorExtension implements HasKubernetesClient,
@@ -35,7 +36,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
3536
public static final int CRD_READY_WAIT = 2000;
3637

3738
private final KubernetesClient kubernetesClient = new DefaultKubernetesClient();
38-
protected final ConfigurationService configurationService;
3939
protected final List<HasMetadata> infrastructure;
4040
protected Duration infrastructureTimeout;
4141
protected final boolean oneNamespacePerClass;
@@ -45,14 +45,11 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
4545
protected String namespace;
4646

4747
protected AbstractOperatorExtension(
48-
ConfigurationService configurationService,
4948
List<HasMetadata> infrastructure,
5049
Duration infrastructureTimeout,
5150
boolean oneNamespacePerClass,
5251
boolean preserveNamespaceOnError,
5352
boolean waitForNamespaceDeletion) {
54-
55-
this.configurationService = configurationService;
5653
this.infrastructure = infrastructure;
5754
this.infrastructureTimeout = infrastructureTimeout;
5855
this.oneNamespacePerClass = oneNamespacePerClass;
@@ -214,16 +211,13 @@ protected void deleteOperator() {
214211

215212
@SuppressWarnings("unchecked")
216213
public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
217-
protected ConfigurationService configurationService;
218214
protected final List<HasMetadata> infrastructure;
219215
protected Duration infrastructureTimeout;
220216
protected boolean preserveNamespaceOnError;
221217
protected boolean waitForNamespaceDeletion;
222218
protected boolean oneNamespacePerClass;
223219

224220
protected AbstractBuilder() {
225-
this.configurationService = ConfigurationServiceProvider.instance();
226-
227221
this.infrastructure = new ArrayList<>();
228222
this.infrastructureTimeout = Duration.ofMinutes(1);
229223

@@ -255,8 +249,8 @@ public T oneNamespacePerClass(boolean value) {
255249
return (T) this;
256250
}
257251

258-
public T withConfigurationService(ConfigurationService value) {
259-
configurationService = value;
252+
public T withConfigurationService(Consumer<ConfigurationServiceOverrider> overrider) {
253+
ConfigurationServiceProvider.overrideCurrent(overrider);
260254
return (T) this;
261255
}
262256

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import io.fabric8.kubernetes.api.model.HasMetadata;
2020
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
21-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
2221

2322
public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension {
2423

@@ -29,15 +28,14 @@ public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension
2928
private final Duration operatorDeploymentTimeout;
3029

3130
private ClusterDeployedOperatorExtension(
32-
ConfigurationService configurationService,
3331
List<HasMetadata> operatorDeployment,
3432
Duration operatorDeploymentTimeout,
3533
List<HasMetadata> infrastructure,
3634
Duration infrastructureTimeout,
3735
boolean preserveNamespaceOnError,
3836
boolean waitForNamespaceDeletion,
3937
boolean oneNamespacePerClass) {
40-
super(configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass,
38+
super(infrastructure, infrastructureTimeout, oneNamespacePerClass,
4139
preserveNamespaceOnError,
4240
waitForNamespaceDeletion);
4341
this.operatorDeployment = operatorDeployment;
@@ -137,7 +135,6 @@ public Builder withOperatorDeployment(HasMetadata... hms) {
137135

138136
public ClusterDeployedOperatorExtension build() {
139137
return new ClusterDeployedOperatorExtension(
140-
configurationService,
141138
operatorDeployment,
142139
deploymentTimeout,
143140
infrastructure,

0 commit comments

Comments
 (0)