Skip to content

Commit fa7626c

Browse files
authored
Implement DeleteConsumerGroupOffsets (confluentinc#1924)
* Initial commit * Fix implementation for DeleteConsumerOffsets API - Also added integration tests * Fix typos * Fix according to the comments * More fixes * Some more changes 1. Return one DeleteConsumerGroupOffsetResult rather than a list. 2. Throw exception if error occurs in one partition as well. 3. Other comments in the PR. * Update test
1 parent 0599b9c commit fa7626c

13 files changed

+647
-57
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Linq;
19+
using System.Collections.Generic;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents an error that occured during a delete consumer group offset request.
26+
/// </summary>
27+
public class DeleteConsumerGroupOffsetsException : KafkaException
28+
{
29+
/// <summary>
30+
/// Initializes a new instance of DeleteConsumerGroupOffsetsException.
31+
/// </summary>
32+
/// <param name="result">
33+
/// The result corresponding to all the delete consumer group offsets
34+
/// operations in the request (whether or not they were in error).
35+
/// At least one of these results will be in error.
36+
/// </param>
37+
public DeleteConsumerGroupOffsetsException(DeleteConsumerGroupOffsetsReport result)
38+
: base(new Error(ErrorCode.Local_Partial,
39+
"An error occurred deleting consumer group offset: [" + result.Group +
40+
"]: [" + String.Join(", ", result.Error.IsError ? new[] { result.Error } : result.Partitions.Where(r => r.Error.IsError).Select(r => r.Error)) +
41+
"]."))
42+
{
43+
Result = result;
44+
}
45+
46+
/// <summary>
47+
/// The result corresponding to the delete consumer group offsets
48+
/// operation in the request.
49+
/// </summary>
50+
public DeleteConsumerGroupOffsetsReport Result { get; }
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the DeleteGroups method.
24+
/// </summary>
25+
public class DeleteConsumerGroupOffsetsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// The broker's operation timeout - the maximum time to wait for
39+
/// DeleteConsumerGroupOffsetsAsync before returning a result to the application.
40+
/// If set to null, will return immediately upon triggering record
41+
/// deletion.
42+
///
43+
/// Default: null
44+
/// </summary>
45+
public TimeSpan? OperationTimeout { get; set; }
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// The result of delete consumer group offset request
24+
/// (including error status).
25+
/// </summary>
26+
public class DeleteConsumerGroupOffsetsReport
27+
{
28+
/// <summary>
29+
/// Consumer group id.
30+
/// </summary>
31+
public string Group { get; set; }
32+
33+
/// <summary>
34+
/// Partitions for which the offsets were reset for.
35+
/// </summary>
36+
public List<TopicPartitionOffsetError> Partitions { get; set; }
37+
38+
/// <summary>
39+
/// Error status.
40+
/// </summary>
41+
public Error Error { get; set; }
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// The result of delete consumer group offset request
24+
/// (including error status).
25+
/// </summary>
26+
public class DeleteConsumerGroupOffsetsResult
27+
{
28+
/// <summary>
29+
/// Consumer group id.
30+
/// </summary>
31+
public string Group { get; set; }
32+
33+
/// <summary>
34+
/// Partitions for which the offsets were reset for.
35+
/// </summary>
36+
public List<TopicPartition> Partitions { get; set; }
37+
38+
internal Error Error { get; set; }
39+
}
40+
}

src/Confluent.Kafka/AdminClient.cs

+64
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,23 @@ private List<DeleteAclsReport> extractDeleteAclsReports(IntPtr resultPtr)
207207
}).ToList();
208208
}
209209

