Skip to content

Commit 98987e0

Browse files
author
Iwasaki Yudai
committed
Rename ZookeeperTimeout to ZookeeperSessionTimeout
It's not actually a read timeout. If this value is not enough large, ephemeral nodes will be deleted while reconnecting to another ZK node when the leader ZK node goes down.
1 parent f498bb6 commit 98987e0

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ func testConsumerConfig() *ConsumerConfig {
631631
zkConfig := NewZookeeperConfig()
632632
zkConfig.ZookeeperConnect = []string{localZk}
633633
zkConfig.MaxRequestRetries = 10
634-
zkConfig.ZookeeperTimeout = 30 * time.Second
634+
zkConfig.ZookeeperSessionTimeout = 30 * time.Second
635635
zkConfig.RequestBackoff = 3 * time.Second
636636
config.Coordinator = NewZookeeperCoordinator(zkConfig)
637637

zk_coordinator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (this *ZookeeperCoordinator) Connect() (err error) {
8282

8383
func (this *ZookeeperCoordinator) tryConnect() (zkConn *zk.Conn, connectionEvents <-chan zk.Event, err error) {
8484
Infof(this, "Connecting to ZK at %s\n", this.config.ZookeeperConnect)
85-
zkConn, connectionEvents, err = zk.Connect(this.config.ZookeeperConnect, this.config.ZookeeperTimeout)
85+
zkConn, connectionEvents, err = zk.Connect(this.config.ZookeeperConnect, this.config.ZookeeperSessionTimeout)
8686
return
8787
}
8888

@@ -1102,8 +1102,8 @@ type ZookeeperConfig struct {
11021102
/* Zookeeper hosts */
11031103
ZookeeperConnect []string
11041104

1105-
/* Zookeeper read timeout */
1106-
ZookeeperTimeout time.Duration
1105+
/* Zookeeper session timeout */
1106+
ZookeeperSessionTimeout time.Duration
11071107

11081108
/* Max retries for any request except CommitOffset. CommitOffset is controlled by ConsumerConfig.OffsetsCommitMaxRetries. */
11091109
MaxRequestRetries int
@@ -1122,7 +1122,7 @@ type ZookeeperConfig struct {
11221122
func NewZookeeperConfig() *ZookeeperConfig {
11231123
config := &ZookeeperConfig{}
11241124
config.ZookeeperConnect = []string{"localhost"}
1125-
config.ZookeeperTimeout = 1 * time.Second
1125+
config.ZookeeperSessionTimeout = 5 * time.Second
11261126
config.MaxRequestRetries = 3
11271127
config.RequestBackoff = 150 * time.Millisecond
11281128
config.Root = ""
@@ -1152,7 +1152,7 @@ func ZookeeperConfigFromFile(filename string) (*ZookeeperConfig, error) {
11521152
setStringSliceConfig(&config.ZookeeperConnect, z["zookeeper.connect"], ",")
11531153
setStringConfig(&config.Root, z["zookeeper.kafka.root"])
11541154

1155-
if err := setDurationConfig(&config.ZookeeperTimeout, z["zookeeper.connection.timeout"]); err != nil {
1155+
if err := setDurationConfig(&config.ZookeeperSessionTimeout, z["zookeeper.connection.session.timeout"]); err != nil {
11561156
return nil, err
11571157
}
11581158
if err := setIntConfig(&config.MaxRequestRetries, z["zookeeper.max.request.retries"]); err != nil {

0 commit comments

Comments
 (0)