From e18d764f0f340c1279f6ffa501a7e3a2b28a9080 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Wed, 12 Feb 2025 15:59:52 +0900 Subject: [PATCH] Add write timeout configuration for Kafka --- cmd/yorkie/server.go | 13 +++-- server/backend/messagebroker/config.go | 28 +++++++++- server/backend/messagebroker/config_test.go | 51 +++++++++++++++++++ server/backend/messagebroker/kafka.go | 20 ++++---- server/backend/messagebroker/messagebroker.go | 7 ++- server/config.go | 3 ++ server/config.sample.yml | 3 ++ 7 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 server/backend/messagebroker/config_test.go diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index a0334c42e..3708ec91c 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -54,8 +54,9 @@ var ( authWebhookCacheTTL time.Duration projectCacheTTL time.Duration - kafkaAddresses string - kafkaTopic string + kafkaAddresses string + kafkaTopic string + kafkaWriteTimeout time.Duration conf = server.NewConfig() ) @@ -379,9 +380,15 @@ func init() { cmd.Flags().StringVar( &kafkaTopic, "kafka-topic", - "", + server.DefaultKafkaTopic, "Kafka topic name to publish events", ) + cmd.Flags().DurationVar( + &kafkaWriteTimeout, + "kafka-write-timeout", + server.DefaultKafkaWriteTimeout, + "Timeout for writing messages to Kafka", + ) rootCmd.AddCommand(cmd) } diff --git a/server/backend/messagebroker/config.go b/server/backend/messagebroker/config.go index 45fb5b34c..35ef43af6 100644 --- a/server/backend/messagebroker/config.go +++ b/server/backend/messagebroker/config.go @@ -21,6 +21,7 @@ import ( "fmt" "net/url" "strings" + "time" ) var ( @@ -29,12 +30,31 @@ var ( // ErrEmptyTopic is returned when the topic is empty. ErrEmptyTopic = errors.New("topic cannot be empty") + + // ErrInvalidDuration is returned when the duration is invalid. + ErrInvalidDuration = errors.New("invalid duration") ) // Config is the configuration for creating a message broker instance. type Config struct { - Addresses string `yaml:"Addresses"` - Topic string `yaml:"Topic"` + Addresses string `yaml:"Addresses"` + Topic string `yaml:"Topic"` + WriteTimeout string `yaml:"WriteTimeout"` +} + +// SplitAddresses splits the addresses by comma. +func (c *Config) SplitAddresses() []string { + return strings.Split(c.Addresses, ",") +} + +// MustParseWriteTimeout parses the write timeout and returns the duration. +func (c *Config) MustParseWriteTimeout() time.Duration { + d, err := time.ParseDuration(c.WriteTimeout) + if err != nil { + panic(ErrInvalidDuration) + } + + return d } // Validate validates this config. @@ -58,5 +78,9 @@ func (c *Config) Validate() error { return ErrEmptyTopic } + if _, err := time.ParseDuration(c.WriteTimeout); err != nil { + return fmt.Errorf(`parse write timeout "%s": %w`, c.WriteTimeout, ErrInvalidDuration) + } + return nil } diff --git a/server/backend/messagebroker/config_test.go b/server/backend/messagebroker/config_test.go new file mode 100644 index 000000000..182d7cf74 --- /dev/null +++ b/server/backend/messagebroker/config_test.go @@ -0,0 +1,51 @@ +/* + * Copyright 2025 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package messagebroker_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yorkie-team/yorkie/server/backend/messagebroker" +) + +func TestConfig(t *testing.T) { + t.Run("test split addresses", func(t *testing.T) { + c := &messagebroker.Config{ + Addresses: "localhost:8080,localhost:8081", + } + addrs := c.SplitAddresses() + assert.Equal(t, []string{"localhost:8080", "localhost:8081"}, addrs) + }) + + t.Run("test must parse write timeout", func(t *testing.T) { + c := &messagebroker.Config{ + WriteTimeout: "1s", + } + assert.Equal(t, time.Second, c.MustParseWriteTimeout()) + }) + + t.Run("test must parse write timeout with invalid duration", func(t *testing.T) { + c := &messagebroker.Config{ + WriteTimeout: "1", + } + assert.PanicsWithError(t, messagebroker.ErrInvalidDuration.Error(), func() { + c.MustParseWriteTimeout() + }) + }) +} diff --git a/server/backend/messagebroker/kafka.go b/server/backend/messagebroker/kafka.go index 68ff045b7..ba49e69b7 100644 --- a/server/backend/messagebroker/kafka.go +++ b/server/backend/messagebroker/kafka.go @@ -25,17 +25,20 @@ import ( // KafkaBroker is a producer for Kafka. type KafkaBroker struct { + conf *Config writer *kafka.Writer } // newKafkaBroker creates a new instance of KafkaProducer. -func newKafkaBroker(addresses []string, topic string) *KafkaBroker { +func newKafkaBroker(conf *Config) *KafkaBroker { return &KafkaBroker{ + conf: conf, writer: &kafka.Writer{ - Addr: kafka.TCP(addresses...), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - Async: true, + Addr: kafka.TCP(conf.SplitAddresses()...), + Topic: conf.Topic, + WriteTimeout: conf.MustParseWriteTimeout(), + Balancer: &kafka.LeastBytes{}, + Async: true, }, } } @@ -47,12 +50,11 @@ func (mb *KafkaBroker) Produce( ) error { value, err := msg.Marshal() if err != nil { - return fmt.Errorf("marshal message: %v", err) + return fmt.Errorf("marshal message: %w", err) } - // TODO(hackerwins): Consider using message batching. if err := mb.writer.WriteMessages(ctx, kafka.Message{Value: value}); err != nil { - return fmt.Errorf("write message to kafka: %v", err) + return fmt.Errorf("write message to kafka: %w", err) } return nil @@ -61,7 +63,7 @@ func (mb *KafkaBroker) Produce( // Close closes the KafkaProducer. func (mb *KafkaBroker) Close() error { if err := mb.writer.Close(); err != nil { - return fmt.Errorf("close kafka writer: %v", err) + return fmt.Errorf("close kafka writer: %w", err) } return nil diff --git a/server/backend/messagebroker/messagebroker.go b/server/backend/messagebroker/messagebroker.go index e6e57c606..e268e9ea2 100644 --- a/server/backend/messagebroker/messagebroker.go +++ b/server/backend/messagebroker/messagebroker.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "strings" "time" "github.com/yorkie-team/yorkie/api/types/events" @@ -35,10 +34,10 @@ type Message interface { // UserEventMessage represents a message for user events type UserEventMessage struct { + ProjectID string `json:"project_id"` + EventType events.ClientEventType `json:"event_type"` UserID string `json:"user_id"` Timestamp time.Time `json:"timestamp"` - EventType events.ClientEventType `json:"event_type"` - ProjectID string `json:"project_id"` UserAgent string `json:"user_agent"` } @@ -74,5 +73,5 @@ func Ensure(kafkaConf *Config) Broker { kafkaConf.Topic, ) - return newKafkaBroker(strings.Split(kafkaConf.Addresses, ","), kafkaConf.Topic) + return newKafkaBroker(kafkaConf) } diff --git a/server/config.go b/server/config.go index 2a502af8c..fab17b578 100644 --- a/server/config.go +++ b/server/config.go @@ -50,6 +50,9 @@ const ( DefaultMongoPingTimeout = 5 * time.Second DefaultMongoYorkieDatabase = "yorkie-meta" + DefaultKafkaTopic = "user-events" + DefaultKafkaWriteTimeout = 5 * time.Second + DefaultAdminUser = "admin" DefaultAdminPassword = "admin" DefaultSecretKey = "yorkie-secret" diff --git a/server/config.sample.yml b/server/config.sample.yml index e97418f97..eca070518 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -114,3 +114,6 @@ Kafka: # Topic is the message broker topic to use. Topic: "user-events" + + # WriteTimeout is the timeout for writing to the message broker. + WriteTimeout: "5s"