Skip to content

Commit 6c00d8e

Browse files
author
John Roesler
committed
Revert "KAFKA-10866: Add metadata to ConsumerRecords (apache#9836)"
This reverts commit fdcf8fb. Closes apache#10119 Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 91fa677 commit 6c00d8e

File tree

10 files changed

+50
-439
lines changed

10 files changed

+50
-439
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java

+2-101
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
*/
1717
package org.apache.kafka.clients.consumer;
1818

19-
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
2019
import org.apache.kafka.common.TopicPartition;
2120
import org.apache.kafka.common.utils.AbstractIterator;
2221

2322
import java.util.ArrayList;
2423
import java.util.Collections;
25-
import java.util.HashMap;
2624
import java.util.Iterator;
2725
import java.util.List;
2826
import java.util.Map;
@@ -34,99 +32,12 @@
3432
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
3533
*/
3634
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
37-
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
38-
Collections.emptyMap(),
39-
Collections.emptyMap()
40-
);
35+
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
4136

4237
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
43-
private final Map<TopicPartition, Metadata> metadata;
4438

45-
public static final class Metadata {
46-
47-
private final long receivedTimestamp;
48-
private final Long position;
49-
private final Long endOffset;
50-
51-
public Metadata(final long receivedTimestamp,
52-
final Long position,
53-
final Long endOffset) {
54-
this.receivedTimestamp = receivedTimestamp;
55-
this.position = position;
56-
this.endOffset = endOffset;
57-
}
58-
59-
/**
60-
* @return The timestamp of the broker response that contained this metadata
61-
*/
62-
public long receivedTimestamp() {
63-
return receivedTimestamp;
64-
}
65-
66-
/**
67-
* @return The next position the consumer will fetch, or null if the consumer has no position.
68-
*/
69-
public Long position() {
70-
return position;
71-
}
72-
73-
/**
74-
* @return The lag between the next position to fetch and the current end of the partition, or
75-
* null if the end offset is not known or there is no position.
76-
*/
77-
public Long lag() {
78-
return endOffset == null || position == null ? null : endOffset - position;
79-
}
80-
81-
/**
82-
* @return The current last offset in the partition. The determination of the "last" offset
83-
* depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
84-
* replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
85-
* replicated offset plus one or the smallest offset of any open transaction. Null if the end offset
86-
* is not known.
87-
*/
88-
public Long endOffset() {
89-
return endOffset;
90-
}
91-
92-
@Override
93-
public String toString() {
94-
return "Metadata{" +
95-
"receivedTimestamp=" + receivedTimestamp +
96-
", position=" + position +
97-
", endOffset=" + endOffset +
98-
'}';
99-
}
100-
}
101-
102-
private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
103-
final Map<TopicPartition, Metadata> metadata = new HashMap<>();
104-
for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
105-
metadata.put(
106-
entry.getKey(),
107-
new Metadata(
108-
entry.getValue().receivedTimestamp(),
109-
entry.getValue().position() == null ? null : entry.getValue().position().offset,
110-
entry.getValue().endOffset()
111-
)
112-
);
113-
}
114-
return metadata;
115-
}
116-
117-
public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
118-
this.records = records;
119-
this.metadata = new HashMap<>();
120-
}
121-
122-
public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
123-
final Map<TopicPartition, Metadata> metadata) {
39+
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
12440
this.records = records;
125-
this.metadata = metadata;
126-
}
127-
128-
ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
129-
this(fetchedRecords.records(), extractMetadata(fetchedRecords));
13041
}
13142

13243
/**
@@ -142,16 +53,6 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
14253
return Collections.unmodifiableList(recs);
14354
}
14455

145-
/**
146-
* Get the updated metadata returned by the brokers along with this record set.
147-
* May be empty or partial depending on the responses from the broker during this particular poll.
148-
* May also include metadata for additional partitions than the ones for which there are records
149-
* in this {@code ConsumerRecords} object.
150-
*/
151-
public Map<TopicPartition, Metadata> metadata() {
152-
return Collections.unmodifiableMap(metadata);
153-
}
154-
15556
/**
15657
* Get just the records for the given topic
15758
*/

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
2828
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
2929
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
30-
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
3130
import org.apache.kafka.clients.consumer.internals.Fetcher;
3231
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
3332
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -1235,7 +1234,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
12351234
}
12361235
}
12371236

