Skip to content

Commit f2e60bf

Browse files
committed
Merge branch 'trunk' into KAFKA-16907
2 parents ffed174 + 102de21 commit f2e60bf

File tree

75 files changed

+4379
-1479
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+4379
-1479
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -3297,6 +3297,7 @@ project(':jmh-benchmarks') {
32973297
implementation project(':clients').sourceSets.test.output
32983298
implementation project(':core').sourceSets.test.output
32993299
implementation project(':server-common').sourceSets.test.output
3300+
implementation project(':metadata').sourceSets.test.output
33003301

33013302
implementation libs.jmhCore
33023303
annotationProcessor libs.jmhGeneratorAnnProcess

checkstyle/import-control-group-coordinator.xml

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
<allow pkg="org.apache.kafka.image"/>
6666
<allow pkg="org.apache.kafka.server.common"/>
6767
<allow pkg="org.apache.kafka.server.record"/>
68+
<allow pkg="org.apache.kafka.server.share.persister"/>
6869
<allow pkg="org.apache.kafka.server.util"/>
6970
<allow pkg="org.apache.kafka.storage.internals.log"/>
7071
<allow pkg="org.apache.kafka.test" />

checkstyle/suppressions.xml

+2
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,8 @@
347347
files="(ShareCoordinatorServiceTest|DefaultStatePersisterTest|PersisterStateManagerTest).java"/>
348348
<suppress checks="CyclomaticComplexity"
349349
files="ShareCoordinatorShard.java"/>
350+
<suppress checks="ClassFanOutComplexity"
351+
files="(PersisterStateManagerTest|DefaultStatePersisterTest).java"/>
350352

351353
<!-- storage -->
352354
<suppress checks="CyclomaticComplexity"

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
5858
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
5959
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
60+
import org.apache.kafka.clients.admin.internals.ListShareGroupOffsetsHandler;
6061
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
6162
import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
6263
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
@@ -3799,12 +3800,13 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
37993800
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
38003801
}
38013802

3802-
// To do in a follow-up PR
38033803
@Override
38043804
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
38053805
final ListShareGroupOffsetsOptions options) {
3806-
// To-do
3807-
throw new InvalidRequestException("The method is not yet implemented");
3806+
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
3807+
ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext);
3808+
invokeDriver(handler, future, options.timeoutMs);
3809+
return new ListShareGroupOffsetsResult(future.all());
38083810
}
38093811

38103812
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin.internals;
18+
19+
import org.apache.kafka.clients.admin.KafkaAdminClient;
20+
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
21+
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
22+
import org.apache.kafka.common.Node;
23+
import org.apache.kafka.common.TopicPartition;
24+
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
25+
import org.apache.kafka.common.protocol.Errors;
26+
import org.apache.kafka.common.requests.AbstractResponse;
27+
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
28+
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
29+
import org.apache.kafka.common.requests.FindCoordinatorRequest;
30+
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
31+
import org.apache.kafka.common.utils.LogContext;
32+
33+
import org.slf4j.Logger;
34+
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Set;
41+
import java.util.stream.Collectors;
42+
43+
/**
44+
* This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
45+
*/
46+
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
47+
48+
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
49+
private final Logger log;
50+
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
51+
52+
public ListShareGroupOffsetsHandler(
53+
Map<String, ListShareGroupOffsetsSpec> groupSpecs,
54+
LogContext logContext) {
55+
this.groupSpecs = groupSpecs;
56+
this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
57+
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
58+
}
59+
60+
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
61+
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
62+
}
63+
64+
@Override
65+
public String apiName() {
66+
return "describeShareGroupOffsets";
67+
}
68+
69+
@Override
70+
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
71+
return lookupStrategy;
72+
}
73+
74+
@Override
75+
public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
76+
List<String> groupIds = keys.stream().map(key -> {
77+
if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
78+
throw new IllegalArgumentException("Invalid group coordinator key " + key +
79+
" when building `DescribeShareGroupOffsets` request");
80+
}
81+
return key.idValue;
82+
}).collect(Collectors.toList());
83+
// The DescribeShareGroupOffsetsRequest only includes a single group ID at this point, which is likely a mistake to be fixing a follow-on PR.
84+
String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
85+
if (groupId == null) {
86+
throw new IllegalArgumentException("Missing group id in request");
87+
}
88+
ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
89+
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics =
90+
spec.topicPartitions().stream().map(
91+
topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
92+
.setTopicName(topicPartition.topic())
93+
.setPartitions(List.of(topicPartition.partition()))
94+
).collect(Collectors.toList());
95+
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
96+
.setGroupId(groupId)
97+
.setTopics(topics);
98+
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
99+
}
100+
101+
@Override
102+
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator,
103+
Set<CoordinatorKey> groupIds,
104+
AbstractResponse abstractResponse) {
105+
final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
106+
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
107+
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
108+
109+
for (CoordinatorKey groupId : groupIds) {
110+
Map<TopicPartition, Long> data = new HashMap<>();
111+
response.data().responses().stream().map(
112+
describedTopic ->
113+
describedTopic.partitions().stream().map(
114+
partition -> {
115+
if (partition.errorCode() == Errors.NONE.code())
116+
data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset());
117+
else
118+
log.error("Skipping return offset for topic {} partition {} due to error {}.", describedTopic.topicName(), partition.partitionIndex(), Errors.forCode(partition.errorCode()));
119+
return data;
120+
}
121+
).collect(Collectors.toList())
122+
).collect(Collectors.toList());
123+
completed.put(groupId, data);
124+
}
125+
return new ApiResult<>(completed, failed, Collections.emptyList());
126+
}
127+
128+
private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
129+
return groupIds.stream()
130+
.map(CoordinatorKey::byGroupId)
131+
.collect(Collectors.toSet());
132+
}
133+
}

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

