Skip to content

Commit f089269

Browse files
authored
fix: polling event source improvements (#901)
1 parent e0f7b10 commit f089269

File tree

2 files changed

+41
-13
lines changed

2 files changed

+41
-13
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java

+35-9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,35 @@
1111
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1212
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;
1313

14+
/**
15+
* <p>
16+
* Pols resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but
17+
* instead to calls supplier periodically and independently of the number of state of custom
18+
* resources managed by the operator. It is called on start (synced). This means that when the
19+
* reconciler first time executed on startup a poll already happened before. So if the cache does
20+
* not contain the target resource it means it is not created yet or was deleted while an operator
21+
* was not running.
22+
* </p>
23+
* <p>
24+
* Another caveat with this is if the cached object is checked in the reconciler and created since
25+
* not in the cache it should be manually added to the cache, since it can happen that the
26+
* reconciler is triggered before the cache is propagated with the new resource from a scheduled
27+
* execution. See {@link PollingEventSource##put(ResourceID, Object)}.
28+
* </p>
29+
* So the generic workflow in reconciler should be:
30+
*
31+
* <ul>
32+
* <li>Check if the cache contains the resource.</li>
33+
* <li>If cache contains the resource reconcile it - compare with target state, update if necessary
34+
* </li>
35+
* <li>if cache not contains the resource create it.</li>
36+
* <li>If the resource was created or updated, put the new version of the resource manually to the
37+
* cache.</li>
38+
* </ul>
39+
*
40+
* @param <T> type of the polled resource
41+
* @param <P> primary resource type
42+
*/
1443
public class PollingEventSource<T, P extends HasMetadata> extends CachingEventSource<T, P> {
1544

1645
private static final Logger log = LoggerFactory.getLogger(PollingEventSource.class);
@@ -29,6 +58,7 @@ public PollingEventSource(Supplier<Map<ResourceID, T>> supplier,
2958
@Override
3059
public void start() throws OperatorException {
3160
super.start();
61+
getStateAndFillCache();
3262
timer.schedule(new TimerTask() {
3363
@Override
3464
public void run() {
@@ -47,6 +77,10 @@ protected void getStateAndFillCache() {
4777
cache.keys().filter(e -> !values.containsKey(e)).forEach(super::handleDelete);
4878
}
4979

80+
public void put(ResourceID key, T resource) {
81+
cache.put(key, resource);
82+
}
83+
5084
@Override
5185
public void stop() throws OperatorException {
5286
super.stop();
@@ -61,15 +95,7 @@ public void stop() throws OperatorException {
6195
*/
6296
@Override
6397
public Optional<T> getAssociated(P primary) {
64-
return getValueFromCacheOrSupplier(ResourceID.fromResource(primary));
98+
return getCachedValue(ResourceID.fromResource(primary));
6599
}
66100

67-
public Optional<T> getValueFromCacheOrSupplier(ResourceID resourceID) {
68-
var resource = getCachedValue(resourceID);
69-
if (resource.isPresent()) {
70-
return resource;
71-
}
72-
getStateAndFillCache();
73-
return getCachedValue(resourceID);
74-
}
75101
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ class PollingEventSourceTest
2121
AbstractEventSourceTestBase<PollingEventSource<SampleExternalResource, HasMetadata>, EventHandler> {
2222

2323
private Supplier<Map<ResourceID, SampleExternalResource>> supplier = mock(Supplier.class);
24+
private PollingEventSource<SampleExternalResource, HasMetadata> pollingEventSource =
25+
new PollingEventSource<>(supplier, 50, SampleExternalResource.class);
2426

2527
@BeforeEach
2628
public void setup() {
27-
setUpSource(new PollingEventSource<>(supplier, 50, SampleExternalResource.class));
29+
setUpSource(pollingEventSource, false);
2830
}
2931

3032
@Test
3133
public void pollsAndProcessesEvents() throws InterruptedException {
3234
when(supplier.get()).thenReturn(testResponseWithTwoValues());
33-
35+
pollingEventSource.start();
3436
Thread.sleep(100);
3537

3638
verify(eventHandler, times(2)).handleEvent(any());
@@ -40,7 +42,7 @@ public void pollsAndProcessesEvents() throws InterruptedException {
4042
public void propagatesEventForRemovedResources() throws InterruptedException {
4143
when(supplier.get()).thenReturn(testResponseWithTwoValues())
4244
.thenReturn(testResponseWithOneValue());
43-
45+
pollingEventSource.start();
4446
Thread.sleep(150);
4547

4648
verify(eventHandler, times(3)).handleEvent(any());
@@ -49,7 +51,7 @@ public void propagatesEventForRemovedResources() throws InterruptedException {
4951
@Test
5052
public void doesNotPropagateEventIfResourceNotChanged() throws InterruptedException {
5153
when(supplier.get()).thenReturn(testResponseWithTwoValues());
52-
54+
pollingEventSource.start();
5355
Thread.sleep(250);
5456

5557
verify(eventHandler, times(2)).handleEvent(any());

0 commit comments

Comments
 (0)