Skip to content

Commit 3935c78

Browse files
rarguelloF and darccio authored
contrib/confluentinc/confluent-kafka-go: fix goroutine leak in Produce (#2924)
Co-authored-by: Dario Castañé <[email protected]>
1 parent 62ebcc3 commit 3935c78

File tree

4 files changed

+289
-176
lines changed

4 files changed

+289
-176
lines changed

contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -334,31 +334,43 @@ func (p *Producer) Close() {
334334
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
335335
span := p.startSpan(msg)
336336

337+
var errChan chan error
338+
337339
// if the user has selected a delivery channel, we will wrap it and
338-
// wait for the delivery event to finish the span
340+
// wait for the delivery event to finish the span.
341+
// in case the Produce call returns an error, we won't receive anything
342+
// in the deliveryChan, so we use errChan to make sure this goroutine exits.
339343
if deliveryChan != nil {
344+
errChan = make(chan error, 1)
340345
oldDeliveryChan := deliveryChan
341346
deliveryChan = make(chan kafka.Event)
342347
go func() {
343348
var err error
344-
evt := <-deliveryChan
345-
if msg, ok := evt.(*kafka.Message); ok {
346-
// delivery errors are returned via TopicPartition.Error
347-
err = msg.TopicPartition.Error
348-
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
349+
select {
350+
case evt := <-deliveryChan:
351+
if msg, ok := evt.(*kafka.Message); ok {
352+
// delivery errors are returned via TopicPartition.Error
353+
err = msg.TopicPartition.Error
354+
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
355+
}
356+
oldDeliveryChan <- evt
357+
358+
case e := <-errChan:
359+
err = e
349360
}
350361
span.Finish(tracer.WithError(err))
351-
oldDeliveryChan <- evt
352362
}()
353363
}
354364

355365
setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
356366
err := p.Producer.Produce(msg, deliveryChan)
357-
// with no delivery channel or enqueue error, finish immediately
358-
if err != nil || deliveryChan == nil {
359-
span.Finish(tracer.WithError(err))
367+
if err != nil {
368+
if deliveryChan != nil {
369+
errChan <- err
370+
} else {
371+
span.Finish(tracer.WithError(err))
372+
}
360373
}
361-
362374
return err
363375
}
364376

contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go

Lines changed: 122 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,89 +23,14 @@ import (
2323
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"
26+
"go.uber.org/goleak"
2627
)
2728

2829
var (
2930
testGroupID = "gotest"
3031
testTopic = "gotest"
3132
)
3233

33-
type consumerActionFn func(c *Consumer) (*kafka.Message, error)
34-
35-
func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
36-
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
37-
t.Skip("to enable integration test, set the INTEGRATION environment variable")
38-
}
39-
mt := mocktracer.Start()
40-
defer mt.Stop()
41-
42-
// first write a message to the topic
43-
p, err := NewProducer(&kafka.ConfigMap{
44-
"bootstrap.servers": "127.0.0.1:9092",
45-
"go.delivery.reports": true,
46-
}, producerOpts...)
47-
require.NoError(t, err)
48-
49-
delivery := make(chan kafka.Event, 1)
50-
err = p.Produce(&kafka.Message{
51-
TopicPartition: kafka.TopicPartition{
52-
Topic: &testTopic,
53-
Partition: 0,
54-
},
55-
Key: []byte("key2"),
56-
Value: []byte("value2"),
57-
}, delivery)
58-
require.NoError(t, err)
59-
60-
msg1, _ := (<-delivery).(*kafka.Message)
61-
p.Close()
62-
63-
// next attempt to consume the message
64-
c, err := NewConsumer(&kafka.ConfigMap{
65-
"group.id": testGroupID,
66-
"bootstrap.servers": "127.0.0.1:9092",
67-
"fetch.wait.max.ms": 500,
68-
"socket.timeout.ms": 1500,
69-
"session.timeout.ms": 1500,
70-
"enable.auto.offset.store": false,
71-
}, consumerOpts...)
72-
require.NoError(t, err)
73-
74-
err = c.Assign([]kafka.TopicPartition{
75-
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
76-
})
77-
require.NoError(t, err)
78-
79-
msg2, err := consumerAction(c)
80-
require.NoError(t, err)
81-
_, err = c.CommitMessage(msg2)
82-
require.NoError(t, err)
83-
assert.Equal(t, msg1.String(), msg2.String())
84-
err = c.Close()
85-
require.NoError(t, err)
86-
87-
spans := mt.FinishedSpans()
88-
require.Len(t, spans, 2)
89-
// they should be linked via headers
90-
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
91-
92-
if c.cfg.dataStreamsEnabled {
93-
backlogs := mt.SentDSMBacklogs()
94-
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
95-
m := make(map[string]struct{})
96-
for _, b := range backlogs {
97-
m[strings.Join(b.Tags, "")] = struct{}{}
98-
}
99-
return m
100-
}
101-
backlogsMap := toMap(backlogs)
102-
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
103-
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
104-
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
105-
}
106-
return spans, msg2
107-
}
108-
10934
func TestConsumerChannel(t *testing.T) {
11035
// we can test consuming via the Events channel by artificially sending
11136
// messages. Testing .Poll is done via an integration test.
@@ -320,7 +245,7 @@ func TestCustomTags(t *testing.T) {
320245
"socket.timeout.ms": 10,
321246
"session.timeout.ms": 10,
322247
"enable.auto.offset.store": false,
323-
}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
248+
}, WithCustomTag("foo", func(_ *kafka.Message) interface{} {
324249
return "bar"
325250
}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
326251
return msg.Key
@@ -370,3 +295,123 @@ func TestNamingSchema(t *testing.T) {
370295
}
371296
namingschematest.NewKafkaTest(genSpans)(t)
372297
}
298+
299+
// Test we don't leak goroutines and properly close the span when Produce returns an error.
300+
func TestProduceError(t *testing.T) {
301+
defer func() {
302+
err := goleak.Find()
303+
if err != nil {
304+
// if a goroutine is leaking, ensure it is not coming from this package
305+
assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go")
306+
}
307+
}()
308+
309+
mt := mocktracer.Start()
310+
defer mt.Stop()
311+
312+
// first write a message to the topic
313+
p, err := NewProducer(&kafka.ConfigMap{
314+
"bootstrap.servers": "127.0.0.1:9092",
315+
"go.delivery.reports": true,
316+
})
317+
require.NoError(t, err)
318+
defer p.Close()
319+
320+
// this message has an empty topic, which should cause an error in the Produce call.
321+
topic := ""
322+
msg := &kafka.Message{
323+
TopicPartition: kafka.TopicPartition{
324+
Topic: &topic,
325+
},
326+
}
327+
deliveryChan := make(chan kafka.Event, 1)
328+
err = p.Produce(msg, deliveryChan)
329+
require.Error(t, err)
330+
require.EqualError(t, err, "Local: Invalid argument or configuration")
331+
332+
select {
333+
case <-deliveryChan:
334+
assert.Fail(t, "there should be no events in the deliveryChan")
335+
case <-time.After(1 * time.Second):
336+
// assume there is no event
337+
}
338+
339+
spans := mt.FinishedSpans()
340+
assert.Len(t, spans, 1)
341+
}
342+
343+
type consumerActionFn func(c *Consumer) (*kafka.Message, error)
344+
345+
func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
346+
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
347+
t.Skip("to enable integration test, set the INTEGRATION environment variable")
348+
}
349+
mt := mocktracer.Start()
350+
defer mt.Stop()
351+
352+
// first write a message to the topic
353+
p, err := NewProducer(&kafka.ConfigMap{
354+
"bootstrap.servers": "127.0.0.1:9092",
355+
"go.delivery.reports": true,
356+
}, producerOpts...)
357+
require.NoError(t, err)
358+
359+
delivery := make(chan kafka.Event, 1)
360+
err = p.Produce(&kafka.Message{
361+
TopicPartition: kafka.TopicPartition{
362+
Topic: &testTopic,
363+
Partition: 0,
364+
},
365+
Key: []byte("key2"),
366+
Value: []byte("value2"),
367+
}, delivery)
368+
require.NoError(t, err)
369+
370+
msg1, _ := (<-delivery).(*kafka.Message)
371+
p.Close()
372+
373+
// next attempt to consume the message
374+
c, err := NewConsumer(&kafka.ConfigMap{
375+
"group.id": testGroupID,
376+
"bootstrap.servers": "127.0.0.1:9092",
377+
"fetch.wait.max.ms": 500,
378+
"socket.timeout.ms": 1500,
379+
"session.timeout.ms": 1500,
380+
"enable.auto.offset.store": false,
381+
}, consumerOpts...)
382+
require.NoError(t, err)
383+
384+
err = c.Assign([]kafka.TopicPartition{
385+
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
386+
})
387+
require.NoError(t, err)
388+
389+
msg2, err := consumerAction(c)
390+
require.NoError(t, err)
391+
_, err = c.CommitMessage(msg2)
392+
require.NoError(t, err)
393+
assert.Equal(t, msg1.String(), msg2.String())
394+
err = c.Close()
395+
require.NoError(t, err)
396+
397+
spans := mt.FinishedSpans()
398+
require.Len(t, spans, 2)
399+
// they should be linked via headers
400+
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
401+
402+
if c.cfg.dataStreamsEnabled {
403+
backlogs := mt.SentDSMBacklogs()
404+
toMap := func(_ []internaldsm.Backlog) map[string]struct{} {
405+
m := make(map[string]struct{})
406+
for _, b := range backlogs {
407+
m[strings.Join(b.Tags, "")] = struct{}{}
408+
}
409+
return m
410+
}
411+
backlogsMap := toMap(backlogs)
412+
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
413+
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
414+
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
415+
}
416+
return spans, msg2
417+
}