+7
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,13 @@ public void close() {
17821782
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
17831783
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
17841784
* used to interrupt close.
1785+
* <p>
1786+
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
1787+
* only applies to operations performed with the broker (coordinator-related requests and
1788+
* fetch sessions). Even if a larger timeout is specified, the consumer will not wait longer than
1789+
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
1790+
* Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
1791+
* {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
17851792
*
17861793
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
17871794
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
246246
private final ConsumerMetadata metadata;
247247
private final Metrics metrics;
248248
private final long retryBackoffMs;
249+
private final int requestTimeoutMs;
249250
private final Duration defaultApiTimeoutMs;
250251
private final boolean autoCommitEnabled;
251252
private volatile boolean closed = false;
@@ -324,6 +325,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
324325
this.metrics = createMetrics(config, time, reporters);
325326
this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
326327
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
328+
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
327329

328330
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
329331
this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
@@ -447,6 +449,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
447449
SubscriptionState subscriptions,
448450
ConsumerMetadata metadata,
449451
long retryBackoffMs,
452+
int requestTimeoutMs,
450453
int defaultApiTimeoutMs,
451454
String groupId,
452455
boolean autoCommitEnabled) {
@@ -466,6 +469,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
466469
this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
467470
this.metadata = metadata;
468471
this.retryBackoffMs = retryBackoffMs;
472+
this.requestTimeoutMs = requestTimeoutMs;
469473
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
470474
this.deserializers = deserializers;
471475
this.applicationEventHandler = applicationEventHandler;
@@ -499,6 +503,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
499503
this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
500504
this.metadata = metadata;
501505
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
506+
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
502507
this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
503508
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
504509
this.clientTelemetryReporter = Optional.empty();
@@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) {
13261331
// We are already closing with a timeout, don't allow wake-ups from here on.
13271332
wakeupTrigger.disableWakeups();
13281333

1329-
final Timer closeTimer = time.timer(timeout);
1334+
final Timer closeTimer = createTimerForCloseRequests(timeout);
13301335
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
13311336
closeTimer.update();
13321337
// Prepare shutting down the network thread
@@ -1337,7 +1342,7 @@ private void close(Duration timeout, boolean swallowException) {
13371342
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
13381343
this::stopFindCoordinatorOnClose, firstException);
13391344
swallow(log, Level.ERROR, "Failed to release group assignment",
1340-
() -> runRebalanceCallbacksOnClose(closeTimer), firstException);
1345+
this::runRebalanceCallbacksOnClose, firstException);
13411346
swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
13421347
() -> leaveGroupOnClose(closeTimer), firstException);
13431348
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer",
@@ -1368,6 +1373,12 @@ private void close(Duration timeout, boolean swallowException) {
13681373
}
13691374
}
13701375

