Skip to content

Commit ffed174

Browse files
committed
Merge branch 'trunk' into KAFKA-16907
2 parents 9f3dcef + 00dddee commit ffed174

File tree

46 files changed

+4621
-402
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+4621
-402
lines changed

.github/actions/run-gradle/action.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ runs:
5858
timeout ${TIMEOUT_MINUTES}m ./gradlew --build-cache --continue --no-scan \
5959
-PtestLoggingEvents=started,passed,skipped,failed \
6060
-PmaxParallelForks=2 \
61-
-PmaxTestRetries=1 -PmaxTestRetryFailures=3 \
61+
-PmaxTestRetries=1 -PmaxTestRetryFailures=10 \
6262
-PmaxQuarantineTestRetries=3 -PmaxQuarantineTestRetryFailures=0 \
6363
-Pkafka.test.catalog.file=$TEST_CATALOG \
6464
-PcommitId=xxxxxxxxxxxxxxxx \
@@ -72,4 +72,4 @@ runs:
7272
name: ${{ inputs.build-scan-artifact-name }}
7373
path: ~/.gradle/build-scan-data
7474
compression-level: 9
75-
if-no-files-found: ignore
75+
if-no-files-found: ignore

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2656,6 +2656,7 @@ CreateAclsRequest.Builder createRequest(int timeoutMs) {
26562656

26572657
@Override
26582658
void handleResponse(AbstractResponse abstractResponse) {
2659+
handleNotControllerError(abstractResponse);
26592660
CreateAclsResponse response = (CreateAclsResponse) abstractResponse;
26602661
List<AclCreationResult> responses = response.results();
26612662
Iterator<AclCreationResult> iter = responses.iterator();
@@ -2708,6 +2709,7 @@ DeleteAclsRequest.Builder createRequest(int timeoutMs) {
27082709

27092710
@Override
27102711
void handleResponse(AbstractResponse abstractResponse) {
2712+
handleNotControllerError(abstractResponse);
27112713
DeleteAclsResponse response = (DeleteAclsResponse) abstractResponse;
27122714
List<DeleteAclsResponseData.DeleteAclsFilterResult> results = response.filterResults();
27132715
Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter = results.iterator();
@@ -2926,6 +2928,7 @@ public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) {
29262928

29272929
@Override
29282930
public void handleResponse(AbstractResponse abstractResponse) {
2931+
handleNotControllerError(abstractResponse);
29292932
IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse;
29302933
Map<ConfigResource, ApiError> errors = IncrementalAlterConfigsResponse.fromResponseData(response.data());
29312934
for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
@@ -4089,8 +4092,11 @@ void handleFailure(Throwable throwable) {
40894092
}
40904093

40914094
private void handleNotControllerError(AbstractResponse response) throws ApiException {
4095+
// When sending requests directly to the follower controller, it might return NOT_LEADER_OR_FOLLOWER error.
40924096
if (response.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
40934097
handleNotControllerError(Errors.NOT_CONTROLLER);
4098+
} else if (metadataManager.usingBootstrapControllers() && response.errorCounts().containsKey(Errors.NOT_LEADER_OR_FOLLOWER)) {
4099+
handleNotControllerError(Errors.NOT_LEADER_OR_FOLLOWER);
40944100
}
40954101
}
40964102

@@ -4652,6 +4658,7 @@ DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
46524658

46534659
@Override
46544660
void handleResponse(AbstractResponse response) {
4661+
handleNotControllerError(response);
46554662
final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
46564663
if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
46574664
throw Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
@@ -4849,6 +4856,7 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
48494856

48504857
@Override
48514858
void handleResponse(AbstractResponse response) {
4859+
handleNotControllerError(response);
48524860
AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response;
48534861
if (addResponse.data().errorCode() != Errors.NONE.code()) {
48544862
ApiError error = new ApiError(
@@ -4893,6 +4901,7 @@ RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
48934901

48944902
@Override
48954903
void handleResponse(AbstractResponse response) {
4904+
handleNotControllerError(response);
48964905
RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response;
48974906
if (addResponse.data().errorCode() != Errors.NONE.code()) {
48984907
ApiError error = new ApiError(

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
159159
/**
160160
* If the member is currently leaving the group after a call to {@link #leaveGroup()} or
161161
* {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
162-
* completes (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
162+
* completes (callbacks executed and heartbeat request to leave is sent out). This will be empty if the
163163
* member is not leaving.
164164
*/
165165
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ protected synchronized boolean retainTopic(String topic, boolean isInternal, lon
9494
if (isInternal && !includeInternalTopics)
9595
return false;
9696

97-
return subscription.matchesSubscribedPattern(topic);
97+
return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
9898
}
9999
}

0 commit comments

Comments
 (0)