contrib/confluentinc/confluent-kafka-go/kafka/kafka.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -334,31 +334,43 @@ func (p *Producer) Close() {
334334
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
335335
span := p.startSpan(msg)
336336

337+
var errChan chan error
338+
337339
// if the user has selected a delivery channel, we will wrap it and
338-
// wait for the delivery event to finish the span
340+
// wait for the delivery event to finish the span.
341+
// in case the Produce call returns an error, we won't receive anything
342+
// in the deliveryChan, so we use errChan to make sure this goroutine exits.
339343
if deliveryChan != nil {
344+
errChan = make(chan error, 1)
340345
oldDeliveryChan := deliveryChan
341346
deliveryChan = make(chan kafka.Event)
342347
go func() {
343348
var err error
344-
evt := <-deliveryChan
345-
if msg, ok := evt.(*kafka.Message); ok {
346-
// delivery errors are returned via TopicPartition.Error
347-
err = msg.TopicPartition.Error
348-
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
349+
select {
350+
case evt := <-deliveryChan:
351+
if msg, ok := evt.(*kafka.Message); ok {
352+
// delivery errors are returned via TopicPartition.Error
353+
err = msg.TopicPartition.Error
354+
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
355+
}
356+
oldDeliveryChan <- evt
357+
358+
case e := <-errChan:
359+
err = e
349360
}
350361
span.Finish(tracer.WithError(err))
351-
oldDeliveryChan <- evt
352362
}()
353363
}
354364

355365
setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
356366
err := p.Producer.Produce(msg, deliveryChan)
357-
// with no delivery channel or enqueue error, finish immediately
358-
if err != nil || deliveryChan == nil {
359-
span.Finish(tracer.WithError(err))
367+
if err != nil {
368+
if deliveryChan != nil {
369+
errChan <- err
370+
} else {
371+
span.Finish(tracer.WithError(err))
372+
}
360373
}
361-
362374
return err
363375
}
364376

0 commit comments

Comments
 (0)