Skip to content

Commit 8091163

Browse files
committed
(#1308) - optionally fail commit attempts for generations that are not valid anymore.
1 parent 7b9c99d commit 8091163

File tree

7 files changed

+345
-77
lines changed

7 files changed

+345
-77
lines changed

batch.go

+3
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
232232
msg.HighWaterMark = batch.highWaterMark
233233
msg.Time = makeTime(timestamp)
234234
msg.Headers = headers
235+
if batch.conn != nil {
236+
msg.GenerationId = batch.conn.generationId
237+
}
235238

236239
return msg, err
237240
}

commit.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@ package kafka
33
// A commit represents the instruction of publishing an update of the last
44
// offset read by a program for a topic and partition.
55
type commit struct {
6-
topic string
7-
partition int
8-
offset int64
6+
topic string
7+
partition int
8+
offset int64
9+
generationId int32
910
}
1011

1112
// makeCommit builds a commit value from a message, the resulting commit takes
1213
// its topic, partition, and offset from the message.
1314
func makeCommit(msg Message) commit {
1415
return commit{
15-
topic: msg.Topic,
16-
partition: msg.Partition,
17-
offset: msg.Offset + 1,
16+
topic: msg.Topic,
17+
partition: msg.Partition,
18+
offset: msg.Offset + 1,
19+
generationId: msg.GenerationId,
1820
}
1921
}
2022

conn.go

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
var (
1818
errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message")
1919
errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
20+
21+
undefinedGenerationId int32 = -1
2022
)
2123

2224
// Conn represents a connection to a kafka broker.
@@ -65,6 +67,8 @@ type Conn struct {
6567
apiVersions atomic.Value // apiVersionMap
6668

6769
transactionalID *string
70+
71+
generationId int32
6872
}
6973

7074
type apiVersionMap map[apiKey]ApiVersion
@@ -182,6 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
182186
offset: FirstOffset,
183187
requiredAcks: -1,
184188
transactionalID: emptyToNullable(config.TransactionalID),
189+
generationId: undefinedGenerationId,
185190
}
186191

187192
c.wb.w = &c.wbuf
@@ -388,6 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
388393
return joinGroupResponseV1{}, Error(response.ErrorCode)
389394
}
390395

396+
c.generationId = response.GenerationID
391397
return response, nil
392398
}
393399

consumergroup.go

