Skip to content

Commit 8d77fab

Browse files
committed
Add randomize_initial for randomizing the initial connection in source pool.
1 parent 43e191d commit 8d77fab

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

config.sample.toml

+6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ backoff_max = "10s"
4242
# whether it's healthy or not.
4343
request_timeout = "100ms"
4444

45+
# Pick a random server from the [[sources]] list to connect first on boot
46+
# instead of the first one from the list. This can be useful for testing
47+
# servers in production environments that may never be consumed from except
48+
# during rare failover events.
49+
randomize_initial = false
50+
4551

4652
[[sources]]
4753
name = "node1"

init.go

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log"
88
"log/slog"
9+
"math/rand"
910
"net/http"
1011
"os"
1112
"plugin"
@@ -184,6 +185,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC
184185
log.Fatalf("error unmarshalling `sources` config: %v", err)
185186
}
186187

188+
log.Printf("read config for %d servers in the source pool", len(src.Sources))
189+
if ko.Bool("source_pool.randomize_initial") {
190+
log.Println("randomizing source pool for initial connection")
191+
r := rand.New(rand.NewSource(time.Now().UnixNano()))
192+
r.Shuffle(len(src.Sources), func(i, j int) {
193+
src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i]
194+
})
195+
}
196+
187197
// Read target Kafka config.
188198
var prod relay.ProducerCfg
189199
if err := ko.Unmarshal("target", &prod); err != nil {

0 commit comments

Comments
 (0)