diff --git a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
index 954348ad..f3fd5a79 100644
--- a/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
+++ b/src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ final class KafkaRecordTracker {
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
    private Collection<TopicPartition> partitions;
+    private AtomicBoolean removingEventsInProgress = new AtomicBoolean(false);
 
     public KafkaRecordTracker() {
         all = new ConcurrentHashMap<>();
@@ -58,35 +60,58 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
         log.debug("received acked event batches={}", batches);
         /* Loop all *assigned* partitions to find the lowest consecutive
          * HEC-commited offsets. A batch could contain events coming from a
-         * variety of topic/partitions, and scanning those events coulb be
+         * variety of topic/partitions, and scanning those events could be
          * expensive.
          * Note that if some events are tied to an unassigned partition those
-         * offsets won't be able to be commited.
+         * offsets won't be able to be committed.
          */
-        for (TopicPartition tp : partitions) {
-            ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
-            if (tpRecords == null) {
-                continue; // nothing to remove in this case
-            }
-            long offset = -1;
-            Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
-            for (; iter.hasNext();) {
-                Map.Entry<Long, EventBatch> e = iter.next();
-                if (e.getValue().isCommitted()) {
-                    log.debug("processing offset {}", e.getKey());
-                    offset = e.getKey();
-                    iter.remove();
-                    total.decrementAndGet();
-                } else {
-                    break;
+        // With the current implementation, there is no need to let multiple threads clean up events in parallel.
+        if (removingEventsInProgress.compareAndSet(false, true)) {
+            try {
+                long countOfEventsToRemove = 0;
+
+                for (TopicPartition tp : partitions) {
+                    ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
+                    if (tpRecords == null) {
+                        continue; // nothing to remove in this case
+                    }
+                    long offset = -1;
+                    Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
+                    for (; iter.hasNext(); ) {
+                        Map.Entry<Long, EventBatch> e = iter.next();
+                        if (e.getValue().isCommitted()) {
+                            log.debug("processing offset {}", e.getKey());
+                            offset = e.getKey();
+                            iter.remove();
+                            countOfEventsToRemove++;
+                        } else {
+                            break;
+                        }
+                    }
+                    if (offset >= 0) {
+                        offsets.put(tp, new OffsetAndMetadata(offset + 1));
+                    }
+                }
-            }
-            if (offset >= 0) {
-                offsets.put(tp, new OffsetAndMetadata(offset + 1));
+                decrementTotalEventCount(countOfEventsToRemove);
+            } finally {
+                removingEventsInProgress.set(false);
             }
         }
     }
 
+    private void decrementTotalEventCount(long countOfEventsToRemove) {
+        total.getAndUpdate(current -> {
+            if (current < countOfEventsToRemove) {
+                log.warn("Total event count ({}) is lower than the count ({}) we try to remove, resetting to 0",
+                        current,
+                        countOfEventsToRemove);
+                return 0;
+            } else {
+                return current - countOfEventsToRemove;
+            }
+        });
+    }
+
     public void addFailedEventBatch(final EventBatch batch) {
         if (!batch.isFailed()) {
             throw new RuntimeException("event batch was not failed");
@@ -180,7 +205,7 @@ public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
             log.warn("purge events={} from closed partitions={}", countOfEventsToRemove, partitions);
             all.keySet().removeAll(partitions);
-            total.addAndGet(-1L * countOfEventsToRemove);
+            decrementTotalEventCount(countOfEventsToRemove);
         }
     }
 }
diff --git a/src/test/java/com/splunk/hecclient/UnitUtil.java b/src/test/java/com/splunk/hecclient/UnitUtil.java
index 798401c6..65553e96 100644
--- a/src/test/java/com/splunk/hecclient/UnitUtil.java
+++ b/src/test/java/com/splunk/hecclient/UnitUtil.java
@@ -35,6 +35,15 @@ public static EventBatch createBatch() {
         return batch;
     }
 
+    public static EventBatch createMultiBatch(int count) {
+        EventBatch batch = new JsonEventBatch();
+        for (int i = 0; i < count; i++) {
+            Event event = new JsonEvent("ni-" + i, "hao-" + i);
+            batch.add(event);
+        }
+        return batch;
+    }
+
     public static EventBatch createRawEventBatch() {
         Event event = new RawEvent("ni", "hao");
         EventBatch batch = RawEventBatch.factory().build();
diff --git a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
index baa6f6ef..fda6b8b1 100644
--- a/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
+++ b/src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
@@ -27,6 +27,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class KafkaRecordTrackerTest {
     @Test
@@ -48,6 +52,38 @@ public void addNonFailedEventBatch() {
         tracker.addFailedEventBatch(batch);
     }
 
+    @Test
+    public void removeEventBatchMultiThread() {
+        List<EventBatch> batches = new ArrayList<>();
+        KafkaRecordTracker tracker = new KafkaRecordTracker();
+        tracker.open(createTopicPartitionList(500));
+
+        for (int i = 0; i < 100; i++) {
+            EventBatch batch = UnitUtil.createMultiBatch(500);
+            for (int j = 0; j < 500; j++) {
+                batch.getEvents().get(j).setTied(createSinkRecord(j, i * 1000 + j));
+            }
+            batch.commit();
+            batches.add(batch);
+            tracker.addEventBatch(batch);
+        }
+
+        Assert.assertEquals(50000, tracker.totalEvents());
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            Future<?> first = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+            Future<?> second = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
+
+            first.get();
+            second.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            executorService.shutdown();
+        }
+
+        Assert.assertEquals(0, tracker.totalEvents());
+    }
     @Test
     public void addEventBatch() {
         List<EventBatch> batches = new ArrayList<>();
@@ -104,4 +140,16 @@ private List<TopicPartition> createTopicPartitionList() {
         tps.add(new TopicPartition("t", 1));
         return tps;
     }
+
+    private SinkRecord createSinkRecord(int partition, long offset) {
+        return new SinkRecord("t", partition, null, null, null, "ni, hao", offset);
+    }
+
+    private List<TopicPartition> createTopicPartitionList(int number) {
+        ArrayList<TopicPartition> tps = new ArrayList<>();
+        for (int i = 0; i < number; i++) {
+            tps.add(new TopicPartition("t", i));
+        }
+        return tps;
+    }
 }
diff --git a/target/site/jacoco/jacoco.csv b/target/site/jacoco/jacoco.csv
index f2852d0b..71d43cf6 100644
--- a/target/site/jacoco/jacoco.csv
+++ b/target/site/jacoco/jacoco.csv
@@ -1,42 +1,42 @@
 GROUP,PACKAGE,CLASS,INSTRUCTION_MISSED,INSTRUCTION_COVERED,BRANCH_MISSED,BRANCH_COVERED,LINE_MISSED,LINE_COVERED,COMPLEXITY_MISSED,COMPLEXITY_COVERED,METHOD_MISSED,METHOD_COVERED
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new Credentials() {...},10,0,0,0,3,0,3,0,3,0
-splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,8,0,7,0,6
+splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,9,0,7,0,6
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new HostnameVerifier() {...},2,6,0,0,1,1,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new TrustStrategy() {...},2,6,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,9,73,4,20,0,15
-splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,195,1,13,5,48,1,16,0,10
+splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15
+splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10
 splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,47,363,8,24,12,88,7,23,1,13
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,28,157,9,44,0,25
-splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,2,8,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,17,46,2,13,2,11
-splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,2,18,0,8,0,4
-splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,168,1,9,8,46,3,17,2,13
-splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,5,21,0,11,0,7
+splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25
+splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2
+splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11
+splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4
+splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,169,1,9,8,45,3,17,2,13
+splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,6,21,0,11,0,7
 splunk-kafka-connect,com.splunk.hecclient,HecEmptyEventException,5,4,0,0,2,2,1,1,1,1
-splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,36,0,22,0,16
-splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,238,0,12,0,58,0,28,0,22
+splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,35,0,22,0,16
+splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,236,0,12,0,60,0,28,0,22
 splunk-kafka-connect,com.splunk.hecclient,HecNullEventException,5,4,0,0,2,2,1,1,1,1
 splunk-kafka-connect,com.splunk.hecclient,Event,3,204,0,6,1,67,1,29,1,26
 splunk-kafka-connect,com.splunk.hecclient,RawEventBatch.Builder,0,45,0,0,0,13,0,7,0,7
-splunk-kafka-connect,com.splunk.hecclient,Indexer,178,421,9,19,38,95,10,23,3,16
+splunk-kafka-connect,com.splunk.hecclient,Indexer,179,420,9,19,40,96,10,23,3,16
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.GzipDataContentProducer,0,21,0,0,0,6,0,2,0,2
-splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,8,0,5,0,3
-splunk-kafka-connect,com.splunk.hecclient,HecConfig,4,228,1,1,1,81,2,43,1,43
+splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,9,0,5,0,3
+splunk-kafka-connect,com.splunk.hecclient,HecConfig,5,227,1,1,1,81,2,43,1,43
 splunk-kafka-connect,com.splunk.hecclient,Indexer.new Configuration() {...},21,0,0,0,4,0,2,0,2,0
-splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,14,0,4,0,2
+splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,13,0,4,0,2
 splunk-kafka-connect,com.splunk.hecclient,DoubleSerializer,0,15,0,0,0,4,0,2,0,2
 splunk-kafka-connect,com.splunk.hecclient,PostResponse,0,37,0,2,0,13,0,8,0,7
-splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,34,79,1,7,7,22,3,11,2,8
-splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,10,3,6,2,5
+splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,33,80,1,7,7,23,3,11,2,8
+splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,9,3,6,2,5
 splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity.new Enumeration() {...},0,44,0,4,0,4,0,5,0,3
 splunk-kafka-connect,com.splunk.kafka.connect,HecClientWrapper,3,3,0,0,1,1,1,1,1,1
-splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,22,315,4,22,6,68,4,26,0,17
-splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,6,0,3,0,2
-splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,3,25,1,11,1,6
+splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,27,351,5,25,10,76,5,29,0,19
+splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,7,0,3,0,2
+splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,4,26,1,11,1,6
 splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule,0,10,0,0,0,3,0,1,0,1
 splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,47,18,9,5,8
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1337,6,72,1,236,6,47,0,14
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,12,82,4,26,1,15
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1269,6,72,1,198,6,47,0,14
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,383,4,30,11,85,5,28,1,15
 splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1
-splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,415,1134,58,84,66,252,48,56,3,30
+splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,416,1134,58,84,68,260,48,56,3,30
diff --git a/target/site/jacoco/jacoco.xml b/target/site/jacoco/jacoco.xml
index 00c8a7c6..1a1a79de 100644
--- a/target/site/jacoco/jacoco.xml
+++ b/target/site/jacoco/jacoco.xml
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
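
Reviewer note (not part of the patch): the KafkaRecordTracker change combines two small concurrency patterns, an AtomicBoolean compareAndSet guard so only one thread runs the acked-batch cleanup pass at a time, and an AtomicLong.getAndUpdate decrement that clamps at zero so concurrent callers can never drive the total negative. The stand-alone sketch below illustrates both ideas; the class and method names are invented for illustration and do not exist in the connector.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical demo class, not connector code.
    public class GuardedCounterSketch {
        private final AtomicLong total = new AtomicLong();
        private final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);

        void add(long n) {
            total.addAndGet(n);
        }

        // Only the thread that wins the compareAndSet performs the cleanup;
        // other callers return immediately instead of double-counting removals.
        void cleanup(long removable) {
            if (cleanupInProgress.compareAndSet(false, true)) {
                try {
                    decrement(removable);
                } finally {
                    cleanupInProgress.set(false); // always release the guard
                }
            }
        }

        // Atomic subtraction that clamps at zero instead of going negative.
        private void decrement(long count) {
            total.getAndUpdate(current -> current < count ? 0 : current - count);
        }

        public static void main(String[] args) {
            GuardedCounterSketch counter = new GuardedCounterSketch();
            counter.add(5);
            counter.cleanup(10); // asks to remove more than is tracked
            System.out.println(counter.total.get()); // prints 0, not -5
        }
    }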
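
One behavioral detail worth noting about decrementTotalEventCount: AtomicLong.getAndUpdate is documented to possibly re-apply its update function when a CAS attempt loses a race, so the warn log inside the lambda can occasionally fire more than once under contention. That is harmless here, but good to be aware of. A minimal, hypothetical demo of the retry behavior (not connector code):

    import java.util.concurrent.atomic.AtomicLong;

    // Shows that getAndUpdate may call its lambda more times than there are
    // logical updates, because it retries on CAS failure.
    public class GetAndUpdateRetryDemo {
        public static void main(String[] args) throws InterruptedException {
            AtomicLong value = new AtomicLong();
            AtomicLong lambdaCalls = new AtomicLong();

            Runnable work = () -> {
                for (int i = 0; i < 100_000; i++) {
                    value.getAndUpdate(current -> {
                        lambdaCalls.incrementAndGet(); // side effect inside the update function
                        return current + 1;
                    });
                }
            };

            Thread t1 = new Thread(work);
            Thread t2 = new Thread(work);
            t1.start();
            t2.start();
            t1.join();
            t2.join();

            // value is exactly 200000; lambdaCalls is usually larger because of retries.
            System.out.println("value=" + value.get() + ", lambda invocations=" + lambdaCalls.get());
        }
    }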