23
23
import java .util .NoSuchElementException ;
24
24
25
25
/**
26
- * Merges two iterators. Assumes each of them is sorted by key
26
+ * AbstractMergedSortedCacheStoreIterator is an abstract class for merging two sorted iterators, one from a cache and
27
+ * the other from a store. It ensures the merged results maintain sorted order while resolving conflicts between cache
28
+ * and store entries.
27
29
*
28
- * @param <K>
29
- * @param <V>
30
+ * <p>This iterator is used for state stores in Kafka Streams, which have an (optional) caching layer that needs to be
31
+ * "merged" with the underlying state. It handles common scenarios like skipping records with cached tombstones (deleted
32
+ * entries) and preferring cache entries over store entries when conflicts arise.</p>
33
+ *
34
+ * @param <K> The type of the resulting merged key.
35
+ * @param <KS> The type of the store key.
36
+ * @param <V> The type of the resulting merged value.
37
+ * @param <VS> The type of the store value.
30
38
*/
31
39
abstract class AbstractMergedSortedCacheStoreIterator <K , KS , V , VS > implements KeyValueIterator <K , V > {
32
40
private final PeekingKeyValueIterator <Bytes , LRUCacheEntry > cacheIterator ;
33
41
private final KeyValueIterator <KS , VS > storeIterator ;
34
42
private final boolean forward ;
35
43
44
+ /**
45
+ * Constructs an AbstractMergedSortedCacheStoreIterator.
46
+ *
47
+ * @param cacheIterator The iterator for the cache, assumed to be sorted by key.
48
+ * @param storeIterator The iterator for the store, assumed to be sorted by key.
49
+ * @param forward The direction of iteration. True for forward, false for reverse.
50
+ */
36
51
AbstractMergedSortedCacheStoreIterator (final PeekingKeyValueIterator <Bytes , LRUCacheEntry > cacheIterator ,
37
52
final KeyValueIterator <KS , VS > storeIterator ,
38
53
final boolean forward ) {
@@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements K
41
56
this .forward = forward ;
42
57
}
43
58
59
+ /**
60
+ * Compares the keys from the cache and store to determine their ordering.
61
+ *
62
+ * @param cacheKey The key from the cache.
63
+ * @param storeKey The key from the store.
64
+ *
65
+ * @return A negative integer, zero, or a positive integer as the cache key is less than,
66
+ * equal to, or greater than the store key.
67
+ */
44
68
abstract int compare (final Bytes cacheKey , final KS storeKey );
45
69
70
+ /**
71
+ * Deserializes a store key into a generic merged key type.
72
+ *
73
+ * @param key The store key to deserialize.
74
+ *
75
+ * @return The deserialized key.
76
+ */
46
77
abstract K deserializeStoreKey (final KS key );
47
78
79
+ /**
80
+ * Deserializes a key-value pair from the store into a generic merged key-value pair.
81
+ *
82
+ * @param pair The key-value pair from the store.
83
+ *
84
+ * @return The deserialized key-value pair.
85
+ */
48
86
abstract KeyValue <K , V > deserializeStorePair (final KeyValue <KS , VS > pair );
49
87
88
+ /**
89
+ * Deserializes a cache key into a generic merged key type.
90
+ *
91
+ * @param cacheKey The cache key to deserialize.
92
+ *
93
+ * @return The deserialized key.
94
+ */
50
95
abstract K deserializeCacheKey (final Bytes cacheKey );
51
96
97
+ /**
98
+ * Deserializes a cache entry into a generic value type.
99
+ *
100
+ * @param cacheEntry The cache entry to deserialize.
101
+ *
102
+ * @return The deserialized value.
103
+ */
52
104
abstract V deserializeCacheValue (final LRUCacheEntry cacheEntry );
53
105
106
+ /**
107
+ * Checks if a cache entry is a tombstone (representing a deleted value).
108
+ *
109
+ * @param nextFromCache The cache entry to check.
110
+ *
111
+ * @return True if the cache entry is a tombstone, false otherwise.
112
+ */
54
113
private boolean isDeletedCacheEntry (final KeyValue <Bytes , LRUCacheEntry > nextFromCache ) {
55
114
return nextFromCache .value .value () == null ;
56
115
}
57
116
117
+ /**
118
+ * Determines if there are more entries to iterate over, resolving conflicts between cache and store entries (e.g.,
119
+ * skipping tombstones).
120
+ *
121
+ * <p>Conflict resolution scenarios:</p>
122
+ *
123
+ * <ul>
124
+ * <li><b>Cache contains a tombstone for a key:</b> Skip both the cache tombstone and the corresponding store entry (if exists).</li>
125
+ * <li><b>Cache contains a value for a key present in the store:</b> Prefer the cache value and skip the store entry.</li>
126
+ * <li><b>Cache key is unique:</b> Return the cache value.</li>
127
+ * <li><b>Store key is unique:</b> Return the store value.</li>
128
+ * </ul>
129
+ *
130
+ * @return True if there are more entries, false otherwise.
131
+ */
58
132
@ Override
59
133
public boolean hasNext () {
60
134
// skip over items deleted from cache, and corresponding store items if they have the same key
@@ -86,6 +160,13 @@ public boolean hasNext() {
86
160
return cacheIterator .hasNext () || storeIterator .hasNext ();
87
161
}
88
162
163
+ /**
164
+ * Retrieves the next key-value pair in the merged iteration.
165
+ *
166
+ * @return The next key-value pair.
167
+ *
168
+ * @throws NoSuchElementException If there are no more elements to iterate.
169
+ */
89
170
@ Override
90
171
public KeyValue <K , V > next () {
91
172
if (!hasNext ()) {
@@ -107,6 +188,15 @@ public KeyValue<K, V> next() {
107
188
return chooseNextValue (nextCacheKey , nextStoreKey , comparison );
108
189
}
109
190
191
+ /**
192
+ * Resolves which source (cache or store) to fetch the next key-value pair when a comparison is performed.
193
+ *
194
+ * @param nextCacheKey The next key from the cache.
195
+ * @param nextStoreKey The next key from the store.
196
+ * @param comparison The comparison result between the cache and store keys.
197
+ *
198
+ * @return The next key-value pair.
199
+ */
110
200
private KeyValue <K , V > chooseNextValue (final Bytes nextCacheKey ,
111
201
final KS nextStoreKey ,
112
202
final int comparison ) {
@@ -133,6 +223,15 @@ private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
133
223
}
134
224
}
135
225
226
+ /**
227
+ * Fetches the next value from the store, ensuring it matches the expected key.
228
+ *
229
+ * @param nextStoreKey The expected next key from the store.
230
+ *
231
+ * @return The next key-value pair from the store.
232
+ *
233
+ * @throws IllegalStateException If the key does not match the expected key.
234
+ */
136
235
private KeyValue <K , V > nextStoreValue (final KS nextStoreKey ) {
137
236
final KeyValue <KS , VS > next = storeIterator .next ();
138
237
@@ -143,6 +242,15 @@ private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
143
242
return deserializeStorePair (next );
144
243
}
145
244
245
+ /**
246
+ * Fetches the next value from the cache, ensuring it matches the expected key.
247
+ *
248
+ * @param nextCacheKey The expected next key from the cache.
249
+ *
250
+ * @return The next key-value pair from the cache.
251
+ *
252
+ * @throws IllegalStateException If the key does not match the expected key.
253
+ */
146
254
private KeyValue <K , V > nextCacheValue (final Bytes nextCacheKey ) {
147
255
final KeyValue <Bytes , LRUCacheEntry > next = cacheIterator .next ();
148
256
@@ -153,6 +261,13 @@ private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
153
261
return KeyValue .pair (deserializeCacheKey (next .key ), deserializeCacheValue (next .value ));
154
262
}
155
263
264
+ /**
265
+ * Peeks at the next key in the merged iteration without advancing the iterator.
266
+ *
267
+ * @return The next key in the iteration.
268
+ *
269
+ * @throws NoSuchElementException If there are no more elements to peek.
270
+ */
156
271
@ Override
157
272
public K peekNextKey () {
158
273
if (!hasNext ()) {
@@ -174,6 +289,18 @@ public K peekNextKey() {
174
289
return chooseNextKey (nextCacheKey , nextStoreKey , comparison );
175
290
}
176
291
292
+ /**
293
+ * Determines the next key to return from the merged iteration based on the comparison of the cache and store keys.
294
+ * Resolves conflicts by considering the iteration direction and ensuring the merged order is maintained.
295
+ *
296
+ * @param nextCacheKey The next key from the cache.
297
+ * @param nextStoreKey The next key from the store.
298
+ * @param comparison The comparison result between the cache and store keys. A negative value indicates the cache
299
+ * key is smaller, zero indicates equality, and a positive value indicates the store key is
300
+ * smaller.
301
+ *
302
+ * @return The next key to return from the merged iteration.
303
+ */
177
304
private K chooseNextKey (final Bytes nextCacheKey ,
178
305
final KS nextStoreKey ,
179
306
final int comparison ) {
@@ -200,6 +327,9 @@ private K chooseNextKey(final Bytes nextCacheKey,
200
327
}
201
328
}
202
329
330
+ /**
331
+ * Closes the iterators and releases any associated resources.
332
+ */
203
333
@ Override
204
334
public void close () {
205
335
cacheIterator .close ();
0 commit comments