Skip to content

Commit adb0963

Browse files
authored
fix: run event source start on specific thread pool (#1606)
Fixes #1603
1 parent b81a192 commit adb0963

File tree

8 files changed

+96
-26
lines changed

8 files changed

+96
-26
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

+39-9
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
import java.util.concurrent.Callable;
66
import java.util.concurrent.ExecutionException;
77
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
89
import java.util.concurrent.Future;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.TimeoutException;
12+
import java.util.function.Function;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
1115

1216
import org.slf4j.Logger;
1317
import org.slf4j.LoggerFactory;
@@ -60,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() {
6064
return instance;
6165
}
6266

67+
public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
68+
Function<T, Void> task, Function<T, String> threadNamer) {
69+
final var instrumented = new InstrumentedExecutorService(
70+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
71+
72+
try {
73+
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
74+
// change thread name for easier debugging
75+
final var thread = Thread.currentThread();
76+
final var name = thread.getName();
77+
thread.setName(threadNamer.apply(item));
78+
try {
79+
task.apply(item);
80+
return null;
81+
} finally {
82+
// restore original name
83+
thread.setName(name);
84+
}
85+
}).collect(Collectors.toList()));
86+
shutdown(instrumented);
87+
} catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
}
90+
}
91+
6392
public ExecutorService executorService() {
6493
return executor;
6594
}
@@ -71,17 +100,18 @@ public ExecutorService workflowExecutorService() {
71100
private void doStop() {
72101
try {
73102
log.debug("Closing executor");
74-
executor.shutdown();
75-
workflowExecutor.shutdown();
76-
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
77-
workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
78-
}
79-
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
80-
executor.shutdownNow(); // if we timed out, waiting, cancel everything
81-
}
82-
103+
shutdown(executor);
104+
shutdown(workflowExecutor);
83105
} catch (InterruptedException e) {
84106
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
107+
Thread.currentThread().interrupt();
108+
}
109+
}
110+
111+
private static void shutdown(ExecutorService executorService) throws InterruptedException {
112+
executorService.shutdown();
113+
if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
114+
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
85115
}
86116
}
87117

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,8 @@ public void changeNamespaces(Set<String> namespaces) {
360360
}
361361

362362
public synchronized void startEventProcessing() {
363-
log.info("Started event processing for controller: {}", configuration.getName());
364363
eventProcessor.start();
364+
log.info("Started event processing for controller: {}", configuration.getName());
365365
}
366366

367367
private void throwMissingCRDException(String crdName, String specVersion, String controllerName) {

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+30-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Objects;
66
import java.util.Set;
7+
import java.util.function.Function;
78
import java.util.stream.Collectors;
89
import java.util.stream.Stream;
910

@@ -13,6 +14,7 @@
1314
import io.fabric8.kubernetes.api.model.HasMetadata;
1415
import io.javaoperatorsdk.operator.MissingCRDException;
1516
import io.javaoperatorsdk.operator.OperatorException;
17+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1618
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
1719
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
1820
import io.javaoperatorsdk.operator.processing.Controller;
@@ -65,20 +67,36 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6567
@Override
6668
public synchronized void start() {
6769
startEventSource(eventSources.namedControllerResourceEventSource());
68-
eventSources.additionalNamedEventSources()
69-
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
70-
.parallel().forEach(this::startEventSource);
71-
eventSources.additionalNamedEventSources()
72-
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
73-
.parallel().forEach(this::startEventSource);
70+
71+
ExecutorServiceManager.executeAndWaitForAllToComplete(
72+
eventSources.additionalNamedEventSources()
73+
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
74+
this::startEventSource,
75+
getThreadNamer("start"));
76+
77+
ExecutorServiceManager.executeAndWaitForAllToComplete(
78+
eventSources.additionalNamedEventSources()
79+
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
80+
this::startEventSource,
81+
getThreadNamer("start"));
82+
}
83+
84+
private static Function<NamedEventSource, String> getThreadNamer(String stage) {
85+
return es -> {
86+
final var name = es.name();
87+
return es.priority() + " " + stage + " -> "
88+
+ (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
89+
};
7490
}
7591

7692
@Override
7793
public synchronized void stop() {
7894
stopEventSource(eventSources.namedControllerResourceEventSource());
79-
eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
95+
ExecutorServiceManager.executeAndWaitForAllToComplete(
96+
eventSources.additionalNamedEventSources(),
97+
this::stopEventSource,
98+
getThreadNamer("stop"));
8099
eventSources.clear();
81-
82100
}
83101

84102
@SuppressWarnings("rawtypes")
@@ -94,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
94112
}
95113
}
96114

