From 52f4fba1d2b58e2e44cd8275fb0cb97cb9f4edd1 Mon Sep 17 00:00:00 2001 From: nachogiljaldo Date: Wed, 3 Jul 2024 00:06:21 +0200 Subject: [PATCH 1/3] Add a listener for changes in partitions assigned to the current consumer. --- reader.go | 20 ++++++++++++++++++++ reader_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/reader.go b/reader.go index 04d90f355..f9521461c 100644 --- a/reader.go +++ b/reader.go @@ -304,6 +304,23 @@ func (r *Reader) run(cg *ConsumerGroup) { for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ { gen, err = cg.Next(r.stctx) if err == nil { + if r.config.AssignmentListener != nil { + assignments := make([]GroupMemberTopic, 0, len(gen.Assignments)) + for topic, partitions := range gen.Assignments { + assignedPartitions := make([]int, 0, len(partitions)) + for _, partition := range partitions { + assignedPartitions = append(assignedPartitions, partition.ID) + } + sort.Slice(assignedPartitions, func(i, j int) bool { + return assignedPartitions[i] < assignedPartitions[j] + }) + assignments = append(assignments, GroupMemberTopic{ + Topic: topic, + Partitions: assignedPartitions, + }) + } + r.config.AssignmentListener(assignments) + } break } if errors.Is(err, r.stctx.Err()) { @@ -522,6 +539,9 @@ type ReaderConfig struct { // This flag is being added to retain backwards-compatibility, so it will be // removed in a future version of kafka-go. OffsetOutOfRangeError bool + + // AsignmentListener is called when a reassignment happens indicating what are the new partitions + AssignmentListener func(partitions []GroupMemberTopic) } // Validate method validates ReaderConfig properties. diff --git a/reader_test.go b/reader_test.go index f413d7429..47893907e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -890,6 +891,49 @@ func TestReaderConsumerGroup(t *testing.T) { } } +func TestAssignmentListener(t *testing.T) { + // It appears that some of the tests depend on all these tests being + // run concurrently to pass... this is brittle and should be fixed + // at some point. + t.Parallel() + + topic := makeTopic() + createTopic(t, topic, 10) + defer deleteTopic(t, topic) + + var lock sync.Mutex + assignments := make([][]GroupMemberTopic, 0) + groupID := makeGroupID() + r := NewReader(ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: topic, + GroupID: groupID, + HeartbeatInterval: 2 * time.Second, + CommitInterval: 1 * time.Second, + RebalanceTimeout: 2 * time.Second, + RetentionTime: time.Hour, + MinBytes: 1, + MaxBytes: 1e6, + AssignmentListener: func(partitions []GroupMemberTopic) { + lock.Lock() + defer lock.Unlock() + assignments = append(assignments, partitions) + }, + }) + defer r.Close() + + assert.Eventually(t, func() bool { + return reflect.DeepEqual(assignments, [][]GroupMemberTopic{ + { + GroupMemberTopic{ + Topic: topic, + Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + }, + }) + }, 10*time.Second, 100*time.Millisecond) +} + func testReaderConsumerGroupHandshake(t *testing.T, ctx context.Context, r *Reader) { prepareReader(t, context.Background(), r, makeTestSequence(5)...) From 980d2d9af18929e1c74a70b5fedaf909e467569a Mon Sep 17 00:00:00 2001 From: nachogiljaldo Date: Wed, 3 Jul 2024 00:08:18 +0200 Subject: [PATCH 2/3] Fix data race on test. --- reader_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/reader_test.go b/reader_test.go index 47893907e..6e97026a1 100644 --- a/reader_test.go +++ b/reader_test.go @@ -923,6 +923,8 @@ func TestAssignmentListener(t *testing.T) { defer r.Close() assert.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() return reflect.DeepEqual(assignments, [][]GroupMemberTopic{ { GroupMemberTopic{ From 5e3960dc6f3d622893d46a46b4d01ad0a6db8117 Mon Sep 17 00:00:00 2001 From: nachogiljaldo Date: Wed, 11 Sep 2024 12:30:16 +0200 Subject: [PATCH 3/3] Do not sort before sending to the listener. --- reader.go | 3 --- reader_test.go | 9 ++++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/reader.go b/reader.go index f9521461c..958a2ed87 100644 --- a/reader.go +++ b/reader.go @@ -311,9 +311,6 @@ func (r *Reader) run(cg *ConsumerGroup) { for _, partition := range partitions { assignedPartitions = append(assignedPartitions, partition.ID) } - sort.Slice(assignedPartitions, func(i, j int) bool { - return assignedPartitions[i] < assignedPartitions[j] - }) assignments = append(assignments, GroupMemberTopic{ Topic: topic, Partitions: assignedPartitions, diff --git a/reader_test.go b/reader_test.go index 6e97026a1..54b13fabc 100644 --- a/reader_test.go +++ b/reader_test.go @@ -10,6 +10,7 @@ import ( "net" "os" "reflect" + "sort" "strconv" "sync" "testing" @@ -891,7 +892,7 @@ func TestReaderConsumerGroup(t *testing.T) { } } -func TestAssignmentListener(t *testing.T) { +func TestPartitionAssignmentListener(t *testing.T) { // It appears that some of the tests depend on all these tests being // run concurrently to pass... this is brittle and should be fixed // at some point. @@ -917,6 +918,12 @@ func TestAssignmentListener(t *testing.T) { AssignmentListener: func(partitions []GroupMemberTopic) { lock.Lock() defer lock.Unlock() + // we sort the received partitions for easier comparison + for _, partition := range partitions { + sort.Slice(partition.Partitions, func(i, j int) bool { + return partition.Partitions[i] < partition.Partitions[j] + }) + } assignments = append(assignments, partitions) }, })