1238-
final FetchedRecords<K, V> records = pollForFetches(timer);
1237+
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
12391238
if (!records.isEmpty()) {
12401239
// before returning the fetched records, we can send off the next round of fetches
12411240
// and avoid block waiting for their responses to enable pipelining while the user
@@ -1269,12 +1268,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
12691268
/**
12701269
* @throws KafkaException if the rebalance callback throws exception
12711270
*/
1272-
private FetchedRecords<K, V> pollForFetches(Timer timer) {
1271+
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
12731272
long pollTimeout = coordinator == null ? timer.remainingMs() :
12741273
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
12751274

12761275
// if data is available already, return it immediately
1277-
final FetchedRecords<K, V> records = fetcher.fetchedRecords();
1276+
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
12781277
if (!records.isEmpty()) {
12791278
return records;
12801279
}

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+1-16
Original file line numberDiff line numberDiff line change
@@ -218,21 +218,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
218218
}
219219

220220
toClear.forEach(p -> this.records.remove(p));
221-
222-
final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
223-
for (final TopicPartition partition : subscriptions.assignedPartitions()) {
224-
if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
225-
final SubscriptionState.FetchPosition position = subscriptions.position(partition);
226-
final long offset = position.offset;
227-
final long endOffset = endOffsets.get(partition);
228-
metadata.put(
229-
partition,
230-
new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
231-
);
232-
}
233-
}
234-
235-
return new ConsumerRecords<>(results, metadata);
221+
return new ConsumerRecords<>(results);
236222
}
237223

238224
public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -243,7 +229,6 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
243229
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
244230
List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
245231
recs.add(record);
246-
endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
247232
}
248233

249234
/**

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java

-102
This file was deleted.

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

+17-28
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public void onSuccess(ClientResponse resp) {
318318
short responseVersion = resp.requestHeader().apiVersion();
319319

320320
completedFetches.add(new CompletedFetch(partition, partitionData,
321-
metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs()));
321+
metricAggregator, batches, fetchOffset, responseVersion));
322322
}
323323
}
324324

@@ -597,8 +597,8 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
597597
* the defaultResetPolicy is NONE
598598
* @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
599599
*/
600-
public FetchedRecords<K, V> fetchedRecords() {
601-
FetchedRecords<K, V> fetched = new FetchedRecords<>();
600+
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
601+
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
602602
Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
603603
int recordsRemaining = maxPollRecords;
604604

@@ -636,28 +636,20 @@ public FetchedRecords<K, V> fetchedRecords() {
636636
} else {
637637
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
638638

639-
TopicPartition partition = nextInLineFetch.partition;
640-
641-
// This can be false when a rebalance happened before fetched records
642-
// are returned to the consumer's poll call
643-
if (subscriptions.isAssigned(partition)) {
644-
645-
// initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
646-
// SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
647-
// for determining the end offset
648-
final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
649-
final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
650-
final FetchPosition fetchPosition = subscriptions.position(partition);
651-
652-
final FetchedRecords.FetchMetadata metadata =
653-
new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, endOffset);
654-
655-
fetched.addMetadata(partition, metadata);
656-
657-
}
658-
659639
if (!records.isEmpty()) {
660-
fetched.addRecords(partition, records);
640+
TopicPartition partition = nextInLineFetch.partition;
641+
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
642+
if (currentRecords == null) {
643+
fetched.put(partition, records);
644+
} else {
645+
// this case shouldn't usually happen because we only send one fetch at a time per partition,
646+
// but it might conceivably happen in some rare cases (such as partition leader changes).
647+
// we have to copy to a new list because the old one may be immutable
648+
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
649+
newRecords.addAll(currentRecords);
650+
newRecords.addAll(records);
651+
fetched.put(partition, newRecords);
652+
}
661653
recordsRemaining -= records.size();
662654
}
663655
}
@@ -1466,7 +1458,6 @@ private class CompletedFetch {
14661458
private final FetchResponse.PartitionData<Records> partitionData;
14671459
private final FetchResponseMetricAggregator metricAggregator;
14681460
private final short responseVersion;
1469-
private final long receivedTimestamp;
14701461

14711462
private int recordsRead;
14721463
private int bytesRead;
@@ -1485,15 +1476,13 @@ private CompletedFetch(TopicPartition partition,
14851476
FetchResponseMetricAggregator metricAggregator,
14861477
Iterator<? extends RecordBatch> batches,
14871478
Long fetchOffset,
1488-
short responseVersion,
1489-
final long receivedTimestamp) {
1479+
short responseVersion) {
14901480
this.partition = partition;
14911481
this.partitionData = partitionData;
14921482
this.metricAggregator = metricAggregator;
14931483
this.batches = batches;
14941484
this.nextFetchOffset = fetchOffset;
14951485
this.responseVersion = responseVersion;
1496-
this.receivedTimestamp = receivedTimestamp;
14971486
this.lastEpoch = Optional.empty();
14981487
this.abortedProducerIds = new HashSet<>();
14991488
this.abortedTransactions = abortedTransactions(partitionData);

0 commit comments

Comments
 (0)