Skip to content

Commit bb190ee

Browse files
committed
Fix: Closing kafka Writer during WriteMessages causes a potential hang
Fixes #1307
1 parent 4713019 commit bb190ee

File tree

2 files changed

+54
-3
lines changed

2 files changed

+54
-3
lines changed

writer.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,10 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
663663
assignments[key] = append(assignments[key], int32(i))
664664
}
665665

666-
batches := w.batchMessages(msgs, assignments)
666+
batches, err := w.batchMessages(msgs, assignments)
667+
if err != nil {
668+
return err
669+
}
667670
if w.Async {
668671
return nil
669672
}
@@ -695,7 +698,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
695698
return werr
696699
}
697700

698-
func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
701+
func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) (map[*writeBatch][]int32, error) {
699702
var batches map[*writeBatch][]int32
700703
if !w.Async {
701704
batches = make(map[*writeBatch][]int32, len(assignments))
@@ -704,6 +707,10 @@ func (w *Writer) batchMessages(messages []Message, assignments map[topicPartitio
704707
w.mutex.Lock()
705708
defer w.mutex.Unlock()
706709

710+
if w.closed {
711+
return nil, io.ErrClosedPipe
712+
}
713+
707714
if w.writers == nil {
708715
w.writers = map[topicPartition]*partitionWriter{}
709716
}
@@ -721,7 +728,7 @@ func (w *Writer) batchMessages(messages []Message, assignments map[topicPartitio
721728
}
722729
}
723730

724-
return batches
731+
return batches, nil
725732
}
726733

727734
func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {

writer_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ func TestWriter(t *testing.T) {
191191
scenario: "test write message with writer data",
192192
function: testWriteMessageWithWriterData,
193193
},
194+
{
195+
scenario: "test no new partition writers after close",
196+
function: testWriterNoNewPartitionWritersAfterClose,
197+
},
194198
}
195199

196200
for _, test := range tests {
@@ -1030,6 +1034,46 @@ func testWriterOverrideConfigStats(t *testing.T) {
10301034
}
10311035
}
10321036

1037+
func testWriterNoNewPartitionWritersAfterClose(t *testing.T) {
1038+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1039+
defer cancel()
1040+
topic1 := makeTopic()
1041+
createTopic(t, topic1, 1)
1042+
defer deleteTopic(t, topic1)
1043+
1044+
w := newTestWriter(WriterConfig{
1045+
Topic: topic1,
1046+
})
1047+
defer w.Close() // try and close anyway after test finished
1048+
1049+
// using balancer to close writer right between first mutex is released and second mutex is taken to make map of partition writers
1050+
w.Balancer = mockBalancerFunc(func(m Message, i ...int) int {
1051+
go w.Close() // close is blocking so run in goroutine
1052+
for { // wait until writer is marked as closed
1053+
w.mutex.Lock()
1054+
if w.closed {
1055+
w.mutex.Unlock()
1056+
break
1057+
}
1058+
w.mutex.Unlock()
1059+
}
1060+
return 0
1061+
})
1062+
1063+
msg := Message{Value: []byte("Hello World")} // no topic
1064+
1065+
if err := w.WriteMessages(ctx, msg); !errors.Is(err, io.ErrClosedPipe) {
1066+
t.Errorf("expected error: %v got: %v", io.ErrClosedPipe, err)
1067+
return
1068+
}
1069+
}
1070+
1071+
type mockBalancerFunc func(msg Message, partitions ...int) (partition int)
1072+
1073+
func (b mockBalancerFunc) Balance(msg Message, partitions ...int) int {
1074+
return b(msg, partitions...)
1075+
}
1076+
10331077
type staticBalancer struct {
10341078
partition int
10351079
}

0 commit comments

Comments
 (0)