97-
private void startEventSource(NamedEventSource eventSource) {
115+
private Void startEventSource(NamedEventSource eventSource) {
98116
try {
99117
logEventSourceEvent(eventSource, "Starting");
100118
eventSource.start();
@@ -104,16 +122,18 @@ private void startEventSource(NamedEventSource eventSource) {
104122
} catch (Exception e) {
105123
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
106124
}
125+
return null;
107126
}
108127

109-
private void stopEventSource(NamedEventSource eventSource) {
128+
private Void stopEventSource(NamedEventSource eventSource) {
110129
try {
111130
logEventSourceEvent(eventSource, "Stopping");
112131
eventSource.stop();
113132
logEventSourceEvent(eventSource, "Stopped");
114133
} catch (Exception e) {
115134
log.warn("Error closing {} -> {}", eventSource.name(), e);
116135
}
136+
return null;
117137
}
118138

119139
public final void registerEventSource(EventSource eventSource) throws OperatorException {

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Optional;
55

66
import io.javaoperatorsdk.operator.OperatorException;
7+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
78
import io.javaoperatorsdk.operator.processing.event.source.Configurable;
89
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
910
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
@@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata {
1314

1415
private final EventSource original;
1516
private final String name;
17+
private final boolean nameSet;
1618

1719
NamedEventSource(EventSource original, String name) {
1820
this.original = original;
1921
this.name = name;
22+
nameSet = !name.equals(EventSourceInitializer.generateNameFor(original));
2023
}
2124

2225
@Override
@@ -95,4 +98,8 @@ public int hashCode() {
9598
public EventSourceStartPriority priority() {
9699
return original.priority();
97100
}
101+
102+
public boolean isNameSet() {
103+
return nameSet;
104+
}
98105
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4545

4646
@Override
4747
public void start() throws OperatorException {
48-
sources.values().parallelStream().forEach(LifecycleAware::start);
48+
// make sure informers are all started before proceeding further
49+
sources.values().parallelStream().forEach(InformerWrapper::start);
4950
}
5051

5152
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -113,7 +114,6 @@ private InformerWrapper<T> createEventSource(
113114

114115
@Override
115116
public void stop() {
116-
log.info("Stopping {}", this);
117117
sources.forEach((ns, source) -> {
118118
try {
119119
log.debug("Stopping informer for namespace: {} -> {}", ns, source);

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ public void start() throws OperatorException {
6464
if (!configService.stopOnInformerErrorDuringStartup()) {
6565
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
6666
}
67+
// change thread name for easier debugging
68+
final var thread = Thread.currentThread();
69+
final var name = thread.getName();
6770
try {
71+
thread.setName(informerInfo() + " " + thread.getId());
6872
var start = informer.start();
6973
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
7074
// false, and there is a rbac issue the get never returns; therefore operator never really
@@ -81,6 +85,9 @@ public void start() throws OperatorException {
8185
} catch (InterruptedException e) {
8286
Thread.currentThread().interrupt();
8387
throw new IllegalStateException(e);
88+
} finally {
89+
// restore original name
90+
thread.setName(name);
8491
}
8592

8693
} catch (Exception e) {
@@ -143,6 +150,10 @@ public List<T> byIndex(String indexName, String indexKey) {
143150

144151
@Override
145152
public String toString() {
146-
return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')';
153+
return informerInfo() + " (" + informer + ')';
154+
}
155+
156+
private String informerInfo() {
157+
return "InformerWrapper [" + versionedFullResourceName() + "]";
147158
}
148159
}

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import org.junit.jupiter.api.Test;
66

7+
import io.fabric8.kubernetes.api.model.ConfigMap;
78
import io.fabric8.kubernetes.api.model.HasMetadata;
89
import io.javaoperatorsdk.operator.MockKubernetesClient;
910
import io.javaoperatorsdk.operator.OperatorException;
@@ -171,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() {
171172
}
172173

173174
private EventSourceManager initManager() {
174-
final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
175+
final var configuration = MockControllerConfiguration.forResource(ConfigMap.class);
175176
final Controller controller = new Controller(mock(Reconciler.class), configuration,
176-
MockKubernetesClient.client(HasMetadata.class));
177+
MockKubernetesClient.client(ConfigMap.class));
177178
return new EventSourceManager(controller);
178179
}
179180
}

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.fabric8.kubernetes.api.model.apps.Deployment;
1111
import io.fabric8.kubernetes.client.KubernetesClient;
1212
import io.javaoperatorsdk.operator.MockKubernetesClient;
13+
import io.javaoperatorsdk.operator.OperatorException;
1314
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1415
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
1516
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
@@ -259,7 +260,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
259260

260261
// by default informer fails to start if there is an exception in the client on start.
261262
// Throws the exception further.
262-
assertThrows(RuntimeException.class, () -> informerEventSource.start());
263+
assertThrows(OperatorException.class, () -> informerEventSource.start());
263264
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
264265
} finally {
265266
ConfigurationServiceProvider.reset();

0 commit comments

Comments
 (0)