Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #4941

Open
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

PratRanj07
Copy link
Contributor

This PR intends to add support for calling DescribeConsumerGroups for groups created with the new consumer protocol and add backward compatibility to describe group created with the old Classic Protocol.
The working of the API remains similar. Users will continue to call the existing DescribeConsumerGroups api but internally we will be calling the new ConsumerGroupDescribe api to fetch the results. If we get GRP_NOT_FOUND or UNSUPPORTED_VERSION error, then in that case we will call the old DescribeConsumerGroup api.

@PratRanj07 PratRanj07 requested a review from a team as a code owner December 27, 2024 11:22
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass comments.

src/rdkafka.h Outdated
Comment on lines 9013 to 9041
/**
* @brief Gets target assignment of \p member.
*
* @param member The group member.
*
* @return The target assignment.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p member object.
*/
RD_EXPORT
const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment(
const rd_kafka_MemberDescription_t *member);

/**
* @brief Gets target assigned partitions of a member \p assignment.
*
* @param assignment The group member assignment.
*
* @return The target assigned partitions.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p assignment object.
*/
RD_EXPORT
const rd_kafka_topic_partition_list_t *
rd_kafka_MemberAssignment_target_partitions(
const rd_kafka_MemberAssignment_t *assignment);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are we doing for the old protocol on these new functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is giving NULL as default value in these cases. I have updated it in the comments

@@ -2,6 +2,7 @@

librdkafka v2.8.0 is a maintenance release:

* DescribeConsumerGroup now supports new consumer protocol groups (#4922).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add that new fields type and target assignment is added. Target assignment is only applicable for Consumer protocol. Explain about the default values as well.

Comment on lines +179 to +180
const rd_kafka_topic_partition_list_t *target_topic_partitions =
rd_kafka_MemberAssignment_partitions(target_assignment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will give NULL pointer exception as target_assignment is only applicable for consumer protocol. It should be NULL for classic protocol and default value should be NULL.

Comment on lines +181 to +189
if (!target_topic_partitions) {
printf(" No target assignment\n");
} else if (target_topic_partitions->cnt == 0) {
printf(" Empty target assignment\n");
} else {
printf(" Target assignment:\n");
print_partition_list(stdout, target_topic_partitions, 0,
" ");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is different for classic protocol.

[RD_KAFKAP_ListTransactions] = "ListTransactions",
[RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds",
[RD_KAFKAP_ConsumerGroupHeartbeat] = "ConsumerGroupHeartbeat",
[RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add this at the end.

*/
rd_kafka_resp_err_t
rd_kafka_ConsumerGroupDescribeRequest(rd_kafka_broker_t *rkb,
const rd_list_t *groups /*(char*)*/,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be char ** and accept count as well. Check older API.

rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t maxApiVersion = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this variable. Used only once.

Comment on lines 6024 to 6029
int16_t ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_ConsumerGroupDescribe, 0, maxApiVersion, NULL);
size_t ofGroupsArrayCnt;
int grp_ids_cnt = rd_list_cnt(groups);
int i, include_authorized_operations;
char *group;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int16_t ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_ConsumerGroupDescribe, 0, maxApiVersion, NULL);
size_t ofGroupsArrayCnt;
int grp_ids_cnt = rd_list_cnt(groups);
int i, include_authorized_operations;
char *group;
size_t ofGroupsArrayCnt;
int grp_ids_cnt = rd_list_cnt(groups);
int i, include_authorized_operations;
char *group;
int16_t ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_ConsumerGroupDescribe, 0, maxApiVersion, NULL);

Comment on lines 6040 to 6045
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_ConsumerGroupDescribe, 1,
4 /* rd_kafka_buf_write_arraycnt_pos */ +
1 /* IncludeAuthorizedOperations */ + 1 /* tags */ +
32 * grp_ids_cnt /* Groups */,
rd_true /* flexver */);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alignment

Comment on lines +8565 to +8569
if (newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER &&
newgroupres->error &&
(newgroupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND ||
newgroupres->error->code ==
RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for memory leak in this condition as well.

@PratRanj07 PratRanj07 requested a review from pranavrth February 12, 2025 11:38
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still reviewing. Few comments. Check for possible SegFaults.

src->assignment.partitions);
return rd_kafka_MemberDescription_new(
src->client_id, src->consumer_id, src->group_instance_id, src->host,
src->assignment.partitions, src->target_assignment.partitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will give a SegFault if target assignment is NULL

Comment on lines +7808 to 7811
if (member->target_assignment.partitions)
rd_kafka_topic_partition_list_destroy(
member->target_assignment.partitions);
rd_free(member);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same changes of SegFault.

Comment on lines +8129 to +8132
RD_LIST_FOREACH(group, groups, i) {
groups_arr[i] = rd_list_elem(groups, i);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this. You can directly use groups->rl_elems

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants