@@ -9,12 +9,14 @@ import (
9
9
"net/http"
10
10
"os"
11
11
"os/signal"
12
+ "strings"
12
13
"syscall"
13
14
"time"
14
15
15
16
"github.com/VictoriaMetrics/metrics"
16
17
"github.com/knadh/koanf/parsers/toml"
17
18
"github.com/knadh/koanf/providers/file"
19
+ "github.com/knadh/koanf/providers/posflag"
18
20
"github.com/knadh/koanf/v2"
19
21
flag "github.com/spf13/pflag"
20
22
)
@@ -31,52 +33,68 @@ func main() {
31
33
os .Exit (0 )
32
34
}
33
35
34
- var (
35
- configPath string
36
- mode string
37
- stopAtEnd bool
38
- filterPaths []string
39
- )
40
- f .StringVar (& configPath , "config" , "config.toml" , "Path to the TOML configuration file" )
41
- f .StringVar (& mode , "mode" , "single" , "single/failover" )
42
- f .BoolVar (& stopAtEnd , "stop-at-end" , false , "Stop relay at the end of offsets" )
43
- f .StringSliceVar (& filterPaths , "filter" , []string {}, "Path to filter providers. Can specify multiple values." )
44
- f .Bool ("version" , false , "Current version of the build" )
36
+ f .StringSlice ("config" , []string {"config.toml" }, "path to one or more config files (will be merged in order)" )
37
+ f .String ("mode" , "single" , "single | failover" )
38
+ f .Bool ("stop-at-end" , false , "stop relay at the end of offsets" )
39
+ f .StringSlice ("filter" , []string {}, "path to one or more filter providers" )
40
+ f .StringSlice ("topic" , []string {}, "one or more source:target topic names. Setting this overrides [topics] in the config file." )
41
+ f .Bool ("version" , false , "show current version of the build" )
45
42
46
43
if err := f .Parse (os .Args [1 :]); err != nil {
47
44
log .Fatalf ("error loading flags: %v" , err )
48
45
}
49
46
47
+ ko := koanf .New ("." )
48
+ if err := ko .Load (posflag .Provider (f , "." , ko ), nil ); err != nil {
49
+ log .Fatalf ("error reading flag config: %v" , err )
50
+ }
51
+
50
52
// Version flag.
51
- if ok , _ := f . GetBool ("version" ); ok {
53
+ if ko . Bool ("version" ) {
52
54
fmt .Println (buildString )
53
55
os .Exit (0 )
54
56
}
55
57
56
- if stopAtEnd && mode == ModeFailover {
58
+ if ko . Bool ( "stop-at-end" ) && ko . String ( " mode" ) == ModeFailover {
57
59
log .Fatalf ("`--stop-at-end` cannot be used with `failover` mode" )
58
60
}
59
61
60
- // Load the config file.
61
- ko := koanf .New ("." )
62
- log .Printf ("reading config: %s" , configPath )
63
- if err := ko .Load (file .Provider (configPath ), toml .Parser ()); err != nil {
64
- log .Fatalf ("error reading config: %v" , err )
62
+ // Load one or more config files. Keys in each subsequent file is merged
63
+ // into the previous file's keys.
64
+ for _ , f := range ko .Strings ("config" ) {
65
+ log .Printf ("reading config from %s" , f )
66
+ if err := ko .Load (file .Provider (f ), toml .Parser ()); err != nil {
67
+ log .Fatalf ("error reading config: %v" , err )
68
+ }
65
69
}
66
70
67
71
var cfg Config
68
72
if err := ko .Unmarshal ("" , & cfg ); err != nil {
69
73
log .Fatalf ("error marshalling application config: %v" , err )
70
74
}
71
75
72
- // setup logger
76
+ // If there are topics in the commandline flags, override the ones read from the file.
77
+ if topics := ko .Strings ("topic" ); len (topics ) > 0 {
78
+ mp := map [string ]string {}
79
+ for _ , t := range topics {
80
+ split := strings .Split (t , ":" )
81
+ if len (split ) != 2 {
82
+ log .Fatalf ("invalid topic '%s'. Should be in the format 'source:target'" , t )
83
+ }
84
+
85
+ mp [split [0 ]] = split [1 ]
86
+ }
87
+ cfg .Topics = mp
88
+ }
89
+
90
+ // Initialized the structured logger.
73
91
logger := slog .New (slog .NewJSONHandler (os .Stdout , & slog.HandlerOptions {
74
92
AddSource : false ,
75
93
Level : cfg .App .LogLevel ,
76
94
}))
77
95
78
- // setup filter providers
79
- filters , err := initFilterProviders (filterPaths , ko , logger )
96
+ // Load the optional filter providers.
97
+ filters , err := initFilterProviders (ko . Strings ( "filter" ) , ko , logger )
80
98
if err != nil {
81
99
log .Fatalf ("error initializing filter provider: %v" , err )
82
100
}
@@ -153,7 +171,7 @@ func main() {
153
171
lagThreshold : cfg .App .LagThreshold ,
154
172
maxReqTime : cfg .App .MaxRequestDuration ,
155
173
156
- stopAtEnd : stopAtEnd ,
174
+ stopAtEnd : ko . Bool ( "stop-at-end" ) ,
157
175
srcOffsets : make (map [string ]map [int32 ]int64 ),
158
176
destOffsets : destOffsets .KOffsets (),
159
177
0 commit comments