Skip to content

Commit

Permalink
KAFKA-18764: Throttle on share state RPCs.
Browse files Browse the repository at this point in the history
  • Loading branch information
smjn committed Feb 10, 2025
1 parent e53af1a commit f97b0be
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -110,4 +112,16 @@ public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid topicId
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}

/**
 * Builds a response in which every topic-partition present in the request is
 * marked with the supplied error. Used when the entire request fails up front
 * (for example, cluster authorization failure) before any state is touched.
 *
 * @param request the delete-state request whose topic/partition layout is mirrored
 * @param error   the error code applied to each partition result
 * @return response data carrying one error-coded result per requested partition
 */
public static DeleteShareGroupStateResponseData toGlobalErrorResponse(DeleteShareGroupStateRequestData request, Errors error) {
    List<DeleteShareGroupStateResponseData.DeleteStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<DeleteShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>();
        for (var partition : topic.partitions()) {
            // Every partition gets the same global error and message.
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseDeleteStateResult(topic.topicId(), partitions));
    }
    return new DeleteShareGroupStateResponseData().setResults(results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -112,4 +114,16 @@ public static ReadShareGroupStateResponseData.ReadStateResult toResponseReadStat
.setTopicId(topicId)
.setPartitions(partitionResults);
}

/**
 * Builds a response in which every topic-partition present in the request is
 * marked with the supplied error. Used when the entire request fails up front
 * (for example, cluster authorization failure) before any state is read.
 *
 * @param request the read-state request whose topic/partition layout is mirrored
 * @param error   the error code applied to each partition result
 * @return response data carrying one error-coded result per requested partition
 */
public static ReadShareGroupStateResponseData toGlobalErrorResponse(ReadShareGroupStateRequestData request, Errors error) {
    List<ReadShareGroupStateResponseData.ReadStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<ReadShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>();
        for (var partition : topic.partitions()) {
            // Every partition gets the same global error and message.
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseReadStateResult(topic.topicId(), partitions));
    }
    return new ReadShareGroupStateResponseData().setResults(results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -121,4 +123,16 @@ public static ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult toRe
.setTopicId(topicId)
.setPartitions(partitionResults);
}

/**
 * Builds a response in which every topic-partition present in the request is
 * marked with the supplied error. Used when the entire request fails up front
 * (for example, cluster authorization failure) before any summary is read.
 *
 * @param request the read-summary request whose topic/partition layout is mirrored
 * @param error   the error code applied to each partition result
 * @return response data carrying one error-coded result per requested partition
 */
public static ReadShareGroupStateSummaryResponseData toGlobalErrorResponse(ReadShareGroupStateSummaryRequestData request, Errors error) {
    List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitions = new ArrayList<>();
        for (var partition : topic.partitions()) {
            // Every partition gets the same global error and message.
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseReadStateSummaryResult(topic.topicId(), partitions));
    }
    return new ReadShareGroupStateSummaryResponseData().setResults(results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -107,4 +109,16 @@ public static WriteShareGroupStateResponseData.PartitionResult toResponsePartiti
return new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId);
}

/**
 * Builds a response in which every topic-partition present in the request is
 * marked with the supplied error. Used when the entire request fails up front
 * (for example, cluster authorization failure) before any state is written.
 *
 * @param request the write-state request whose topic/partition layout is mirrored
 * @param error   the error code applied to each partition result
 * @return response data carrying one error-coded result per requested partition
 */
public static WriteShareGroupStateResponseData toGlobalErrorResponse(WriteShareGroupStateRequestData request, Errors error) {
    List<WriteShareGroupStateResponseData.WriteStateResult> results = new ArrayList<>();
    for (var topic : request.topics()) {
        List<WriteShareGroupStateResponseData.PartitionResult> partitions = new ArrayList<>();
        for (var partition : topic.partitions()) {
            // Every partition gets the same global error and message.
            partitions.add(toErrorResponsePartitionResult(partition.partition(), error, error.message()));
        }
        results.add(toResponseWriteStateResult(topic.topicId(), partitions));
    }
    return new WriteShareGroupStateResponseData().setResults(results);
}
}
47 changes: 41 additions & 6 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val readShareGroupStateRequest = request.body[ReadShareGroupStateRequest]

authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateResponse(
ReadShareGroupStateResponse.toGlobalErrorResponse(
readShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
Expand All @@ -3130,7 +3137,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleWriteShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val writeShareRequest = request.body[WriteShareGroupStateRequest]

authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new WriteShareGroupStateResponse(
WriteShareGroupStateResponse.toGlobalErrorResponse(
writeShareRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
Expand All @@ -3148,14 +3162,23 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): Unit = {
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val deleteShareGroupStateRequest = request.body[DeleteShareGroupStateRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new DeleteShareGroupStateResponse(
DeleteShareGroupStateResponse.toGlobalErrorResponse(
deleteShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())

case Some(coordinator) => coordinator.deleteState(request.context, deleteShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
Expand All @@ -3168,9 +3191,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = {
def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val readShareGroupStateSummaryRequest = request.body[ReadShareGroupStateSummaryRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateSummaryResponse(
ReadShareGroupStateSummaryResponse.toGlobalErrorResponse(
readShareGroupStateSummaryRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
Expand Down Expand Up @@ -3482,6 +3513,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}

/**
 * Non-throwing cluster-resource authorization check: returns true iff the
 * request's principal may perform `operation` on the CLUSTER resource.
 * Unlike authHelper.authorizeClusterOperation, this does not raise on denial,
 * letting callers build a per-partition error response instead.
 */
def authorizeClusterOperation(request: RequestChannel.Request, operation: AclOperation): Boolean =
  authHelper.authorize(request.context, operation, CLUSTER, CLUSTER_NAME)
}

object KafkaApis {
Expand Down

0 comments on commit f97b0be

Please sign in to comment.