Commit 8c5d933

Merge pull request #431 from ludovic-boutros/fix-concurrent-delete-in-record-tracker
Fix: record tracker does not support concurrent event batch removal
2 parents 6879f4b + 62c39d6

File tree: 5 files changed (+129 −47)
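
The heart of the fix, shown in the KafkaRecordTracker.java diff below, is a compare-and-set gate: only the thread that flips the flag runs the removal scan, while a concurrent caller skips it instead of racing on the per-partition maps and the shared event counter. A minimal standalone sketch of that gate pattern, with illustrative names that are not part of the connector:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the gate used in the patch: the first caller flips the flag
// from false to true and does the work; a concurrent caller sees the CAS
// fail and skips, instead of mutating the same state in parallel.
public class SingleWorkerGate {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    public void runExclusively(Runnable work) {
        if (inProgress.compareAndSet(false, true)) {
            try {
                work.run();
            } finally {
                inProgress.set(false); // always release, even on exception
            }
        }
        // else: another thread is already doing the work; skipping is safe
        // when a later call can pick up anything left behind.
    }

    public static void main(String[] args) throws InterruptedException {
        SingleWorkerGate gate = new SingleWorkerGate();
        Runnable task = () -> gate.runExclusively(
                () -> System.out.println(Thread.currentThread().getName() + " got the gate"));
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join(); // if the two threads overlap, only the winner prints
    }
}
```

Skipping (rather than blocking) is a reasonable trade-off here because acked batches that a losing caller leaves in place are picked up by the next removal pass.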

src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java (+47 −22)
```diff
@@ -27,6 +27,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ final class KafkaRecordTracker {
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
    private Collection<TopicPartition> partitions;
+    private AtomicBoolean removingEventsInProgress = new AtomicBoolean(false);
 
     public KafkaRecordTracker() {
         all = new ConcurrentHashMap<>();
@@ -58,35 +60,58 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
         log.debug("received acked event batches={}", batches);
         /* Loop all *assigned* partitions to find the lowest consecutive
          * HEC-commited offsets. A batch could contain events coming from a
-         * variety of topic/partitions, and scanning those events coulb be
+         * variety of topic/partitions, and scanning those events could be
          * expensive.
          * Note that if some events are tied to an unassigned partition those
-         * offsets won't be able to be commited.
+         * offsets won't be able to be committed.
          */
-        for (TopicPartition tp : partitions) {
-            ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
-            if (tpRecords == null) {
-                continue; // nothing to remove in this case
-            }
-            long offset = -1;
-            Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
-            for (; iter.hasNext();) {
-                Map.Entry<Long, EventBatch> e = iter.next();
-                if (e.getValue().isCommitted()) {
-                    log.debug("processing offset {}", e.getKey());
-                    offset = e.getKey();
-                    iter.remove();
-                    total.decrementAndGet();
-                } else {
-                    break;
+        // With the current implementation, we don't need to let multiple threads cleaning events in parallel.
+        if (removingEventsInProgress.compareAndSet(false, true)) {
+            try {
+                long countOfEventsToRemove = 0;
+
+                for (TopicPartition tp : partitions) {
+                    ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
+                    if (tpRecords == null) {
+                        continue; // nothing to remove in this case
+                    }
+                    long offset = -1;
+                    Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
+                    for (; iter.hasNext(); ) {
+                        Map.Entry<Long, EventBatch> e = iter.next();
+                        if (e.getValue().isCommitted()) {
+                            log.debug("processing offset {}", e.getKey());
+                            offset = e.getKey();
+                            iter.remove();
+                            countOfEventsToRemove++;
+                        } else {
+                            break;
+                        }
+                    }
+                    if (offset >= 0) {
+                        offsets.put(tp, new OffsetAndMetadata(offset + 1));
+                    }
                 }
-            }
-            if (offset >= 0) {
-                offsets.put(tp, new OffsetAndMetadata(offset + 1));
+                decrementTotalEventCount(countOfEventsToRemove);
+            } finally {
+                removingEventsInProgress.set(false);
             }
         }
     }
 
+    private void decrementTotalEventCount(long countOfEventsToRemove) {
+        total.getAndUpdate(current -> {
+            if (current < countOfEventsToRemove) {
+                log.warn("Total event count ({}) is lower than the count ({}) we try to remove, resetting to 0",
+                        current,
+                        countOfEventsToRemove);
+                return 0;
+            } else {
+                return current - countOfEventsToRemove;
+            }
+        });
+    }
+
     public void addFailedEventBatch(final EventBatch batch) {
         if (!batch.isFailed()) {
             throw new RuntimeException("event batch was not failed");
@@ -180,7 +205,7 @@ public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
             log.warn("purge events={} from closed partitions={}",
                     countOfEventsToRemove, partitions);
             all.keySet().removeAll(partitions);
-            total.addAndGet(-1L * countOfEventsToRemove);
+            decrementTotalEventCount(countOfEventsToRemove);
         }
     }
 }
```
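
The new decrementTotalEventCount helper replaces the per-entry total.decrementAndGet() calls with a single atomic, clamped subtraction, so an oversized count can no longer drive the tracker's event total negative. A minimal illustration of the same clamp-at-zero pattern (the class and field names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

// Clamp-at-zero decrement in the style of decrementTotalEventCount:
// getAndUpdate applies the lambda atomically, retrying if another thread
// updates the value concurrently, so the counter never goes below zero.
public class ClampedCounter {
    final AtomicLong total = new AtomicLong();

    void add(long n) {
        total.addAndGet(n);
    }

    void subtractClamped(long delta) {
        total.getAndUpdate(current -> current < delta ? 0 : current - delta);
    }

    public static void main(String[] args) {
        ClampedCounter c = new ClampedCounter();
        c.add(10);
        c.subtractClamped(25);             // would underflow; clamps instead
        System.out.println(c.total.get()); // prints 0, not -15
    }
}
```

One subtlety of AtomicLong.getAndUpdate: the update function may be re-applied when a CAS attempt loses a race, so the log.warn inside the patched lambda could fire more than once under contention; the resulting counter value is still correct.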

src/test/java/com/splunk/hecclient/UnitUtil.java (+9)
```diff
@@ -35,6 +35,15 @@ public static EventBatch createBatch() {
         return batch;
     }
 
+    public static EventBatch createMultiBatch(int count) {
+        EventBatch batch = new JsonEventBatch();
+        for (int i = 0; i < count; i++) {
+            Event event = new JsonEvent("ni-" + i, "hao-" + i);
+            batch.add(event);
+        }
+        return batch;
+    }
+
     public static EventBatch createRawEventBatch() {
         Event event = new RawEvent("ni", "hao");
         EventBatch batch = RawEventBatch.factory().build();
```

src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java (+48)
```diff
@@ -27,6 +27,10 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class KafkaRecordTrackerTest {
     @Test
@@ -48,6 +52,38 @@ public void addNonFailedEventBatch() {
         tracker.addFailedEventBatch(batch);
     }
 
+    @Test
+    public void removeEventBatchMultiThread() {
+        List<EventBatch> batches = new ArrayList<>();
+        KafkaRecordTracker tracker = new KafkaRecordTracker();
+        tracker.open(createTopicPartitionList(500));
+
+        for (int i = 0; i < 100; i++) {
+            EventBatch batch = UnitUtil.createMultiBatch(500);
+            for (int j = 0; j < 500; j++) {
+                batch.getEvents().get(j).setTied(createSinkRecord(j, i * 1000 + j));
+            }
+            batch.commit();
+            batches.add(batch);
+            tracker.addEventBatch(batch);
+        }
+
+        Assert.assertEquals(50000, tracker.totalEvents());
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            Future<?> first = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+            Future<?> second = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+
+            first.get();
+            second.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            executorService.shutdown();
+        }
+
+        Assert.assertEquals(0, tracker.totalEvents());
+    }
     @Test
     public void addEventBatch() {
         List<EventBatch> batches = new ArrayList<>();
@@ -104,4 +140,16 @@ private List<TopicPartition> createTopicPartitionList() {
         tps.add(new TopicPartition("t", 1));
         return tps;
     }
+
+    private SinkRecord createSinkRecord(int partition, long offset) {
+        return new SinkRecord("t", partition, null, null, null, "ni, hao", offset);
+    }
+
+    private List<TopicPartition> createTopicPartitionList(int number) {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        for (int i = 0; i < number; i++) {
+            tps.add(new TopicPartition("t", i));
+        }
+        return tps;
+    }
 }
```
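
The new test drives two concurrent removeAckedEventBatches calls through a two-thread pool and asserts the total lands exactly at zero, which fails if the counter is double-decremented or underflows. How much the two calls actually overlap is left to pool scheduling; where tighter interleaving is wanted, a start latch can hold both workers at the same point and release them together. A sketch of that harness pattern (RaceHarness and its names are illustrative, not part of this test suite):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Generic two-thread race harness: both workers park on the same latch and
// start as close to simultaneously as the scheduler allows. The Runnable
// stands for the operation under test, e.g. a call like
// tracker.removeAckedEventBatches(batches) in the test above.
public class RaceHarness {
    public static void race(Runnable operation) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < 2; i++) {
                pool.submit(() -> {
                    try {
                        start.await(); // wait until both workers are queued
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    operation.run();
                });
            }
            start.countDown(); // release both workers at once
        } finally {
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        race(() -> System.out.println(Thread.currentThread().getName() + " ran"));
    }
}
```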

target/site/jacoco/jacoco.csv (+24 −24)
```diff
@@ -1,42 +1,42 @@
 GROUP,PACKAGE,CLASS,INSTRUCTION_MISSED,INSTRUCTION_COVERED,BRANCH_MISSED,BRANCH_COVERED,LINE_MISSED,LINE_COVERED,COMPLEXITY_MISSED,COMPLEXITY_COVERED,METHOD_MISSED,METHOD_COVERED
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new Credentials() {...},10,0,0,0,3,0,3,0,3,0
-splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,8,0,7,0,6
+splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,9,0,7,0,6
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new HostnameVerifier() {...},2,6,0,0,1,1,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new TrustStrategy() {...},2,6,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,9,73,4,20,0,15
-splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,195,1,13,5,48,1,16,0,10
+splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15
+splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10
 splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,47,363,8,24,12,88,7,23,1,13
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,28,157,9,44,0,25
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,2,8,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,17,46,2,13,2,11
-splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,2,18,0,8,0,4
-splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,168,1,9,8,46,3,17,2,13
-splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,5,21,0,11,0,7
+splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2
+splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11
+splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4
+splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,169,1,9,8,45,3,17,2,13
+splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,6,21,0,11,0,7
 splunk-kafka-connect,com.splunk.hecclient,HecEmptyEventException,5,4,0,0,2,2,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,36,0,22,0,16
-splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,238,0,12,0,58,0,28,0,22
+splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,35,0,22,0,16
+splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,236,0,12,0,60,0,28,0,22
 splunk-kafka-connect,com.splunk.hecclient,HecNullEventException,5,4,0,0,2,2,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,Event,3,204,0,6,1,67,1,29,1,26
 splunk-kafka-connect,com.splunk.hecclient,RawEventBatch.Builder,0,45,0,0,0,13,0,7,0,7
-splunk-kafka-connect,com.splunk.hecclient,Indexer,178,421,9,19,38,95,10,23,3,16
+splunk-kafka-connect,com.splunk.hecclient,Indexer,179,420,9,19,40,96,10,23,3,16
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.GzipDataContentProducer,0,21,0,0,0,6,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,8,0,5,0,3
-splunk-kafka-connect,com.splunk.hecclient,HecConfig,4,228,1,1,1,81,2,43,1,43
+splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,9,0,5,0,3
+splunk-kafka-connect,com.splunk.hecclient,HecConfig,5,227,1,1,1,81,2,43,1,43
 splunk-kafka-connect,com.splunk.hecclient,Indexer.new Configuration() {...},21,0,0,0,4,0,2,0,2,0
-splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,14,0,4,0,2
+splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,13,0,4,0,2
 splunk-kafka-connect,com.splunk.hecclient,DoubleSerializer,0,15,0,0,0,4,0,2,0,2
 splunk-kafka-connect,com.splunk.hecclient,PostResponse,0,37,0,2,0,13,0,8,0,7
-splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,34,79,1,7,7,22,3,11,2,8
-splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,10,3,6,2,5
+splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,33,80,1,7,7,23,3,11,2,8
+splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,9,3,6,2,5
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity.new Enumeration() {...},0,44,0,4,0,4,0,5,0,3
 splunk-kafka-connect,com.splunk.kafka.connect,HecClientWrapper,3,3,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,22,315,4,22,6,68,4,26,0,17
-splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,6,0,3,0,2
-splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,3,25,1,11,1,6
+splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,27,351,5,25,10,76,5,29,0,19
+splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,7,0,3,0,2
+splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,4,26,1,11,1,6
 splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule,0,10,0,0,0,3,0,1,0,1
 splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,47,18,9,5,8
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1337,6,72,1,236,6,47,0,14
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,12,82,4,26,1,15
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1269,6,72,1,198,6,47,0,14
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,383,4,30,11,85,5,28,1,15
 splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,415,1134,58,84,66,252,48,56,3,30
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,416,1134,58,84,68,260,48,56,3,30
```

target/site/jacoco/jacoco.xml (+1 −1)

Large diffs are not rendered by default.
