Skip to content

Commit 5fac087

Browse files
authored
Merge pull request #26 from zerodha/randomize-initial
Add `randomize_initial` for randomizing the initial connection.
2 parents 43e191d + 0febbc4 commit 5fac087

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

config.sample.toml

+6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ backoff_max = "10s"
4242
# whether it's healthy or not.
4343
request_timeout = "100ms"
4444

45+
# Pick a random server from the [[sources]] list to connect first on boot
46+
# instead of the first one from the list. This can be useful for testing
47+
# servers in production environments that may never be consumed from except
48+
# during rare failover events.
49+
randomize_initial = false
50+
4551

4652
[[sources]]
4753
name = "node1"

init.go

+16
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log"
88
"log/slog"
9+
"math/rand"
910
"net/http"
1011
"os"
1112
"plugin"
@@ -184,6 +185,21 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC
184185
log.Fatalf("error unmarshalling `sources` config: %v", err)
185186
}
186187

188+
log.Printf("read config for %d servers in the source pool", len(src.Sources))
189+
if ko.Bool("source_pool.randomize_initial") {
190+
log.Println("randomizing source pool for initial connection")
191+
r := rand.New(rand.NewSource(time.Now().UnixNano()))
192+
r.Shuffle(len(src.Sources), func(i, j int) {
193+
src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i]
194+
})
195+
}
196+
197+
// If it's single mode, eliminate all servers in the pool except one
198+
// to disable healthcehcks and failover.
199+
if ko.String("mode") == relay.ModeSingle {
200+
src.Sources = src.Sources[:1]
201+
}
202+
187203
// Read target Kafka config.
188204
var prod relay.ProducerCfg
189205
if err := ko.Unmarshal("target", &prod); err != nil {

0 commit comments

Comments
 (0)