Skip to content

Commit b81a192

Browse files
authored
feat: expose event source metadata (#1617)
* feat: early initialization of event sources * feat: expose event source metadata * fix: properly return underlying EventSource when needed, add tests * fix: remove use of deprecated method, add javadoc * fix: remove dependent metadata on EventSourceMetadata * fix: throw instead of return null (which shouldn't actually occur here) * fix: do not expose EventSource on EventSourceMetadata
1 parent a7e3ace commit b81a192

File tree

11 files changed

+298
-61
lines changed

11 files changed

+298
-61
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Optional<RetryInfo> getRetryInfo() {
3434

3535
@Override
3636
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
37-
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
37+
return controller.getEventSourceManager().getResourceEventSourcesFor(expectedType).stream()
3838
.map(es -> es.getSecondaryResources(primaryResource))
3939
.flatMap(Set::stream)
4040
.collect(Collectors.toSet());

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,14 @@ public Controller(Reconciler<P> reconciler,
8383
isCleaner = reconciler instanceof Cleaner;
8484
managedWorkflow =
8585
ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources());
86+
8687
eventSourceManager = new EventSourceManager<>(this);
8788
eventProcessor = new EventProcessor<>(eventSourceManager);
8889
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
90+
91+
final var context = new EventSourceContext<>(
92+
eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient);
93+
initAndRegisterEventSources(context);
8994
}
9095

9196
@Override
@@ -241,8 +246,7 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
241246
.map(EventSourceReferencer.class::cast)
242247
.forEach(dr -> {
243248
try {
244-
((EventSourceReferencer<P>) dr)
245-
.resolveEventSource(eventSourceManager);
249+
((EventSourceReferencer<P>) dr).resolveEventSource(eventSourceManager);
246250
} catch (EventSourceNotFoundException e) {
247251
unresolvable.computeIfAbsent(e.getEventSourceName(), s -> new ArrayList<>()).add(dr);
248252
}
@@ -318,10 +322,6 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep
318322
try {
319323
// check that the custom resource is known by the cluster if configured that way
320324
validateCRDWithLocalModelIfRequired(resClass, controllerName, crdName, specVersion);
321-
final var context = new EventSourceContext<>(
322-
eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient);
323-
324-
initAndRegisterEventSources(context);
325325
eventSourceManager.start();
326326
if (startEventProcessor) {
327327
eventProcessor.start();

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+31-19
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Objects;
66
import java.util.Set;
77
import java.util.stream.Collectors;
8+
import java.util.stream.Stream;
89

910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
@@ -126,8 +127,9 @@ public final synchronized void registerEventSource(String name, EventSource even
126127
if (name == null || name.isBlank()) {
127128
name = EventSourceInitializer.generateNameFor(eventSource);
128129
}
129-
eventSources.add(name, eventSource);
130-
eventSource.setEventHandler(controller.getEventProcessor());
130+
final var named = new NamedEventSource(eventSource, name);
131+
eventSources.add(named);
132+
named.setEventHandler(controller.getEventProcessor());
131133
} catch (IllegalStateException | MissingCRDException e) {
132134
throw e; // leave untouched
133135
} catch (Exception e) {
@@ -138,22 +140,24 @@ public final synchronized void registerEventSource(String name, EventSource even
138140

139141
@SuppressWarnings("unchecked")
140142
public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldResource) {
141-
eventSources.additionalNamedEventSources().forEach(eventSource -> {
142-
if (eventSource.original() instanceof ResourceEventAware) {
143-
var lifecycleAwareES = ((ResourceEventAware<P>) eventSource.original());
144-
switch (action) {
145-
case ADDED:
146-
lifecycleAwareES.onResourceCreated(resource);
147-
break;
148-
case UPDATED:
149-
lifecycleAwareES.onResourceUpdated(resource, oldResource);
150-
break;
151-
case DELETED:
152-
lifecycleAwareES.onResourceDeleted(resource);
153-
break;
154-
}
155-
}
156-
});
143+
eventSources.additionalNamedEventSources()
144+
.map(NamedEventSource::original)
145+
.forEach(source -> {
146+
if (source instanceof ResourceEventAware) {
147+
var lifecycleAwareES = ((ResourceEventAware<P>) source);
148+
switch (action) {
149+
case ADDED:
150+
lifecycleAwareES.onResourceCreated(resource);
151+
break;
152+
case UPDATED:
153+
lifecycleAwareES.onResourceUpdated(resource, oldResource);
154+
break;
155+
case DELETED:
156+
lifecycleAwareES.onResourceDeleted(resource);
157+
break;
158+
}
159+
}
160+
});
157161
}
158162

159163
public void changeNamespaces(Set<String> namespaces) {
@@ -174,6 +178,11 @@ public Set<EventSource> getRegisteredEventSources() {
174178
.collect(Collectors.toCollection(LinkedHashSet::new));
175179
}
176180

181+
@SuppressWarnings("unused")
182+
public Stream<? extends EventSourceMetadata> getNamedEventSourcesStream() {
183+
return eventSources.flatMappedSources();
184+
}
185+
177186
public ControllerResourceEventSource<P> getControllerResourceEventSource() {
178187
return eventSources.controllerResourceEventSource();
179188
}
@@ -187,9 +196,12 @@ public <R> List<ResourceEventSource<R, P>> getResourceEventSourcesFor(Class<R> d
187196
return eventSources.getEventSources(dependentType);
188197
}
189198

199+
/**
200+
* @deprecated Use {@link #getResourceEventSourceFor(Class)} instead
201+
*/
190202
@Deprecated
191203
public <R> List<ResourceEventSource<R, P>> getEventSourcesFor(Class<R> dependentType) {
192-
return eventSources.getEventSources(dependentType);
204+
return getResourceEventSourcesFor(dependentType);
193205
}
194206

195207
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.javaoperatorsdk.operator.processing.event;
2+
3+
import java.util.Optional;
4+
5+
public interface EventSourceMetadata {
6+
String name();
7+
8+
Class<?> type();
9+
10+
Optional<Class<?>> resourceType();
11+
12+
Optional<?> configuration();
13+
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+40-20
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import java.util.*;
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Objects;
48
import java.util.concurrent.ConcurrentNavigableMap;
59
import java.util.concurrent.ConcurrentSkipListMap;
610
import java.util.stream.Collectors;
@@ -20,15 +24,14 @@ class EventSources<R extends HasMetadata> {
2024
public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
2125
"RetryAndRescheduleTimerEventSource";
2226

23-
private final ConcurrentNavigableMap<String, Map<String, EventSource>> sources =
27+
private final ConcurrentNavigableMap<String, Map<String, NamedEventSource>> sources =
2428
new ConcurrentSkipListMap<>();
2529
private final TimerEventSource<R> retryAndRescheduleTimerEventSource = new TimerEventSource<>();
2630
private ControllerResourceEventSource<R> controllerResourceEventSource;
2731

2832

29-
ControllerResourceEventSource<R> initControllerEventSource(Controller<R> controller) {
33+
void initControllerEventSource(Controller<R> controller) {
3034
controllerResourceEventSource = new ControllerResourceEventSource<>(controller);
31-
return controllerResourceEventSource;
3235
}
3336

3437
ControllerResourceEventSource<R> controllerResourceEventSource() {
@@ -49,7 +52,7 @@ public Stream<NamedEventSource> additionalNamedEventSources() {
4952
Stream<EventSource> additionalEventSources() {
5053
return Stream.concat(
5154
Stream.of(retryEventSource()).filter(Objects::nonNull),
52-
sources.values().stream().flatMap(c -> c.values().stream()));
55+
flatMappedSources().map(NamedEventSource::original));
5356
}
5457

5558
NamedEventSource namedControllerResourceEventSource() {
@@ -58,29 +61,32 @@ NamedEventSource namedControllerResourceEventSource() {
5861
}
5962

6063
Stream<NamedEventSource> flatMappedSources() {
61-
return sources.values().stream().flatMap(c -> c.entrySet().stream()
62-
.map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey())));
64+
return sources.values().stream().flatMap(c -> c.values().stream());
6365
}
6466

6567
public void clear() {
6668
sources.clear();
6769
}
6870

69-
public boolean contains(String name, EventSource source) {
71+
private NamedEventSource existing(String name, EventSource source) {
7072
final var eventSources = sources.get(keyFor(source));
7173
if (eventSources == null || eventSources.isEmpty()) {
72-
return false;
74+
return null;
7375
}
74-
return eventSources.containsKey(name);
76+
return eventSources.get(name);
7577
}
7678

77-
public void add(String name, EventSource eventSource) {
78-
if (contains(name, eventSource)) {
79-
throw new IllegalArgumentException("An event source is already registered for the "
80-
+ keyAsString(getResourceType(eventSource), name)
79+
public void add(NamedEventSource eventSource) {
80+
final var name = eventSource.name();
81+
final var original = eventSource.original();
82+
final var existing = existing(name, original);
83+
if (existing != null && !eventSource.equals(existing)) {
84+
throw new IllegalArgumentException("Event source " + existing.original()
85+
+ " is already registered for the "
86+
+ keyAsString(getResourceType(original), name)
8187
+ " class/name combination");
8288
}
83-
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
89+
sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource);
8490
}
8591

8692
@SuppressWarnings("rawtypes")
@@ -91,6 +97,10 @@ private Class<?> getResourceType(EventSource source) {
9197
}
9298

9399
private String keyFor(EventSource source) {
100+
if (source instanceof NamedEventSource) {
101+
source = ((NamedEventSource) source).original();
102+
}
103+
94104
return keyFor(getResourceType(source));
95105
}
96106

@@ -100,16 +110,20 @@ private String keyFor(Class<?> dependentType) {
100110

101111
@SuppressWarnings("unchecked")
102112
public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
113+
if (dependentType == null) {
114+
throw new IllegalArgumentException("Must pass a dependent type to retrieve event sources");
115+
}
116+
103117
final var sourcesForType = sources.get(keyFor(dependentType));
104118
if (sourcesForType == null || sourcesForType.isEmpty()) {
105119
throw new IllegalArgumentException(
106120
"There is no event source found for class:" + dependentType.getName());
107121
}
108122

109123
final var size = sourcesForType.size();
110-
final EventSource source;
124+
NamedEventSource source;
111125
if (size == 1 && name == null) {
112-
source = sourcesForType.values().stream().findFirst().orElse(null);
126+
source = sourcesForType.values().stream().findFirst().orElseThrow();
113127
} else {
114128
if (name == null || name.isBlank()) {
115129
throw new IllegalArgumentException("There are multiple EventSources registered for type "
@@ -125,15 +139,16 @@ public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
125139
}
126140
}
127141

128-
if (!(source instanceof ResourceEventSource)) {
142+
EventSource original = source.original();
143+
if (!(original instanceof ResourceEventSource)) {
129144
throw new IllegalArgumentException(source + " associated with "
130145
+ keyAsString(dependentType, name) + " is not a "
131146
+ ResourceEventSource.class.getSimpleName());
132147
}
133-
final var res = (ResourceEventSource<S, R>) source;
148+
final var res = (ResourceEventSource<S, R>) original;
134149
final var resourceClass = res.resourceType();
135150
if (!resourceClass.isAssignableFrom(dependentType)) {
136-
throw new IllegalArgumentException(source + " associated with "
151+
throw new IllegalArgumentException(original + " associated with "
137152
+ keyAsString(dependentType, name)
138153
+ " is handling " + resourceClass.getName() + " resources but asked for "
139154
+ dependentType.getName());
@@ -151,7 +166,12 @@ private String keyAsString(Class dependentType, String name) {
151166
@SuppressWarnings("unchecked")
152167
public <S> List<ResourceEventSource<S, R>> getEventSources(Class<S> dependentType) {
153168
final var sourcesForType = sources.get(keyFor(dependentType));
169+
if (sourcesForType == null) {
170+
return Collections.emptyList();
171+
}
172+
154173
return sourcesForType.values().stream()
174+
.map(NamedEventSource::original)
155175
.filter(ResourceEventSource.class::isInstance)
156176
.map(es -> (ResourceEventSource<S, R>) es)
157177
.collect(Collectors.toList());

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import java.util.Objects;
4+
import java.util.Optional;
45

56
import io.javaoperatorsdk.operator.OperatorException;
7+
import io.javaoperatorsdk.operator.processing.event.source.Configurable;
68
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
79
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
10+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
811

9-
class NamedEventSource implements EventSource {
12+
class NamedEventSource implements EventSource, EventSourceMetadata {
1013

1114
private final EventSource original;
1215
private final String name;
@@ -35,6 +38,35 @@ public String name() {
3538
return name;
3639
}
3740

41+
@Override
42+
public Class<?> type() {
43+
return original.getClass();
44+
}
45+
46+
@Override
47+
@SuppressWarnings({"rawtypes", "unchecked"})
48+
public Optional<Class<?>> resourceType() {
49+
if (original instanceof ResourceEventSource) {
50+
ResourceEventSource resourceEventSource = (ResourceEventSource) original;
51+
return Optional.of(resourceEventSource.resourceType());
52+
}
53+
return Optional.empty();
54+
}
55+
56+
@Override
57+
@SuppressWarnings("rawtypes")
58+
public Optional<?> configuration() {
59+
if (original instanceof Configurable) {
60+
Configurable configurable = (Configurable) original;
61+
return Optional.ofNullable(configurable.configuration());
62+
}
63+
return Optional.empty();
64+
}
65+
66+
public EventSource eventSource() {
67+
return original;
68+
}
69+
3870
@Override
3971
public String toString() {
4072
return original + " named: '" + name + "'}";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
public interface Configurable<C> {
4+
C configuration();
5+
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

3-
import java.util.*;
3+
import java.util.HashMap;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
import java.util.Set;
48
import java.util.concurrent.ConcurrentHashMap;
59
import java.util.function.Function;
610
import java.util.function.Predicate;
@@ -72,6 +76,10 @@ void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> clien
7276
}
7377
}
7478

79+
C configuration() {
80+
return configuration;
81+
}
82+
7583
public void changeNamespaces(Set<String> namespaces) {
7684
var sourcesToRemove = sources.keySet().stream()
7785
.filter(k -> !namespaces.contains(k)).collect(Collectors.toSet());

0 commit comments

Comments
 (0)