+18-12
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,12 @@ func (g *Generation) Start(fn func(ctx context.Context)) {
416416
// consumer group coordinator. This can be used to reset the consumer to
417417
// explicit offsets.
418418
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
419+
return g.CommitOffsetsForGenID(g.ID, offsets)
420+
}
421+
422+
// CommitOffsetsForGenID commits the provided topic+partition+offset combos to the
423+
// consumer group coordinator specifying the given genID.
424+
func (g *Generation) CommitOffsetsForGenID(genID int32, offsets map[string]map[int]int64) error {
419425
if len(offsets) == 0 {
420426
return nil
421427
}
@@ -434,7 +440,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
434440

435441
request := offsetCommitRequestV2{
436442
GroupID: g.GroupID,
437-
GenerationID: g.ID,
443+
GenerationID: genID,
438444
MemberID: g.MemberID,
439445
RetentionTime: g.retentionMillis,
440446
Topics: topics,
@@ -925,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
925931
// the leader. Otherwise, GroupMemberAssignments will be nil.
926932
//
927933
// Possible kafka error codes returned:
928-
// * GroupLoadInProgress:
929-
// * GroupCoordinatorNotAvailable:
930-
// * NotCoordinatorForGroup:
931-
// * InconsistentGroupProtocol:
932-
// * InvalidSessionTimeout:
933-
// * GroupAuthorizationFailed:
934+
// * GroupLoadInProgress:
935+
// * GroupCoordinatorNotAvailable:
936+
// * NotCoordinatorForGroup:
937+
// * InconsistentGroupProtocol:
938+
// * InvalidSessionTimeout:
939+
// * GroupAuthorizationFailed:
934940
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
935941
request, err := cg.makeJoinGroupRequestV1(memberID)
936942
if err != nil {
@@ -1073,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
10731079
// Readers subscriptions topic => partitions
10741080
//
10751081
// Possible kafka error codes returned:
1076-
// * GroupCoordinatorNotAvailable:
1077-
// * NotCoordinatorForGroup:
1078-
// * IllegalGeneration:
1079-
// * RebalanceInProgress:
1080-
// * GroupAuthorizationFailed:
1082+
// * GroupCoordinatorNotAvailable:
1083+
// * NotCoordinatorForGroup:
1084+
// * IllegalGeneration:
1085+
// * RebalanceInProgress:
1086+
// * GroupAuthorizationFailed:
10811087
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
10821088
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
10831089
response, err := conn.syncGroup(request)

message.go

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ type Message struct {
2020
Value []byte
2121
Headers []Header
2222

23+
// If the message has been sent by a consumer group, it contains the
24+
// generation's id. Value is -1 if not using consumer groups.
25+
GenerationId int32
26+
2327
// This field is used to hold arbitrary data you wish to include, so it
2428
// will be available when handle it on the Writer's `Completion` method,
2529
// this support the application can do any post operation on each message.

reader.go

+75-19
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
121121
// another consumer to avoid such a race.
122122
}
123123

124-
func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
124+
func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
125125
offsets := make(map[topicPartition]int64)
126126
for topic, assignments := range allAssignments {
127127
for _, assignment := range assignments {
@@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
134134
}
135135

136136
r.mutex.Lock()
137-
r.start(offsets)
137+
r.start(generationId, offsets)
138138
r.mutex.Unlock()
139139

140140
r.withLogger(func(l Logger) {
@@ -150,35 +150,73 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
150150
backoffDelayMax = 5 * time.Second
151151
)
152152

153-
for attempt := 0; attempt < retries; attempt++ {
154-
if attempt != 0 {
155-
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
156-
return
153+
messagesToSendForGeneration := make(map[int32]map[string]map[int]int64)
154+
for topic, partitionsInfo := range offsetStash {
155+
for partition, commitInfo := range partitionsInfo {
156+
if _, ok := messagesToSendForGeneration[commitInfo.generationID]; !ok {
157+
messagesToSendForGeneration[commitInfo.generationID] = make(map[string]map[int]int64)
158+
}
159+
msgsForTopic := messagesToSendForGeneration[commitInfo.generationID]
160+
if _, ok := msgsForTopic[topic]; !ok {
161+
msgsForTopic[topic] = make(map[int]int64)
157162
}
163+
msgsForPartition := msgsForTopic[topic]
164+
msgsForPartition[partition] = commitInfo.offset
158165
}
166+
}
167+
var illegalGenerationErr bool
168+
for generationID, messages := range messagesToSendForGeneration {
169+
for attempt := 0; attempt < retries; attempt++ {
170+
if attempt != 0 {
171+
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
172+
continue
173+
}
174+
}
159175

160-
if err = gen.CommitOffsets(offsetStash); err == nil {
161-
return
176+
if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
177+
break
178+
}
179+
180+
// IllegalGeneration error is not retriable, but we should attempt to
181+
// perform the remaining commits
182+
if errors.Is(err, IllegalGeneration) {
183+
r.withErrorLogger(func(l Logger) { l.Printf("generation %d - %v", generationID, err) })
184+
offsetStash.removeGenerationID(generationID)
185+
illegalGenerationErr = true
186+
err = nil
187+
break
188+
}
162189
}
163190
}
164191

192+
// if configured to ignore the error
193+
if illegalGenerationErr && r.config.ErrorOnWrongGenerationCommit {
194+
err = IllegalGeneration
195+
}
165196
return // err will not be nil
166197
}
167198

168-
// offsetStash holds offsets by topic => partition => offset.
169-
type offsetStash map[string]map[int]int64
199+
// offsetStash holds offsets by topic => partition => offsetEntry.
200+
type offsetEntry struct {
201+
offset int64
202+
generationID int32
203+
}
204+
type offsetStash map[string]map[int]offsetEntry
170205

171206
// merge updates the offsetStash with the offsets from the provided messages.
172207
func (o offsetStash) merge(commits []commit) {
173208
for _, c := range commits {
174209
offsetsByPartition, ok := o[c.topic]
175210
if !ok {
176-
offsetsByPartition = map[int]int64{}
211+
offsetsByPartition = map[int]offsetEntry{}
177212
o[c.topic] = offsetsByPartition
178213
}
179214

180-
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
181-
offsetsByPartition[c.partition] = c.offset
215+
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
216+
offsetsByPartition[c.partition] = offsetEntry{
217+
offset: c.offset,
218+
generationID: c.generationId,
219+
}
182220
}
183221
}
184222
}
@@ -190,6 +228,19 @@ func (o offsetStash) reset() {
190228
}
191229
}
192230

231+
func (o offsetStash) removeGenerationID(genID int32) {
232+
for topic, offsetsForTopic := range o {
233+
for partition, offsetsForPartition := range offsetsForTopic {
234+
if offsetsForPartition.generationID == genID {
235+
delete(offsetsForTopic, partition)
236+
}
237+
if len(offsetsForTopic) == 0 {
238+
delete(o, topic)
239+
}
240+
}
241+
}
242+
}
243+
193244
// commitLoopImmediate handles each commit synchronously.
194245
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
195246
offsets := offsetStash{}
@@ -329,7 +380,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
329380

330381
r.stats.rebalances.observe(1)
331382

332-
r.subscribe(gen.Assignments)
383+
r.subscribe(gen.ID, gen.Assignments)
333384

334385
gen.Start(func(ctx context.Context) {
335386
r.commitLoop(ctx, gen)
@@ -522,6 +573,10 @@ type ReaderConfig struct {
522573
// This flag is being added to retain backwards-compatibility, so it will be
523574
// removed in a future version of kafka-go.
524575
OffsetOutOfRangeError bool
576+
577+
// ErrorOnWrongGenerationCommit indicates that we should return an error when
578+
// attempting to commit a message to a generation different than the current one.
579+
ErrorOnWrongGenerationCommit bool
525580
}
526581

527582
// Validate method validates ReaderConfig properties.
@@ -819,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
819874
r.mutex.Lock()
820875

821876
if !r.closed && r.version == 0 {
822-
r.start(r.getTopicPartitionOffset())
877+
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
823878
}
824879

825880
version := r.version
@@ -1040,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
10401095
r.offset = offset
10411096

10421097
if r.version != 0 {
1043-
r.start(r.getTopicPartitionOffset())
1098+
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
10441099
}
10451100

10461101
r.activateReadLag()
@@ -1178,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
11781233
}
11791234
}
11801235

1181-
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
1236+
func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
11821237
if r.closed {
11831238
// don't start child reader if parent Reader is closed
11841239
return
@@ -1216,7 +1271,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
12161271

12171272
// backwards-compatibility flags
12181273
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
1219-
}).run(ctx, offset)
1274+
}).run(ctx, generationId, offset)
12201275
}(ctx, key, offset, &r.join)
12211276
}
12221277
}
@@ -1253,7 +1308,7 @@ type readerMessage struct {
12531308
error error
12541309
}
12551310

1256-
func (r *reader) run(ctx context.Context, offset int64) {
1311+
func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
12571312
// This is the reader's main loop, it only ends if the context is canceled
12581313
// and will keep attempting to reader messages otherwise.
12591314
//
@@ -1306,6 +1361,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
13061361
}
13071362
continue
13081363
}
1364+
conn.generationId = generationId
13091365

13101366
// Resetting the attempt counter ensures that if a failure occurs after
13111367
// a successful initialization we don't keep increasing the backoff

0 commit comments

Comments
 (0)