1
1
package io .javaoperatorsdk .operator .processing .event .source .informer ;
2
2
3
- import java .util .Collections ;
4
3
import java .util .LinkedHashMap ;
5
4
import java .util .Map ;
6
5
import java .util .Optional ;
7
- import java .util .Set ;
8
6
import java .util .concurrent .ConcurrentHashMap ;
9
7
10
8
import org .slf4j .Logger ;
18
16
/**
19
17
* <p>
20
18
* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when
21
- * a create or update is executed the subsequent getResource opeeration might not return the
22
- * up-to-date resource from informer cache, since it is not received yet by webhook .
19
+ * a create or update is executed the subsequent getResource operation might not return the
20
+ * up-to-date resource from informer cache, since it is not received yet.
23
21
* </p>
24
22
* <p>
25
23
* The idea of the solution is, that since an update (for create is simpler) was done successfully,
36
34
*/
37
35
public class TemporaryResourceCache <T extends HasMetadata > {
38
36
37
+ static class ExpirationCache <K > {
38
+ private final LinkedHashMap <K , Long > cache ;
39
+ private final int ttlMs ;
40
+
41
+ public ExpirationCache (int maxEntries , int ttlMs ) {
42
+ this .ttlMs = ttlMs ;
43
+ this .cache = new LinkedHashMap <>() {
44
+ @ Override
45
+ protected boolean removeEldestEntry (Map .Entry <K , Long > eldest ) {
46
+ return size () > maxEntries ;
47
+ }
48
+ };
49
+ }
50
+
51
+ public void add (K key ) {
52
+ clean ();
53
+ cache .putIfAbsent (key , System .currentTimeMillis ());
54
+ }
55
+
56
+ public boolean contains (K key ) {
57
+ clean ();
58
+ return cache .get (key ) != null ;
59
+ }
60
+
61
+ void clean () {
62
+ if (!cache .isEmpty ()) {
63
+ long currentTimeMillis = System .currentTimeMillis ();
64
+ var iter = cache .entrySet ().iterator ();
65
+ // the order will already be from oldest to newest, clean a fixed number of entries to
66
+ // amortize the cost amongst multiple calls
67
+ for (int i = 0 ; i < 10 && iter .hasNext (); i ++) {
68
+ var entry = iter .next ();
69
+ if (currentTimeMillis - entry .getValue () > ttlMs ) {
70
+ iter .remove ();
71
+ }
72
+ }
73
+ }
74
+ }
75
+ }
76
+
39
77
private static final Logger log = LoggerFactory .getLogger (TemporaryResourceCache .class );
40
- private static final int MAX_RESOURCE_VERSIONS = 256 ;
41
78
42
79
private final Map <ResourceID , T > cache = new ConcurrentHashMap <>();
80
+
81
+ // keep up to the last million deletions for up to 10 minutes
82
+ private final ExpirationCache <String > tombstones = new ExpirationCache <>(1000000 , 1200000 );
43
83
private final ManagedInformerEventSource <T , ?, ?> managedInformerEventSource ;
44
84
private final boolean parseResourceVersions ;
45
- private final Set <String > knownResourceVersions ;
85
+ private final ExpirationCache <String > knownResourceVersions ;
46
86
47
87
public TemporaryResourceCache (ManagedInformerEventSource <T , ?, ?> managedInformerEventSource ,
48
88
boolean parseResourceVersions ) {
49
89
this .managedInformerEventSource = managedInformerEventSource ;
50
90
this .parseResourceVersions = parseResourceVersions ;
51
91
if (parseResourceVersions ) {
52
- knownResourceVersions = Collections .newSetFromMap (new LinkedHashMap <String , Boolean >() {
53
- @ Override
54
- protected boolean removeEldestEntry (java .util .Map .Entry <String , Boolean > eldest ) {
55
- return size () >= MAX_RESOURCE_VERSIONS ;
56
- }
57
- });
92
+ // keep up to the 50000 add/updates for up to 5 minutes
93
+ knownResourceVersions = new ExpirationCache <>(50000 , 600000 );
58
94
} else {
59
95
knownResourceVersions = null ;
60
96
}
61
97
}
62
98
63
- public synchronized void onEvent (T resource , boolean unknownState ) {
99
+ public synchronized void onDeleteEvent (T resource , boolean unknownState ) {
100
+ tombstones .add (resource .getMetadata ().getUid ());
101
+ onEvent (resource , unknownState );
102
+ }
103
+
104
+ public synchronized void onAddOrUpdateEvent (T resource ) {
105
+ onEvent (resource , false );
106
+ }
107
+
108
+ synchronized void onEvent (T resource , boolean unknownState ) {
64
109
cache .computeIfPresent (ResourceID .fromResource (resource ),
65
110
(id , cached ) -> (unknownState || !isLaterResourceVersion (id , cached , resource )) ? null
66
111
: cached );
@@ -84,20 +129,33 @@ public synchronized void putResource(T newResource, String previousResourceVersi
84
129
var cachedResource = getResourceFromCache (resourceId )
85
130
.orElse (managedInformerEventSource .get (resourceId ).orElse (null ));
86
131
87
- if ((previousResourceVersion == null && cachedResource == null )
132
+ boolean moveAhead = false ;
133
+ if (previousResourceVersion == null && cachedResource == null ) {
134
+ if (tombstones .contains (newResource .getMetadata ().getUid ())) {
135
+ log .debug (
136
+ "Won't resurrect uid {} for resource id: {}" ,
137
+ newResource .getMetadata ().getUid (), resourceId );
138
+ return ;
139
+ }
140
+ // we can skip further checks as this is a simple add and there's no previous entry to
141
+ // consider
142
+ moveAhead = true ;
143
+ }
144
+
145
+ if (moveAhead
88
146
|| (cachedResource != null
89
147
&& (cachedResource .getMetadata ().getResourceVersion ().equals (previousResourceVersion ))
90
148
|| isLaterResourceVersion (resourceId , newResource , cachedResource ))) {
91
149
log .debug (
92
150
"Temporarily moving ahead to target version {} for resource id: {}" ,
93
151
newResource .getMetadata ().getResourceVersion (), resourceId );
94
- putToCache ( newResource , resourceId );
152
+ cache . put ( resourceId , newResource );
95
153
} else if (cache .remove (resourceId ) != null ) {
96
154
log .debug ("Removed an obsolete resource from cache for id: {}" , resourceId );
97
155
}
98
156
}
99
157
100
- public boolean isKnownResourceVersion (T resource ) {
158
+ public synchronized boolean isKnownResourceVersion (T resource ) {
101
159
return knownResourceVersions != null
102
160
&& knownResourceVersions .contains (resource .getMetadata ().getResourceVersion ());
103
161
}
@@ -123,10 +181,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
123
181
return false ;
124
182
}
125
183
126
- private void putToCache (T resource , ResourceID resourceID ) {
127
- cache .put (resourceID == null ? ResourceID .fromResource (resource ) : resourceID , resource );
128
- }
129
-
130
184
public synchronized Optional <T > getResourceFromCache (ResourceID resourceID ) {
131
185
return Optional .ofNullable (cache .get (resourceID ));
132
186
}
0 commit comments