Skip to content

Commit ca03c75

Browse files
authored
Merge pull request #556 from bradskuse/feature/commit-offsets
[#555] Add option to commit offsets
2 parents ad4f2ae + 1748681 commit ca03c75

File tree

4 files changed

+168
-18
lines changed

4 files changed

+168
-18
lines changed

kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@ public class AsyncFetcher<K, V, E> implements Fetcher<K, V, E> {
5656
private final ExecutorService executorService;
5757
private final boolean requirePoolShutdown;
5858
private final Set<FetchEventsTask<K, V, E>> activeFetchers = ConcurrentHashMap.newKeySet();
59+
private final OffsetCommitType offsetCommitType;
5960

6061
/**
6162
* Instantiate a Builder to be able to create a {@link AsyncFetcher}.
@@ -85,32 +86,34 @@ protected AsyncFetcher(Builder<K, V, E> builder) {
8586
this.pollTimeout = builder.pollTimeout;
8687
this.executorService = builder.executorService;
8788
this.requirePoolShutdown = builder.requirePoolShutdown;
89+
this.offsetCommitType = builder.offsetCommitType;
8890
}
8991

9092
/**
9193
* {@inheritDoc}
9294
*/
9395
@Override
9496
public Registration poll(Consumer<K, V> consumer,
95-
RecordConverter<K, V, E> recordConverter,
96-
EventConsumer<E> eventConsumer) {
97+
RecordConverter<K, V, E> recordConverter,
98+
EventConsumer<E> eventConsumer) {
9799
return poll(consumer, recordConverter, eventConsumer,
98-
e -> logger.warn("Error from fetching thread, should be handled properly", e));
100+
e -> logger.warn("Error from fetching thread, should be handled properly", e));
99101
}
100102

101103
/**
102104
* {@inheritDoc}
103105
*/
104106
@Override
105107
public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter,
106-
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
108+
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
107109
FetchEventsTask<K, V, E> fetcherTask =
108110
new FetchEventsTask<>(consumer,
109-
pollTimeout,
110-
recordConverter,
111-
eventConsumer,
112-
activeFetchers::remove,
113-
runtimeErrorHandler);
111+
pollTimeout,
112+
recordConverter,
113+
eventConsumer,
114+
activeFetchers::remove,
115+
runtimeErrorHandler,
116+
offsetCommitType);
114117

115118
activeFetchers.add(fetcherTask);
116119
executorService.execute(fetcherTask);
@@ -148,6 +151,7 @@ public static final class Builder<K, V, E> {
148151
private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT_MS);
149152
private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
150153
private boolean requirePoolShutdown = true;
154+
private OffsetCommitType offsetCommitType = OffsetCommitType.AUTO;
151155

152156
/**
153157
* Set the {@code pollTimeout} in milliseconds for polling records from a topic. Defaults to {@code 5000}
@@ -158,11 +162,36 @@ public static final class Builder<K, V, E> {
158162
*/
159163
public Builder<K, V, E> pollTimeout(long timeoutMillis) {
160164
assertThat(timeoutMillis, timeout -> timeout > 0,
161-
"The poll timeout may not be negative [" + timeoutMillis + "]");
165+
"The poll timeout may not be negative [" + timeoutMillis + "]");
162166
this.pollTimeout = Duration.ofMillis(timeoutMillis);
163167
return this;
164168
}
165169

