Skip to content

Commit e0da323

Browse files
lburgazzolimetacosm
authored andcommitted
fix: DefaultEventHandler should not fire events if the handler is being closed #578
1 parent 3c24f70 commit e0da323

File tree

3 files changed

+56
-10
lines changed

3 files changed

+56
-10
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

+32-9
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void failedEvent(String uid, Event event) {}
5151
private final ExecutorService executor;
5252
private final String controllerName;
5353
private final ReentrantLock lock = new ReentrantLock();
54+
private volatile boolean running;
5455
private DefaultEventSourceManager<R> eventSourceManager;
5556

5657
public DefaultEventHandler(ConfiguredController<R> controller) {
@@ -67,6 +68,7 @@ public DefaultEventHandler(ConfiguredController<R> controller) {
6768

6869
private DefaultEventHandler(ExecutorService executor, String relatedControllerName,
6970
EventDispatcher<R> eventDispatcher, Retry retry) {
71+
this.running = true;
7072
this.executor =
7173
executor == null
7274
? new ScheduledThreadPoolExecutor(
@@ -75,27 +77,28 @@ private DefaultEventHandler(ExecutorService executor, String relatedControllerNa
7577
this.controllerName = relatedControllerName;
7678
this.eventDispatcher = eventDispatcher;
7779
this.retry = retry;
78-
eventBuffer = new EventBuffer();
79-
}
80-
81-
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
82-
this.eventSourceManager = eventSourceManager;
80+
this.eventBuffer = new EventBuffer();
8381
}
8482

8583
public static void setEventMonitor(EventMonitor monitor) {
8684
DefaultEventHandler.monitor = monitor;
8785
}
8886

89-
public interface EventMonitor {
90-
void processedEvent(String uid, Event event);
91-
92-
void failedEvent(String uid, Event event);
87+
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
88+
this.eventSourceManager = eventSourceManager;
9389
}
9490

9591
@Override
9692
public void handleEvent(Event event) {
93+
9794
try {
9895
lock.lock();
96+
97+
if (!this.running) {
98+
log.debug("Skipping event: {} because the event handler is shutting down", event);
99+
return;
100+
}
101+
99102
log.debug("Received event: {}", event);
100103

101104
final Predicate<CustomResource> selector = event.getCustomResourcesSelector();
@@ -109,6 +112,16 @@ public void handleEvent(Event event) {
109112
}
110113
}
111114

115+
@Override
116+
public void close() {
117+
try {
118+
lock.lock();
119+
this.running = false;
120+
} finally {
121+
lock.unlock();
122+
}
123+
}
124+
112125
private void executeBufferedEvents(String customResourceUid) {
113126
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
114127
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
@@ -143,6 +156,10 @@ void eventProcessingFinished(
143156
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
144157
try {
145158
lock.lock();
159+
if (!running) {
160+
return;
161+
}
162+
146163
log.debug(
147164
"Event processing finished. Scope: {}, PostExecutionControl: {}",
148165
executionScope,
@@ -279,6 +296,12 @@ private void unsetUnderExecution(String customResourceUid) {
279296
underProcessing.remove(customResourceUid);
280297
}
281298

299+
public interface EventMonitor {
300+
void processedEvent(String uid, Event event);
301+
302+
void failedEvent(String uid, Event event);
303+
}
304+
282305
private class ControllerExecution implements Runnable {
283306
private final ExecutionScope<R> executionScope;
284307

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public DefaultEventSourceManager(ConfiguredController<R> controller) {
5454
public void close() {
5555
try {
5656
lock.lock();
57+
58+
try {
59+
defaultEventHandler.close();
60+
} catch (Exception e) {
61+
log.warn("Error closing event handler", e);
62+
}
63+
5764
for (var entry : eventSources.entrySet()) {
5865
try {
5966
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,15 @@
2323
import static io.javaoperatorsdk.operator.TestUtils.testCustomResource;
2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.mockito.ArgumentMatchers.eq;
26-
import static org.mockito.Mockito.*;
26+
import static org.mockito.Mockito.any;
27+
import static org.mockito.Mockito.doAnswer;
28+
import static org.mockito.Mockito.doCallRealMethod;
29+
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.never;
31+
import static org.mockito.Mockito.timeout;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
2735

2836
class DefaultEventHandlerTest {
2937

@@ -224,6 +232,14 @@ public void scheduleTimedEventIfInstructedByPostExecutionControl() {
224232
.scheduleOnce(any(), eq(testDelay));
225233
}
226234

235+
@Test
236+
public void doNotFireEventsIfClosing() {
237+
defaultEventHandler.close();
238+
defaultEventHandler.handleEvent(prepareCREvent());
239+
240+
verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any());
241+
}
242+
227243
private void waitMinimalTime() {
228244
try {
229245
Thread.sleep(50);

0 commit comments

Comments
 (0)