
Commit 2446868

Fix: record tracker does not support concurrent event batch removal
1 parent 6879f4b commit 2446868
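
This commit fixes a race in KafkaRecordTracker.removeAckedEventBatches(): when two threads removed acked event batches concurrently, both could observe the same committed map entry and each decrement the shared total counter, even though only one of them actually removed the entry. Each tracked batch is now paired with an AtomicBoolean claim flag, so only the thread that wins compareAndSet(false, true) removes the entry and counts it; the new decrementTotalEventCount() additionally clamps the counter at zero rather than letting it go negative.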

5 files changed, +112 −35 lines changed

src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java

+30-10
@@ -17,6 +17,7 @@
 import com.splunk.hecclient.Event;
 import com.splunk.hecclient.EventBatch;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -27,14 +28,15 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaRecordTracker {
     private static final Logger log = LoggerFactory.getLogger(KafkaRecordTracker.class);
-    private ConcurrentMap<TopicPartition, ConcurrentNavigableMap<Long, EventBatch>> all; // TopicPartition + Long offset represents the SinkRecord
+    private ConcurrentMap<TopicPartition, ConcurrentNavigableMap<Long, Pair<EventBatch, AtomicBoolean>>> all; // TopicPartition + Long offset represents the SinkRecord
     private AtomicLong total;
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
@@ -63,20 +65,24 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
          * Note that if some events are tied to an unassigned partition those
          * offsets won't be able to be commited.
          */
+        long countOfEventsToRemove = 0;
+
         for (TopicPartition tp : partitions) {
-            ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
+            ConcurrentNavigableMap<Long, Pair<EventBatch, AtomicBoolean>> tpRecords = all.get(tp);
             if (tpRecords == null) {
                 continue; // nothing to remove in this case
             }
             long offset = -1;
-            Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
+            Iterator<Map.Entry<Long, Pair<EventBatch, AtomicBoolean>>> iter = tpRecords.entrySet().iterator();
             for (; iter.hasNext();) {
-                Map.Entry<Long, EventBatch> e = iter.next();
-                if (e.getValue().isCommitted()) {
+                Map.Entry<Long, Pair<EventBatch, AtomicBoolean>> e = iter.next();
+                if (e.getValue().getLeft().isCommitted()) {
                     log.debug("processing offset {}", e.getKey());
                     offset = e.getKey();
-                    iter.remove();
-                    total.decrementAndGet();
+                    if (e.getValue().getRight().compareAndSet(false, true)) {
+                        iter.remove();
+                        countOfEventsToRemove++;
+                    }
                 } else {
                     break;
                 }
@@ -85,6 +91,20 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
                 offsets.put(tp, new OffsetAndMetadata(offset + 1));
             }
         }
+        decrementTotalEventCount(countOfEventsToRemove);
+    }
+
+    private void decrementTotalEventCount(long countOfEventsToRemove) {
+        total.getAndUpdate(current -> {
+            if (current < countOfEventsToRemove) {
+                log.warn("Total event count ({}) is lower than the count ({}) we try to remove, resetting to 0",
+                        current,
+                        countOfEventsToRemove);
+                return 0;
+            } else {
+                return current - countOfEventsToRemove;
+            }
+        });
     }
 
     public void addFailedEventBatch(final EventBatch batch) {
@@ -100,8 +120,8 @@ public void addEventBatch(final EventBatch batch) {
             if (event.getTied() instanceof SinkRecord) {
                 final SinkRecord record = (SinkRecord) event.getTied();
                 TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
-                ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.computeIfAbsent(tp, k -> new ConcurrentSkipListMap<>());
-                tpRecords.computeIfAbsent(record.kafkaOffset(), k -> { total.incrementAndGet(); return batch; });
+                ConcurrentNavigableMap<Long, Pair<EventBatch, AtomicBoolean>> tpRecords = all.computeIfAbsent(tp, k -> new ConcurrentSkipListMap<>());
+                tpRecords.computeIfAbsent(record.kafkaOffset(), k -> { total.incrementAndGet(); return Pair.of(batch, new AtomicBoolean(false)); });
             }
         }
    }
@@ -180,7 +200,7 @@ public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
         log.warn("purge events={} from closed partitions={}",
                 countOfEventsToRemove, partitions);
         all.keySet().removeAll(partitions);
-        total.addAndGet(-1L * countOfEventsToRemove);
+        decrementTotalEventCount(countOfEventsToRemove);
     }
 }
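
For context, the heart of the fix is a claim-flag pattern. The following is a minimal, self-contained sketch (demo code written for this note, not part of the commit; the class name ClaimFlagDemo is invented) showing how compareAndSet lets exactly one of two racing threads account for each removal:

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    public class ClaimFlagDemo {
        public static void main(String[] args) throws InterruptedException {
            // Map entries carry a claim flag, mirroring Pair<EventBatch, AtomicBoolean>.
            ConcurrentSkipListMap<Long, AtomicBoolean> entries = new ConcurrentSkipListMap<>();
            AtomicLong total = new AtomicLong();
            for (long i = 0; i < 100_000; i++) {
                entries.put(i, new AtomicBoolean(false));
                total.incrementAndGet();
            }
            Runnable remover = () -> {
                for (Map.Entry<Long, AtomicBoolean> e : entries.entrySet()) {
                    // Only the thread that flips the flag false -> true may
                    // remove the entry and do the counter bookkeeping.
                    if (e.getValue().compareAndSet(false, true)) {
                        entries.remove(e.getKey());
                        total.decrementAndGet();
                    }
                }
            };
            Thread t1 = new Thread(remover);
            Thread t2 = new Thread(remover);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            // Always prints 0; decrementing unconditionally for every committed
            // entry seen (as the old code did) can double-count and go negative.
            System.out.println("total = " + total.get());
        }
    }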

src/test/java/com/splunk/hecclient/UnitUtil.java

+9
@@ -35,6 +35,15 @@ public static EventBatch createBatch() {
         return batch;
     }
 
+    public static EventBatch createMultiBatch(int count) {
+        EventBatch batch = new JsonEventBatch();
+        for (int i = 0; i < count; i++) {
+            Event event = new JsonEvent("ni-" + i, "hao-" + i);
+            batch.add(event);
+        }
+        return batch;
+    }
+
     public static EventBatch createRawEventBatch() {
         Event event = new RawEvent("ni", "hao");
         EventBatch batch = RawEventBatch.factory().build();
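
The new createMultiBatch(count) helper builds one JSON event batch containing count distinct events ("ni-0"/"hao-0" through the count-th pair), letting a test tie each event in a single batch to its own SinkRecord; the multi-thread tracker test below uses it with count = 500.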

src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java

+48
@@ -27,6 +27,10 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class KafkaRecordTrackerTest {
     @Test
@@ -48,6 +52,38 @@ public void addNonFailedEventBatch() {
         tracker.addFailedEventBatch(batch);
     }
 
+    @Test
+    public void removeEventBatchMultiThread() {
+        List<EventBatch> batches = new ArrayList<>();
+        KafkaRecordTracker tracker = new KafkaRecordTracker();
+        tracker.open(createTopicPartitionList(500));
+
+        for (int i = 0; i < 100; i++) {
+            EventBatch batch = UnitUtil.createMultiBatch(500);
+            for (int j = 0; j < 500; j++) {
+                batch.getEvents().get(j).setTied(createSinkRecord(j, i * 1000 + j));
+            }
+            batch.commit();
+            batches.add(batch);
+            tracker.addEventBatch(batch);
+        }
+
+        Assert.assertEquals(50000, tracker.totalEvents());
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            Future<?> first = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+            Future<?> second = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+
+            first.get();
+            second.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            executorService.shutdown();
+        }
+
+        Assert.assertEquals(0, tracker.totalEvents());
+    }
     @Test
     public void addEventBatch() {
         List<EventBatch> batches = new ArrayList<>();
@@ -104,4 +140,16 @@ private List<TopicPartition> createTopicPartitionList() {
         tps.add(new TopicPartition("t", 1));
         return tps;
     }
+
+    private SinkRecord createSinkRecord(int partition, long offset) {
+        return new SinkRecord("t", partition, null, null, null, "ni, hao", offset);
+    }
+
+    private List<TopicPartition> createTopicPartitionList(int number) {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        for (int i = 0; i < number; i++) {
+            tps.add(new TopicPartition("t", i));
+        }
+        return tps;
+    }
 }
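
The removeEventBatchMultiThread test exercises the fixed path directly: it opens 500 partitions, adds 100 committed batches of 500 events each (50,000 events total), then runs removeAckedEventBatches() on the same batch list from two threads at once. Without the claim flag, the second thread could decrement the counter for entries the first thread already removed, so the final assertion that totalEvents() is exactly 0 would be flaky; with the fix it must hold.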

target/site/jacoco/jacoco.csv

+24-24
@@ -1,42 +1,42 @@
 GROUP,PACKAGE,CLASS,INSTRUCTION_MISSED,INSTRUCTION_COVERED,BRANCH_MISSED,BRANCH_COVERED,LINE_MISSED,LINE_COVERED,COMPLEXITY_MISSED,COMPLEXITY_COVERED,METHOD_MISSED,METHOD_COVERED
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new Credentials() {...},10,0,0,0,3,0,3,0,3,0
-splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,8,0,7,0,6
+splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,9,0,7,0,6
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new HostnameVerifier() {...},2,6,0,0,1,1,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new TrustStrategy() {...},2,6,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,9,73,4,20,0,15
-splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,195,1,13,5,48,1,16,0,10
+splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15
+splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10
 splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,47,363,8,24,12,88,7,23,1,13
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,28,157,9,44,0,25
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,2,8,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,17,46,2,13,2,11
-splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,2,18,0,8,0,4
-splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,168,1,9,8,46,3,17,2,13
-splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,5,21,0,11,0,7
+splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2
+splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11
+splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4
+splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,169,1,9,8,45,3,17,2,13
+splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,6,21,0,11,0,7
 splunk-kafka-connect,com.splunk.hecclient,HecEmptyEventException,5,4,0,0,2,2,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,36,0,22,0,16
-splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,238,0,12,0,58,0,28,0,22
+splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,35,0,22,0,16
+splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,236,0,12,0,60,0,28,0,22
 splunk-kafka-connect,com.splunk.hecclient,HecNullEventException,5,4,0,0,2,2,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,Event,3,204,0,6,1,67,1,29,1,26
 splunk-kafka-connect,com.splunk.hecclient,RawEventBatch.Builder,0,45,0,0,0,13,0,7,0,7
-splunk-kafka-connect,com.splunk.hecclient,Indexer,178,421,9,19,38,95,10,23,3,16
+splunk-kafka-connect,com.splunk.hecclient,Indexer,179,420,9,19,40,96,10,23,3,16
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.GzipDataContentProducer,0,21,0,0,0,6,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,8,0,5,0,3
-splunk-kafka-connect,com.splunk.hecclient,HecConfig,4,228,1,1,1,81,2,43,1,43
+splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,9,0,5,0,3
+splunk-kafka-connect,com.splunk.hecclient,HecConfig,5,227,1,1,1,81,2,43,1,43
 splunk-kafka-connect,com.splunk.hecclient,Indexer.new Configuration() {...},21,0,0,0,4,0,2,0,2,0
-splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,14,0,4,0,2
+splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,13,0,4,0,2
 splunk-kafka-connect,com.splunk.hecclient,DoubleSerializer,0,15,0,0,0,4,0,2,0,2
 splunk-kafka-connect,com.splunk.hecclient,PostResponse,0,37,0,2,0,13,0,8,0,7
-splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,34,79,1,7,7,22,3,11,2,8
-splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,10,3,6,2,5
+splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,33,80,1,7,7,23,3,11,2,8
+splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,9,3,6,2,5
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity.new Enumeration() {...},0,44,0,4,0,4,0,5,0,3
 splunk-kafka-connect,com.splunk.kafka.connect,HecClientWrapper,3,3,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,22,315,4,22,6,68,4,26,0,17
-splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,6,0,3,0,2
-splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,3,25,1,11,1,6
+splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,27,351,5,25,10,76,5,29,0,19
+splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,7,0,3,0,2
+splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,4,26,1,11,1,6
 splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule,0,10,0,0,0,3,0,1,0,1
 splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,47,18,9,5,8
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1337,6,72,1,236,6,47,0,14
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,12,82,4,26,1,15
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1269,6,72,1,198,6,47,0,14
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,383,4,30,11,85,5,28,1,15
 splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,415,1134,58,84,66,252,48,56,3,30
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,416,1134,58,84,68,260,48,56,3,30

target/site/jacoco/jacoco.xml

+1-1
Large diffs are not rendered by default.