170+
/**
171+
* Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during
172+
* processing of events.
173+
* <p>
174+
* Options are:
175+
* <ul>
176+
* <li>{@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in background.
177+
* </li>
178+
* <li>{@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after
179+
* processing.</li>
180+
* <li>{@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after
181+
* processing.</li>
182+
* </ul>
183+
* <p>
184+
* Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background.
185+
*
186+
* @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type
187+
* @return the current Builder instance, for fluent interfacing
188+
*/
189+
public AsyncFetcher.Builder<K, V, E> offsetCommitType(OffsetCommitType offsetCommitType) {
190+
assertNonNull(offsetCommitType, "OffsetCommitType may not be null");
191+
this.offsetCommitType = offsetCommitType;
192+
return this;
193+
}
194+
166195
/**
167196
* Sets the {@link ExecutorService} used to start {@link FetchEventsTask} instances to poll for Kafka consumer
168197
* records. Note that the {@code executorService} should contain sufficient threads to run the necessary fetcher

kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@ class FetchEventsTask<K, V, E> implements Runnable {
5252
private final EventConsumer<E> eventConsumer;
5353
private final java.util.function.Consumer<FetchEventsTask<K, V, E>> closeHandler;
5454
private final RuntimeErrorHandler runtimeErrorHandler;
55+
private final OffsetCommitType offsetCommitType;
5556

5657

5758
/**
@@ -72,7 +73,8 @@ class FetchEventsTask<K, V, E> implements Runnable {
7273
RecordConverter<K, V, E> recordConverter,
7374
EventConsumer<E> eventConsumer,
7475
java.util.function.Consumer<FetchEventsTask<K, V, E>> closeHandler,
75-
RuntimeErrorHandler runtimeErrorHandler) {
76+
RuntimeErrorHandler runtimeErrorHandler,
77+
OffsetCommitType offsetCommitType) {
7678
this.consumer = nonNull(consumer, () -> "Consumer may not be null");
7779
assertThat(pollTimeout, time -> !time.isNegative(),
7880
"The poll timeout may not be negative [" + pollTimeout + "]");
@@ -81,6 +83,7 @@ class FetchEventsTask<K, V, E> implements Runnable {
8183
this.eventConsumer = eventConsumer;
8284
this.closeHandler = getOrDefault(closeHandler, task -> { /* no-op */ });
8385
this.runtimeErrorHandler = nonNull(runtimeErrorHandler, () -> "Runtime error handler may not be null");
86+
this.offsetCommitType = offsetCommitType;
8487
}
8588

8689
@Override
@@ -109,6 +112,7 @@ private void processRecords(ConsumerRecords<K, V> records) {
109112
try {
110113
if (!convertedMessages.isEmpty()) {
111114
eventConsumer.consume(convertedMessages);
115+
handleOffsetsIfRequired(convertedMessages.size());
112116
}
113117
} catch (InterruptedException e) {
114118
logger.debug("Event Consumer thread was interrupted. Shutting down", e);
@@ -117,6 +121,18 @@ private void processRecords(ConsumerRecords<K, V> records) {
117121
}
118122
}
119123

124+
private void handleOffsetsIfRequired(int messageCount) {
125+
if (OffsetCommitType.COMMIT_SYNC == offsetCommitType) {
126+
consumer.commitSync();
127+
logger.debug("Committed offsets synchronously for {} messages", messageCount);
128+
} else if (OffsetCommitType.COMMIT_ASYNC == offsetCommitType) {
129+
consumer.commitAsync();
130+
logger.debug("Committed offsets asynchronously for {} messages", messageCount);
131+
} else {
132+
logger.debug("Consumer will commit offsets in background");
133+
}
134+
}
135+
120136
/**
121137
* Shutdown this {@link FetchEventsTask}.
122138
*/
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2010-2025. Axon Framework
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.axonframework.extensions.kafka.eventhandling.consumer;
18+
19+
import org.apache.kafka.clients.consumer.KafkaConsumer;
20+
21+
/**
22+
* Enum to define how the consumer will handle committing offsets.
23+
*
24+
* @author Bradley Skuse
25+
* @since 4.11.1
26+
*/
27+
public enum OffsetCommitType {
28+
29+
/**
30+
* Kafka consumer will commit offsets automatically in the background.
31+
*/
32+
AUTO,
33+
34+
/**
35+
* Kafka consumer will commit offsets asynchronously after processing
36+
*
37+
* @see KafkaConsumer#commitAsync()
38+
*/
39+
COMMIT_ASYNC,
40+
41+
/**
42+
* Kafka consumer will commit offsets synchronously after processing
43+
*
44+
* @see KafkaConsumer#commitSync()
45+
*/
46+
COMMIT_SYNC
47+
}

kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -76,7 +76,8 @@ void setUp() {
7676
testRecordConverter,
7777
testEventConsumer,
7878
testCloseHandler,
79-
runtimeErrorHandler
79+
runtimeErrorHandler,
80+
OffsetCommitType.AUTO
8081
);
8182

8283
when(testConsumer.poll(testPollTimeout)).thenReturn(consumerRecords);
@@ -95,7 +96,8 @@ void testTaskConstructionWithInvalidConsumerShouldThrowException() {
9596
testRecordConverter,
9697
testEventConsumer,
9798
testCloseHandler,
98-
runtimeErrorHandler
99+
runtimeErrorHandler,
100+
OffsetCommitType.AUTO
99101
)
100102
);
101103
}
@@ -111,7 +113,8 @@ void testTaskConstructionWithNegativeTimeoutShouldThrowException() {
111113
testRecordConverter,
112114
testEventConsumer,
113115
testCloseHandler,
114-
runtimeErrorHandler
116+
runtimeErrorHandler,
117+
OffsetCommitType.AUTO
115118
)
116119
);
117120
}
@@ -127,7 +130,8 @@ void testFetchEventsTaskInterruptionClosesAsExpected() {
127130
testRecordConverter,
128131
failingEventConsumer,
129132
testCloseHandler,
130-
runtimeErrorHandler
133+
runtimeErrorHandler,
134+
OffsetCommitType.AUTO
131135
);
132136

133137
Thread taskRunner = new Thread(testSubjectWithFailingEventConsumer);
@@ -148,6 +152,8 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecords() throws InterruptedExce
148152
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
149153
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
150154
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
155+
verify(testConsumer, never()).commitSync();
156+
verify(testConsumer, never()).commitAsync();
151157

152158
taskRunner.interrupt();
153159
}
@@ -163,6 +169,8 @@ void testFetchEventsTaskPollsDoesNotCallEventConsumerForZeroConvertedEvents() {
163169
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
164170
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
165171
verifyNoMoreInteractions(testEventConsumer);
172+
verify(testConsumer, never()).commitSync();
173+
verify(testConsumer, never()).commitAsync();
166174

167175
taskRunner.interrupt();
168176
}
@@ -177,4 +185,54 @@ void testCloseCallsProvidedCloseHandler() {
177185
assertWithin(Duration.ofMillis(TIMEOUT_MILLIS), () -> assertTrue(expectedToBeClosed.get()));
178186
verify(testConsumer, timeout(TIMEOUT_MILLIS)).close();
179187
}
188+
189+
@Test
190+
void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsSync() throws InterruptedException {
191+
VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce();
192+
193+
FetchEventsTask<String, String, KafkaEventMessage> testSubjectWithCommitOffsets = new FetchEventsTask<>(
194+
testConsumer,
195+
testPollTimeout,
196+
testRecordConverter,
197+
testEventConsumer,
198+
testCloseHandler,
199+
runtimeErrorHandler,
200+
OffsetCommitType.COMMIT_SYNC
201+
);
202+
203+
Thread taskRunner = new Thread(testSubjectWithCommitOffsets);
204+
taskRunner.start();
205+
206+
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
207+
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
208+
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
209+
verify(testConsumer, atLeastOnceWithTimeout).commitSync();
210+
211+
taskRunner.interrupt();
212+
}
213+
214+
@Test
215+
void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsAsync() throws InterruptedException {
216+
VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce();
217+
218+
FetchEventsTask<String, String, KafkaEventMessage> testSubjectWithCommitOffsets = new FetchEventsTask<>(
219+
testConsumer,
220+
testPollTimeout,
221+
testRecordConverter,
222+
testEventConsumer,
223+
testCloseHandler,
224+
runtimeErrorHandler,
225+
OffsetCommitType.COMMIT_ASYNC
226+
);
227+
228+
Thread taskRunner = new Thread(testSubjectWithCommitOffsets);
229+
taskRunner.start();
230+
231+
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
232+
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
233+
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
234+
verify(testConsumer, atLeastOnceWithTimeout).commitAsync();
235+
236+
taskRunner.interrupt();
237+
}
180238
}

0 commit comments

Comments
 (0)