1
- package io .javaoperatorsdk .operator .processing ;
1
+ package io .javaoperatorsdk .operator .processing . event ;
2
2
3
3
import java .util .HashMap ;
4
4
import java .util .HashSet ;
14
14
15
15
import io .fabric8 .kubernetes .api .model .HasMetadata ;
16
16
import io .javaoperatorsdk .operator .OperatorException ;
17
- import io .javaoperatorsdk .operator .api .LifecycleAware ;
18
17
import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
19
18
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
20
19
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
21
20
import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
22
- import io .javaoperatorsdk .operator .processing .event . Event ;
23
- import io .javaoperatorsdk .operator .processing .event . EventHandler ;
24
- import io .javaoperatorsdk .operator .processing .event . EventSourceManager ;
25
- import io .javaoperatorsdk .operator .processing .event .ResourceID ;
26
- import io .javaoperatorsdk .operator .processing .event .internal . ResourceAction ;
27
- import io .javaoperatorsdk .operator .processing .event .internal . ResourceEvent ;
21
+ import io .javaoperatorsdk .operator .processing .LifecycleAware ;
22
+ import io .javaoperatorsdk .operator .processing .MDCUtils ;
23
+ import io .javaoperatorsdk .operator .processing .ResourceCache ;
24
+ import io .javaoperatorsdk .operator .processing .event .source . ResourceAction ;
25
+ import io .javaoperatorsdk .operator .processing .event .source . ResourceEvent ;
26
+ import io .javaoperatorsdk .operator .processing .event .source . TimerEventSource ;
28
27
import io .javaoperatorsdk .operator .processing .retry .GenericRetry ;
29
28
import io .javaoperatorsdk .operator .processing .retry .Retry ;
30
29
import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
36
35
* Event handler that makes sure that events are processed in a "single threaded" way per resource
37
36
* UID, while buffering events which are received during an execution.
38
37
*/
39
- public class EventProcessor <R extends HasMetadata >
40
- implements EventHandler , LifecycleAware {
38
+ class EventProcessor <R extends HasMetadata > implements EventHandler , LifecycleAware {
41
39
42
40
private static final Logger log = LoggerFactory .getLogger (EventProcessor .class );
43
41
@@ -51,32 +49,34 @@ public class EventProcessor<R extends HasMetadata>
51
49
private final Metrics metrics ;
52
50
private volatile boolean running ;
53
51
private final ResourceCache <R > resourceCache ;
54
- private EventSourceManager <R > eventSourceManager ;
52
+ private final EventSourceManager <R > eventSourceManager ;
55
53
private final EventMarker eventMarker ;
56
54
57
- public EventProcessor (Controller <R > controller , ResourceCache < R > resourceCache ) {
55
+ EventProcessor (EventSourceManager <R > eventSourceManager ) {
58
56
this (
59
- resourceCache ,
57
+ eventSourceManager . getControllerResourceEventSource () ,
60
58
ExecutorServiceManager .instance ().executorService (),
61
- controller .getConfiguration ().getName (),
62
- new ReconciliationDispatcher <>(controller ),
63
- GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()),
64
- controller .getConfiguration ().getConfigurationService ().getMetrics (),
65
- new EventMarker ());
59
+ eventSourceManager .getController ().getConfiguration ().getName (),
60
+ new ReconciliationDispatcher <>(eventSourceManager .getController ()),
61
+ GenericRetry .fromConfiguration (
62
+ eventSourceManager .getController ().getConfiguration ().getRetryConfiguration ()),
63
+ eventSourceManager .getController ().getConfiguration ().getConfigurationService ()
64
+ .getMetrics (),
65
+ eventSourceManager );
66
66
}
67
67
68
68
EventProcessor (ReconciliationDispatcher <R > reconciliationDispatcher ,
69
- ResourceCache <R > resourceCache ,
69
+ EventSourceManager <R > eventSourceManager ,
70
70
String relatedControllerName ,
71
- Retry retry , EventMarker eventMarker ) {
72
- this (resourceCache , null , relatedControllerName , reconciliationDispatcher , retry , null ,
73
- eventMarker );
71
+ Retry retry ) {
72
+ this (eventSourceManager . getControllerResourceEventSource () , null , relatedControllerName ,
73
+ reconciliationDispatcher , retry , null , eventSourceManager );
74
74
}
75
75
76
76
private EventProcessor (ResourceCache <R > resourceCache , ExecutorService executor ,
77
77
String relatedControllerName ,
78
78
ReconciliationDispatcher <R > reconciliationDispatcher , Retry retry , Metrics metrics ,
79
- EventMarker eventMarker ) {
79
+ EventSourceManager < R > eventSourceManager ) {
80
80
this .running = true ;
81
81
this .executor =
82
82
executor == null
@@ -88,11 +88,12 @@ private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor,
88
88
this .retry = retry ;
89
89
this .resourceCache = resourceCache ;
90
90
this .metrics = metrics != null ? metrics : Metrics .NOOP ;
91
- this .eventMarker = eventMarker ;
91
+ this .eventMarker = new EventMarker ();
92
+ this .eventSourceManager = eventSourceManager ;
92
93
}
93
94
94
- public void setEventSourceManager ( EventSourceManager < R > eventSourceManager ) {
95
- this . eventSourceManager = eventSourceManager ;
95
+ EventMarker getEventMarker ( ) {
96
+ return eventMarker ;
96
97
}
97
98
98
99
@ Override
@@ -243,9 +244,12 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution
243
244
244
245
private void reScheduleExecutionIfInstructed (PostExecutionControl <R > postExecutionControl ,
245
246
R customResource ) {
246
- postExecutionControl .getReScheduleDelay ().ifPresent (delay -> eventSourceManager
247
- .getRetryAndRescheduleTimerEventSource ()
248
- .scheduleOnce (customResource , delay ));
247
+ postExecutionControl .getReScheduleDelay ()
248
+ .ifPresent (delay -> retryEventSource ().scheduleOnce (customResource , delay ));
249
+ }
250
+
251
+ TimerEventSource <R > retryEventSource () {
252
+ return eventSourceManager .retryEventSource ();
249
253
}
250
254
251
255
/**
@@ -275,9 +279,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
275
279
delay ,
276
280
customResourceID );
277
281
metrics .failedReconciliation (customResourceID , exception );
278
- eventSourceManager
279
- .getRetryAndRescheduleTimerEventSource ()
280
- .scheduleOnce (executionScope .getResource (), delay );
282
+ retryEventSource ().scheduleOnce (executionScope .getResource (), delay );
281
283
},
282
284
() -> log .error ("Exhausted retries for {}" , executionScope ));
283
285
}
@@ -289,9 +291,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
289
291
if (isRetryConfigured ()) {
290
292
retryState .remove (executionScope .getCustomResourceID ());
291
293
}
292
- eventSourceManager
293
- .getRetryAndRescheduleTimerEventSource ()
294
- .cancelOnceSchedule (executionScope .getCustomResourceID ());
294
+ retryEventSource ().cancelOnceSchedule (executionScope .getCustomResourceID ());
295
295
}
296
296
297
297
private RetryExecution getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
0 commit comments