210+
private DeleteConsumerGroupOffsetsReport extractDeleteConsumerGroupOffsetsReports(IntPtr resultPtr)
211+
{
212+
IntPtr groupsOffsetsResultPtr = Librdkafka.DeleteConsumerGroupOffsets_result_groups(resultPtr, out UIntPtr resultCount);
213+
int groupsOffsetsResultCount = (int)resultCount;
214+
IntPtr[] groupsOffsetsResultArr = new IntPtr[groupsOffsetsResultCount];
215+
Marshal.Copy(groupsOffsetsResultPtr, groupsOffsetsResultArr, 0, groupsOffsetsResultCount);
216+
217+
return new DeleteConsumerGroupOffsetsReport
218+
{
219+
Group = PtrToStringUTF8(Librdkafka.group_result_name(groupsOffsetsResultArr[0])),
220+
Error = new Error(Librdkafka.group_result_error(groupsOffsetsResultArr[0]), false),
221+
Partitions = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(Librdkafka.group_result_partitions(groupsOffsetsResultArr[0]))
222+
.Select(a => new TopicPartitionOffsetError(a.Topic, a.Partition, a.Offset, a.Error))
223+
.ToList()
224+
};
225+
}
226+
210227
private Task StartPollTask(CancellationToken ct)
211228
=> Task.Factory.StartNew(() =>
212229
{
@@ -445,6 +462,39 @@ private Task StartPollTask(CancellationToken ct)
445462
}
446463
}
447464
break;
465+
466+
case Librdkafka.EventType.DeleteConsumerGroupOffsets_Result:
467+
{
468+
if (errorCode != ErrorCode.NoError)
469+
{
470+
Task.Run(() =>
471+
((TaskCompletionSource<DeleteConsumerGroupOffsetsResult>)adminClientResult).TrySetException(
472+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
473+
break;
474+
}
475+
476+
var result = extractDeleteConsumerGroupOffsetsReports(eventPtr);
477+
478+
if (result.Error.IsError || result.Partitions.Any(r => r.Error.IsError))
479+
{
480+
Task.Run(() =>
481+
((TaskCompletionSource<DeleteConsumerGroupOffsetsResult>)adminClientResult).TrySetException(
482+
new DeleteConsumerGroupOffsetsException(result)));
483+
}
484+
else
485+
{
486+
Task.Run(() =>
487+
((TaskCompletionSource<DeleteConsumerGroupOffsetsResult>)adminClientResult).TrySetResult(
488+
new DeleteConsumerGroupOffsetsResult
489+
{
490+
Group = result.Group,
491+
Partitions = result.Partitions.Select(r => new TopicPartition(r.Topic, r.Partition)).ToList(),
492+
Error = result.Error // internal, not exposed in success case.
493+
}));
494+
}
495+
}
496+
break;
497+
448498
case Librdkafka.EventType.CreateAcls_Result:
449499
{
450500
if (errorCode != ErrorCode.NoError)
@@ -567,6 +617,7 @@ private Task StartPollTask(CancellationToken ct)
567617
{ Librdkafka.EventType.AlterConfigs_Result, typeof(TaskCompletionSource<List<AlterConfigsReport>>) },
568618
{ Librdkafka.EventType.CreatePartitions_Result, typeof(TaskCompletionSource<List<CreatePartitionsReport>>) },
569619
{ Librdkafka.EventType.DeleteRecords_Result, typeof(TaskCompletionSource<List<DeleteRecordsResult>>) },
620+
{ Librdkafka.EventType.DeleteConsumerGroupOffsets_Result, typeof(TaskCompletionSource<DeleteConsumerGroupOffsetsResult>) },
570621
{ Librdkafka.EventType.DeleteGroups_Result, typeof(TaskCompletionSource<List<DeleteGroupReport>>) },
571622
{ Librdkafka.EventType.CreateAcls_Result, typeof(TaskCompletionSource<Null>) },
572623
{ Librdkafka.EventType.DescribeAcls_Result, typeof(TaskCompletionSource<DescribeAclsResult>) },
@@ -657,6 +708,19 @@ public Task DeleteGroupsAsync(IList<string> groups, DeleteGroupsOptions options
657708
return completionSource.Task;
658709
}
659710

711+
/// <summary>
712+
/// Refer to <see cref="Confluent.Kafka.IAdminClient.DeleteConsumerGroupOffsetsAsync(String, IEnumerable{TopicPartition}, DeleteConsumerGroupOffsetsOptions)" />
713+
/// </summary>
714+
public Task<DeleteConsumerGroupOffsetsResult> DeleteConsumerGroupOffsetsAsync(String group, IEnumerable<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options = null)
715+
{
716+
var completionSource = new TaskCompletionSource<DeleteConsumerGroupOffsetsResult>();
717+
var gch = GCHandle.Alloc(completionSource);
718+
Handle.LibrdkafkaHandle.DeleteConsumerGroupOffsets(
719+
group, partitions, options, resultQueue,
720+
GCHandle.ToIntPtr(gch));
721+
return completionSource.Task;
722+
}
723+
660724
/// <summary>
661725
/// Refer to <see cref="Confluent.Kafka.IAdminClient.CreatePartitionsAsync(IEnumerable{PartitionsSpecification}, CreatePartitionsOptions)" />
662726
/// </summary>

src/Confluent.Kafka/IAdminClient.cs

+20-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ Task CreatePartitionsAsync(
227227
/// The result of the delete records request.
228228
/// </returns>
229229
Task<List<DeleteRecordsResult>> DeleteRecordsAsync(IEnumerable<TopicPartitionOffset> topicPartitionOffsets, DeleteRecordsOptions options = null);
230-
230+
231231
/// <summary>
232232
/// Creates one or more ACL bindings.
233233
/// </summary>
@@ -324,6 +324,25 @@ Task CreatePartitionsAsync(
324324
/// </returns>
325325
Task<List<DeleteAclsResult>> DeleteAclsAsync(IEnumerable<AclBindingFilter> aclBindingFilters, DeleteAclsOptions options = null);
326326

327+
/// <summary>
328+
/// Delete committed offsets for a set of partitions in a consumer
329+
/// group. This will succeed at the partition level only if the group
330+
/// is not actively subscribed to the corresponding topic.
331+
/// </summary>
332+
/// <param name="group">
333+
/// Consumer group id
334+
/// </param>
335+
/// <param name="partitions">
336+
/// Enumerable of topic partitions to delete committed offsets for.
337+
/// </param>
338+
/// <param name="options">
339+
/// The options to use when deleting the committed offset.
340+
/// </param>
341+
/// <returns>
342+
/// A Task returning <see cref="Confluent.Kafka.Admin.DeleteConsumerGroupOffsetsResult"/>.
343+
/// </returns>
344+
Task<DeleteConsumerGroupOffsetsResult> DeleteConsumerGroupOffsetsAsync(String group, IEnumerable<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options = null);
345+
327346
}
328347

329348
}

0 commit comments

Comments
 (0)