Skip to content

Commit 7693d37

Browse files
committed
CreateTopics: only suppress topic already exists errors
1 parent 199b8b2 commit 7693d37

File tree

2 files changed

+74
-13
lines changed

2 files changed

+74
-13
lines changed

createtopics.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kafka
33
import (
44
"bufio"
55
"context"
6-
"errors"
76
"fmt"
87
"net"
98
"time"
@@ -65,7 +64,6 @@ func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*C
6564
TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout),
6665
ValidateOnly: req.ValidateOnly,
6766
})
68-
6967
if err != nil {
7068
return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err)
7169
}
@@ -363,6 +361,9 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse
363361
return response, err
364362
}
365363
for _, tr := range response.TopicErrors {
364+
if tr.ErrorCode == int16(TopicAlreadyExists) {
365+
continue
366+
}
366367
if tr.ErrorCode != 0 {
367368
return response, Error(tr.ErrorCode)
368369
}
@@ -385,14 +386,5 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
385386
_, err := c.createTopics(createTopicsRequestV0{
386387
Topics: requestV0Topics,
387388
})
388-
if err != nil {
389-
if errors.Is(err, TopicAlreadyExists) {
390-
// ok
391-
return nil
392-
}
393-
394-
return err
395-
}
396-
397-
return nil
389+
return err
398390
}

createtopics_test.go

+70-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,80 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7+
"errors"
8+
"net"
79
"reflect"
10+
"strconv"
811
"testing"
912
)
1013

14+
func TestConnCreateTopics(t *testing.T) {
15+
topic1 := makeTopic()
16+
topic2 := makeTopic()
17+
18+
conn, err := DialContext(context.Background(), "tcp", "localhost:9092")
19+
defer conn.Close()
20+
if err != nil {
21+
panic(err)
22+
}
23+
24+
controller, _ := conn.Controller()
25+
26+
controllerConn, err := Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
27+
if err != nil {
28+
t.Fatal(err)
29+
}
30+
defer controllerConn.Close()
31+
32+
err = controllerConn.CreateTopics(TopicConfig{
33+
Topic: topic1,
34+
NumPartitions: 1,
35+
ReplicationFactor: 1,
36+
})
37+
if err != nil {
38+
t.Fatalf("unexpected error creating topic: %s", err.Error())
39+
}
40+
41+
err = controllerConn.CreateTopics(TopicConfig{
42+
Topic: topic1,
43+
NumPartitions: 1,
44+
ReplicationFactor: 1,
45+
})
46+
47+
// Duplicate topic should not return an error
48+
if err != nil {
49+
t.Fatalf("unexpected error creating duplicate topic topic: %v", err)
50+
}
51+
52+
err = controllerConn.CreateTopics(
53+
TopicConfig{
54+
Topic: topic1,
55+
NumPartitions: 1,
56+
ReplicationFactor: 1,
57+
},
58+
TopicConfig{
59+
Topic: topic2,
60+
NumPartitions: 1,
61+
ReplicationFactor: 1,
62+
},
63+
TopicConfig{
64+
Topic: topic2,
65+
NumPartitions: 1,
66+
ReplicationFactor: 1,
67+
},
68+
)
69+
70+
if err == nil {
71+
t.Fatal("CreateTopics should have returned an error for invalid requests")
72+
}
73+
74+
if !errors.Is(err, InvalidRequest) {
75+
t.Fatalf("expected invalid request: %v", err)
76+
}
77+
78+
deleteTopic(t, topic1, topic2)
79+
}
80+
1181
func TestClientCreateTopics(t *testing.T) {
1282
const (
1383
topic1 = "client-topic-1"
@@ -59,7 +129,6 @@ func TestClientCreateTopics(t *testing.T) {
59129
},
60130
},
61131
})
62-
63132
if err != nil {
64133
t.Fatal(err)
65134
}

0 commit comments

Comments
 (0)