Skip to content

Commit a1cd0a7

Browse files
committed
Apply minor cosmetic fixes.
- Declutter the `main()` func by moving init functions to the init file. - Minor variable name and comment changes for consistency.
1 parent f2fc876 commit a1cd0a7

File tree

4 files changed

+217
-215
lines changed

4 files changed

+217
-215
lines changed

consumer.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type consumer struct {
2525
offsetMgr *offsetManager
2626
nodeTracker *NodeTracker
2727

28-
l *slog.Logger
28+
log *slog.Logger
2929
}
3030

3131
// offsetManager is a holder for the topic offsets.
@@ -74,27 +74,27 @@ func (c *consumer) GetHealthy(ctx context.Context) (int, error) {
7474

7575
// reinit reinitializes the consumer group
7676
func (c *consumer) Connect(ctx context.Context, cfg ConsumerGroupCfg) error {
77-
c.l.Debug("reinitializing consumer group", "broker", cfg.BootstrapBrokers)
77+
c.log.Debug("reinitializing consumer group", "broker", cfg.BootstrapBrokers)
7878

7979
// tcp health check
8080
if ok := healthcheck(ctx, cfg.BootstrapBrokers, c.maxReqTime); !ok {
8181
return ErrorNoHealthy
8282
}
8383

84-
cl, err := initConsumerGroup(ctx, cfg, c.l)
84+
cl, err := initConsumerGroup(ctx, cfg, c.log)
8585
if err != nil {
8686
return err
8787
}
8888

8989
offsets := c.GetOffsets()
9090
if offsets != nil {
91-
err = leaveAndResetOffsets(ctx, cl, cfg, offsets, c.l)
91+
err = leaveAndResetOffsets(ctx, cl, cfg, offsets, c.log)
9292
if err != nil {
93-
c.l.Error("error resetting offsets", "err", err)
93+
c.log.Error("error resetting offsets", "err", err)
9494
return err
9595
}
9696

97-
cl, err = initConsumerGroup(ctx, cfg, c.l)
97+
cl, err = initConsumerGroup(ctx, cfg, c.log)
9898
if err != nil {
9999
return err
100100
}

initz.go

+171-73
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,91 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"log"
68
"log/slog"
9+
"net/http"
710
"os"
811
"path/filepath"
912
"plugin"
1013
"strings"
14+
"time"
1115

1216
"github.com/VictoriaMetrics/metrics"
17+
"github.com/knadh/koanf/parsers/toml"
18+
"github.com/knadh/koanf/providers/file"
19+
"github.com/knadh/koanf/providers/posflag"
1320
"github.com/knadh/koanf/v2"
21+
flag "github.com/spf13/pflag"
1422
"github.com/twmb/franz-go/pkg/kgo"
1523
"github.com/zerodha/kaf-relay/filter"
1624
)
1725

18-
// getProducerClient returns a kafka producer client.
19-
func getProducerClient(ctx context.Context, cfg ProducerCfg, bCfg BackoffCfg, l *slog.Logger) (*kgo.Client, error) {
20-
opts := []kgo.Opt{
21-
kgo.ProduceRequestTimeout(cfg.SessionTimeout),
22-
kgo.RecordDeliveryTimeout(cfg.SessionTimeout), // break the :ProduceSync if it takes too long
23-
kgo.ProducerBatchMaxBytes(int32(cfg.MaxMessageBytes)),
24-
kgo.MaxBufferedRecords(cfg.FlushBatchSize),
25-
kgo.ProducerLinger(cfg.FlushFrequency),
26-
kgo.ProducerBatchCompression(getCompressionCodec(cfg.Compression)),
27-
kgo.SeedBrokers(cfg.BootstrapBrokers...),
28-
kgo.RequiredAcks(getAckPolicy(cfg.CommitAck)),
26+
func initConfig() (*koanf.Koanf, Config) {
27+
// Initialize config
28+
f := flag.NewFlagSet("config", flag.ContinueOnError)
29+
f.Usage = func() {
30+
fmt.Println(f.FlagUsages())
31+
os.Exit(0)
2932
}
3033

31-
// TCPAck/LeaderAck requires kafka deduplication to be turned off
32-
if !cfg.EnableIdempotency {
33-
opts = append(opts, kgo.DisableIdempotentWrite())
34+
f.StringSlice("config", []string{"config.toml"}, "path to one or more config files (will be merged in order)")
35+
f.String("mode", "single", "single | failover")
36+
f.Bool("stop-at-end", false, "stop relay at the end of offsets")
37+
f.StringSlice("filter", []string{}, "path to one or more filter providers")
38+
f.StringSlice("topic", []string{}, "one or more source:target topic names. Setting this overrides [topics] in the config file.")
39+
f.Bool("version", false, "show current version of the build")
40+
41+
if err := f.Parse(os.Args[1:]); err != nil {
42+
log.Fatalf("error loading flags: %v", err)
3443
}
3544

36-
opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner()))
45+
ko := koanf.New(".")
46+
if err := ko.Load(posflag.Provider(f, ".", ko), nil); err != nil {
47+
log.Fatalf("error reading flag config: %v", err)
48+
}
3749

38-
// Add authentication
39-
if cfg.EnableAuth {
40-
opts = appendSASL(opts, cfg.ClientCfg)
50+
// Version flag.
51+
if ko.Bool("version") {
52+
fmt.Println(buildString)
53+
os.Exit(0)
4154
}
4255

43-
if cfg.EnableTLS {
44-
if cfg.CACertPath == "" && cfg.ClientCertPath == "" && cfg.ClientKeyPath == "" {
45-
opts = append(opts, kgo.DialTLS())
46-
} else {
47-
tlsOpt, err := createTLSConfig(cfg.CACertPath, cfg.ClientCertPath, cfg.ClientKeyPath)
48-
if err != nil {
49-
return nil, err
50-
}
56+
if ko.Bool("stop-at-end") && ko.String("mode") == ModeFailover {
57+
log.Fatalf("`--stop-at-end` cannot be used with `failover` mode")
58+
}
5159

52-
// Set up TLS configuration
53-
opts = append(opts, tlsOpt)
60+
// Load one or more config files. Keys in each subsequent file is merged
61+
// into the previous file's keys.
62+
for _, f := range ko.Strings("config") {
63+
log.Printf("reading config from %s", f)
64+
if err := ko.Load(file.Provider(f), toml.Parser()); err != nil {
65+
log.Fatalf("error reading config: %v", err)
5466
}
5567
}
5668

57-
var (
58-
retries = 0
59-
backoff = getBackoffFn(bCfg)
60-
err error
61-
cl *kgo.Client
62-
)
63-
64-
// retry until we can connect to kafka
65-
outerLoop:
66-
for retries < cfg.MaxRetries || cfg.MaxRetries == IndefiniteRetry {
67-
select {
68-
case <-ctx.Done():
69-
break outerLoop
70-
default:
71-
cl, err = kgo.NewClient(opts...)
72-
if err != nil {
73-
l.Error("error creating producer client", "err", err)
74-
retries++
75-
waitTries(ctx, backoff(retries))
76-
continue
77-
}
78-
79-
// Get the destination topics
80-
var topics []string
81-
for _, v := range cfg.Topics {
82-
topics = append(topics, v)
83-
}
69+
var cfg Config
70+
if err := ko.Unmarshal("", &cfg); err != nil {
71+
log.Fatalf("error marshalling application config: %v", err)
72+
}
8473

85-
// test connectivity and ensures destination topics exists.
86-
err = testConnection(cl, cfg.SessionTimeout, topics, cfg.TopicsPartition)
87-
if err != nil {
88-
l.Error("error connecting to producer", "err", err)
89-
retries++
90-
waitTries(ctx, backoff(retries))
91-
continue
74+
// If there are topics in the commandline flags, override the ones read from the file.
75+
if topics := ko.Strings("topic"); len(topics) > 0 {
76+
mp := map[string]string{}
77+
for _, t := range topics {
78+
split := strings.Split(t, ":")
79+
if len(split) != 2 {
80+
log.Fatalf("invalid topic '%s'. Should be in the format 'source:target'", t)
9281
}
9382

94-
if err == nil {
95-
break outerLoop
96-
}
83+
mp[split[0]] = split[1]
9784
}
85+
cfg.Topics = mp
9886
}
9987

100-
if err != nil {
101-
return nil, err
102-
}
103-
104-
return cl, nil
88+
return ko, cfg
10589
}
10690

10791
// initProducer initializes the kafka producer client.
@@ -160,7 +144,6 @@ func initConsumerGroup(ctx context.Context, cfg ConsumerGroupCfg, l *slog.Logger
160144
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)))
161145
}
162146

163-
// Add authentication
164147
if cfg.EnableAuth {
165148
opts = appendSASL(opts, cfg.ClientCfg)
166149
}
@@ -244,7 +227,123 @@ func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map
244227
return out, nil
245228
}
246229

247-
// getClient returns franz-go client with default config
230+
func initMetricsServer(relay *Relay, addr string) http.Server {
231+
mux := http.NewServeMux()
232+
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
233+
buf := new(bytes.Buffer)
234+
relay.getMetrics(buf)
235+
236+
w.Header().Set("Content-Type", "text/plain")
237+
w.WriteHeader(http.StatusOK)
238+
buf.WriteTo(w)
239+
})
240+
241+
srv := http.Server{
242+
Addr: addr,
243+
Handler: mux,
244+
ReadTimeout: 10 * time.Second,
245+
WriteTimeout: 10 * time.Second,
246+
}
247+
248+
go func() {
249+
err := srv.ListenAndServe()
250+
if err != nil && err != http.ErrServerClosed {
251+
log.Printf("error starting server: %v", err)
252+
}
253+
}()
254+
255+
return srv
256+
}
257+
258+
// getProducerClient returns a Kafka producer client.
259+
func getProducerClient(ctx context.Context, cfg ProducerCfg, bCfg BackoffCfg, l *slog.Logger) (*kgo.Client, error) {
260+
opts := []kgo.Opt{
261+
kgo.ProduceRequestTimeout(cfg.SessionTimeout),
262+
kgo.RecordDeliveryTimeout(cfg.SessionTimeout), // break the :ProduceSync if it takes too long
263+
kgo.ProducerBatchMaxBytes(int32(cfg.MaxMessageBytes)),
264+
kgo.MaxBufferedRecords(cfg.FlushBatchSize),
265+
kgo.ProducerLinger(cfg.FlushFrequency),
266+
kgo.ProducerBatchCompression(getCompressionCodec(cfg.Compression)),
267+
kgo.SeedBrokers(cfg.BootstrapBrokers...),
268+
kgo.RequiredAcks(getAckPolicy(cfg.CommitAck)),
269+
}
270+
271+
// TCPAck/LeaderAck requires Kafka deduplication to be turned off.
272+
if !cfg.EnableIdempotency {
273+
opts = append(opts, kgo.DisableIdempotentWrite())
274+
}
275+
276+
opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner()))
277+
278+
if cfg.EnableAuth {
279+
opts = appendSASL(opts, cfg.ClientCfg)
280+
}
281+
282+
if cfg.EnableTLS {
283+
if cfg.CACertPath == "" && cfg.ClientCertPath == "" && cfg.ClientKeyPath == "" {
284+
opts = append(opts, kgo.DialTLS())
285+
} else {
286+
tlsOpt, err := createTLSConfig(cfg.CACertPath, cfg.ClientCertPath, cfg.ClientKeyPath)
287+
if err != nil {
288+
return nil, err
289+
}
290+
291+
// Set up TLS configuration
292+
opts = append(opts, tlsOpt)
293+
}
294+
}
295+
296+
var (
297+
retries = 0
298+
backoff = getBackoffFn(bCfg)
299+
err error
300+
cl *kgo.Client
301+
)
302+
303+
// Retry until a successful connection.
304+
outerLoop:
305+
for retries < cfg.MaxRetries || cfg.MaxRetries == IndefiniteRetry {
306+
select {
307+
case <-ctx.Done():
308+
break outerLoop
309+
default:
310+
cl, err = kgo.NewClient(opts...)
311+
if err != nil {
312+
l.Error("error creating producer client", "err", err)
313+
retries++
314+
waitTries(ctx, backoff(retries))
315+
continue
316+
}
317+
318+
// Get the destination topics
319+
var topics []string
320+
for _, v := range cfg.Topics {
321+
topics = append(topics, v)
322+
}
323+
324+
// Test connectivity and ensure destination topics exists.
325+
err = testConnection(cl, cfg.SessionTimeout, topics, cfg.TopicsPartition)
326+
if err != nil {
327+
l.Error("error connecting to producer", "err", err)
328+
retries++
329+
waitTries(ctx, backoff(retries))
330+
continue
331+
}
332+
333+
if err == nil {
334+
break outerLoop
335+
}
336+
}
337+
}
338+
339+
if err != nil {
340+
return nil, err
341+
}
342+
343+
return cl, nil
344+
}
345+
346+
// getClient returns franz-go client with default config.
248347
func getClient(cfg ConsumerGroupCfg) (*kgo.Client, error) {
249348
opts := []kgo.Opt{
250349
kgo.SeedBrokers(cfg.BootstrapBrokers...),
@@ -256,7 +355,6 @@ func getClient(cfg ConsumerGroupCfg) (*kgo.Client, error) {
256355
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)))
257356
}
258357

259-
// Add authentication
260358
if cfg.EnableAuth {
261359
opts = appendSASL(opts, cfg.ClientCfg)
262360
}

0 commit comments

Comments
 (0)