1376+
private Timer createTimerForCloseRequests(Duration timeout) {
1377+
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
1378+
final Time time = (this.time == null) ? Time.SYSTEM : this.time;
1379+
return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
1380+
}
1381+
13711382
private void autoCommitOnClose(final Timer timer) {
13721383
if (groupMetadata.get().isEmpty())
13731384
return;
@@ -1378,7 +1389,7 @@ private void autoCommitOnClose(final Timer timer) {
13781389
applicationEventHandler.add(new CommitOnCloseEvent());
13791390
}
13801391

1381-
private void runRebalanceCallbacksOnClose(final Timer timer) {
1392+
private void runRebalanceCallbacksOnClose() {
13821393
if (groupMetadata.get().isEmpty())
13831394
return;
13841395

@@ -1393,19 +1404,15 @@ private void runRebalanceCallbacksOnClose(final Timer timer) {
13931404
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
13941405
droppedPartitions.addAll(assignedPartitions);
13951406

1396-
try {
1397-
final Exception error;
1407+
final Exception error;
13981408

1399-
if (memberEpoch > 0)
1400-
error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
1401-
else
1402-
error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
1409+
if (memberEpoch > 0)
1410+
error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
1411+
else
1412+
error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
14031413

1404-
if (error != null)
1405-
throw ConsumerUtils.maybeWrapAsKafkaException(error);
1406-
} finally {
1407-
timer.update();
1408-
}
1414+
if (error != null)
1415+
throw ConsumerUtils.maybeWrapAsKafkaException(error);
14091416
}
14101417

14111418
private void leaveGroupOnClose(final Timer timer) {

clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static class Builder extends AbstractRequest.Builder<DeleteShareGroupStat
3434
private final DeleteShareGroupStateRequestData data;
3535

3636
public Builder(DeleteShareGroupStateRequestData data) {
37-
this(data, false);
37+
this(data, true);
3838
}
3939

4040
public Builder(DeleteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {

clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java

+20
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,24 @@ public static DescribeShareGroupOffsetsRequest parse(ByteBuffer buffer, short ve
8686
version
8787
);
8888
}
89+
90+
public static List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> getErrorDescribeShareGroupOffsets(
91+
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics,
92+
Errors error
93+
) {
94+
return topics.stream()
95+
.map(
96+
requestTopic -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
97+
.setTopicName(requestTopic.topicName())
98+
.setPartitions(
99+
requestTopic.partitions().stream().map(
100+
partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
101+
.setPartitionIndex(partition)
102+
.setErrorCode(error.code())
103+
.setErrorMessage(error.message())
104+
.setStartOffset(0)
105+
).collect(Collectors.toList())
106+
)
107+
).collect(Collectors.toList());
108+
}
89109
}

clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.common.config.ConfigException;
2020

2121
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.MethodSource;
2224
import org.mockito.MockedConstruction;
2325
import org.mockito.MockedStatic;
2426

@@ -27,8 +29,10 @@
2729
import java.net.UnknownHostException;
2830
import java.util.Collections;
2931
import java.util.List;
32+
import java.util.stream.Stream;
3033

3134
import static java.util.Arrays.asList;
35+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3236
import static org.junit.jupiter.api.Assertions.assertEquals;
3337
import static org.junit.jupiter.api.Assertions.assertFalse;
3438
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -95,6 +99,28 @@ public void testParseAndValidateAddressesWithReverseLookup() {
9599
}
96100
}
97101

102+
@Test
103+
public void testValidBrokerAddress() {
104+
List<String> validBrokerAddress = List.of("localhost:9997", "localhost:9998", "localhost:9999");
105+
assertDoesNotThrow(() -> ClientUtils.parseAndValidateAddresses(validBrokerAddress, ClientDnsLookup.USE_ALL_DNS_IPS));
106+
}
107+
108+
static Stream<List<String>> provideInvalidBrokerAddressTestCases() {
109+
return Stream.of(
110+
List.of("localhost:9997\nlocalhost:9998\nlocalhost:9999"),
111+
List.of("localhost:9997", "localhost:9998", " localhost:9999"),
112+
// Intentionally provide a single string, as users may provide space-separated brokers, which will be parsed as a single string.
113+
List.of("localhost:9997 localhost:9998 localhost:9999")
114+
);
115+
}
116+
117+
@ParameterizedTest
118+
@MethodSource("provideInvalidBrokerAddressTestCases")
119+
public void testInvalidBrokerAddress(List<String> addresses) {
120+
assertThrows(ConfigException.class,
121+
() -> ClientUtils.parseAndValidateAddresses(addresses, ClientDnsLookup.USE_ALL_DNS_IPS));
122+
}
123+
98124
@Test
99125
public void testInvalidConfig() {
100126
assertThrows(IllegalArgumentException.class,

0 commit comments

Comments
 (0)