Skip to content

Commit 75bfd41

Browse files
use many config files (#724)
* use many config files * update docs about overriding configurations
1 parent fe295f9 commit 75bfd41

File tree

10 files changed

+283
-18
lines changed

10 files changed

+283
-18
lines changed

Diff for: cfg/config.go

+44-9
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313

1414
"github.com/bitly/go-simplejson"
1515
"github.com/ozontech/file.d/logger"
16-
"sigs.k8s.io/yaml"
16+
"gopkg.in/yaml.v2"
17+
k8s_yaml "sigs.k8s.io/yaml"
1718
)
1819

1920
const trueValue = "true"
@@ -77,22 +78,37 @@ func NewConfig() *Config {
7778
}
7879
}
7980

80-
func NewConfigFromFile(path string) *Config {
81-
logger.Infof("reading config %q", path)
82-
yamlContents, err := os.ReadFile(path)
81+
func NewConfigFromFile(paths []string) *Config {
82+
mergedConfig := make(map[interface{}]interface{})
83+
84+
for _, path := range paths {
85+
logger.Infof("reading config %q", path)
86+
yamlContents, err := os.ReadFile(path)
87+
if err != nil {
88+
logger.Fatalf("can't read config file %q: %s", path, err)
89+
}
90+
var currentConfig map[interface{}]interface{}
91+
if err := yaml.Unmarshal(yamlContents, &currentConfig); err != nil {
92+
logger.Fatalf("can't parse config file yaml %q: %s", path, err)
93+
}
94+
95+
mergedConfig = mergeYAMLs(mergedConfig, currentConfig)
96+
}
97+
98+
mergedYAML, err := yaml.Marshal(mergedConfig)
8399
if err != nil {
84-
logger.Fatalf("can't read config file %q: %s", path, err)
100+
logger.Fatalf("can't marshal merged config to YAML: %s", err)
85101
}
86102

87-
jsonContents, err := yaml.YAMLToJSON(yamlContents)
103+
jsonContents, err := k8s_yaml.YAMLToJSON(mergedYAML)
88104
if err != nil {
89-
logger.Infof("config content:\n%s", logger.Numerate(string(yamlContents)))
90-
logger.Fatalf("can't parse config file yaml %q: %s", path, err.Error())
105+
logger.Infof("config content:\n%s", logger.Numerate(string(mergedYAML)))
106+
logger.Fatalf("can't parse config file yaml %q: %s", paths, err.Error())
91107
}
92108

93109
object, err := simplejson.NewJson(jsonContents)
94110
if err != nil {
95-
logger.Fatalf("can't convert config to json %q: %s", path, err.Error())
111+
logger.Fatalf("can't convert config to json %q: %s", paths, err.Error())
96112
}
97113

98114
err = applyEnvs(object)
@@ -637,3 +653,22 @@ func CompileRegex(s string) (*regexp.Regexp, error) {
637653

638654
return regexp.Compile(s[1 : len(s)-1])
639655
}
656+
657+
func mergeYAMLs(a, b map[interface{}]interface{}) map[interface{}]interface{} {
658+
merged := make(map[interface{}]interface{})
659+
for k, v := range a {
660+
merged[k] = v
661+
}
662+
for k, v := range b {
663+
if existingValue, exists := merged[k]; exists {
664+
if existingMap, ok := existingValue.(map[interface{}]interface{}); ok {
665+
if newMap, ok := v.(map[interface{}]interface{}); ok {
666+
merged[k] = mergeYAMLs(existingMap, newMap)
667+
continue
668+
}
669+
}
670+
}
671+
merged[k] = v
672+
}
673+
return merged
674+
}

Diff for: cfg/config_test.go

+145-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cfg
33
import (
44
"encoding/json"
55
"errors"
6+
"reflect"
67
"testing"
78
"time"
89

@@ -11,16 +12,24 @@ import (
1112
"github.com/stretchr/testify/require"
1213
)
1314

14-
func NewTestConfig(name string) *Config {
15-
return NewConfigFromFile("../testdata/config/" + name)
15+
func NewTestConfig(names []string) *Config {
16+
configFiles := make([]string, 0, len(names))
17+
for _, name := range names {
18+
configFiles = append(configFiles, "../testdata/config/"+name)
19+
}
20+
return NewConfigFromFile(configFiles)
1621
}
1722

1823
func TestSimple(t *testing.T) {
19-
c := NewTestConfig("e2e.yaml")
24+
c := NewTestConfig([]string{"e2e.yaml", "e2e.override.yaml"})
2025

2126
assert.NotNil(t, c, "config loading should't return nil")
22-
2327
assert.Equal(t, 1, len(c.Pipelines), "pipelines count isn't match")
28+
29+
// check override config
30+
outputType, err := c.Pipelines["test"].Raw.Get("output").Get("type").String()
31+
assert.Nil(t, err, "cannot get output type")
32+
assert.Equal(t, "devnull", outputType, "output type is not overrided")
2433
}
2534

2635
type intDefault struct {
@@ -643,3 +652,135 @@ func TestExpression_UnmarshalJSON(t *testing.T) {
643652
require.Equal(t, Expression("2"), val.E2)
644653
require.Equal(t, Expression("2+2"), val.E3)
645654
}
655+
656+
func TestMergeYAMLs(t *testing.T) {
657+
tests := []struct {
658+
name string
659+
a map[interface{}]interface{}
660+
b map[interface{}]interface{}
661+
expected map[interface{}]interface{}
662+
}{
663+
{
664+
name: "simple merge",
665+
a: map[interface{}]interface{}{
666+
"key1": "value1",
667+
"key2": "value2",
668+
},
669+
b: map[interface{}]interface{}{
670+
"key2": "newValue2",
671+
"key3": "value3",
672+
},
673+
expected: map[interface{}]interface{}{
674+
"key1": "value1",
675+
"key2": "newValue2",
676+
"key3": "value3",
677+
},
678+
},
679+
{
680+
name: "nested maps",
681+
a: map[interface{}]interface{}{
682+
"key1": map[interface{}]interface{}{
683+
"subkey1": "subvalue1",
684+
},
685+
},
686+
b: map[interface{}]interface{}{
687+
"key1": map[interface{}]interface{}{
688+
"subkey2": "subvalue2",
689+
},
690+
"key2": "value2",
691+
},
692+
expected: map[interface{}]interface{}{
693+
"key1": map[interface{}]interface{}{
694+
"subkey1": "subvalue1",
695+
"subkey2": "subvalue2",
696+
},
697+
"key2": "value2",
698+
},
699+
},
700+
{
701+
name: "overwriting nested maps",
702+
a: map[interface{}]interface{}{
703+
"key1": map[interface{}]interface{}{
704+
"subkey1": "subvalue1",
705+
},
706+
},
707+
b: map[interface{}]interface{}{
708+
"key1": map[interface{}]interface{}{
709+
"subkey1": "newSubvalue1",
710+
"subkey2": "subvalue2",
711+
},
712+
},
713+
expected: map[interface{}]interface{}{
714+
"key1": map[interface{}]interface{}{
715+
"subkey1": "newSubvalue1",
716+
"subkey2": "subvalue2",
717+
},
718+
},
719+
},
720+
{
721+
name: "empty maps",
722+
a: map[interface{}]interface{}{},
723+
b: map[interface{}]interface{}{},
724+
expected: map[interface{}]interface{}{
725+
// Expecting an empty map
726+
},
727+
},
728+
{
729+
name: "a is empty",
730+
a: map[interface{}]interface{}{},
731+
b: map[interface{}]interface{}{
732+
"key1": "value1",
733+
},
734+
expected: map[interface{}]interface{}{
735+
"key1": "value1",
736+
},
737+
},
738+
{
739+
name: "b is empty",
740+
a: map[interface{}]interface{}{
741+
"key1": "value1",
742+
},
743+
b: map[interface{}]interface{}{},
744+
expected: map[interface{}]interface{}{
745+
"key1": "value1",
746+
},
747+
},
748+
{
749+
name: "override slice",
750+
a: map[interface{}]interface{}{
751+
"key1": []interface{}{"value1", "value2"},
752+
},
753+
b: map[interface{}]interface{}{
754+
"key1": []interface{}{"newValue1", "newValue2"},
755+
},
756+
expected: map[interface{}]interface{}{
757+
"key1": []interface{}{"newValue1", "newValue2"},
758+
},
759+
},
760+
{
761+
name: "merge slice with map",
762+
a: map[interface{}]interface{}{
763+
"key1": []interface{}{"value1", "value2"},
764+
},
765+
b: map[interface{}]interface{}{
766+
"key1": map[interface{}]interface{}{
767+
"subkey1": "subvalue1",
768+
},
769+
},
770+
expected: map[interface{}]interface{}{
771+
"key1": map[interface{}]interface{}{
772+
"subkey1": "subvalue1",
773+
},
774+
},
775+
},
776+
}
777+
778+
for _, tt := range tests {
779+
t.Run(tt.name, func(t *testing.T) {
780+
result := mergeYAMLs(tt.a, tt.b)
781+
if !reflect.DeepEqual(result, tt.expected) {
782+
t.Errorf("expected %v, got %v", tt.expected, result)
783+
}
784+
})
785+
}
786+
}

Diff for: cmd/file.d/file.d.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ var (
6565
fileD *fd.FileD
6666
exit = make(chan bool)
6767

68-
config = kingpin.Flag("config", `Config file name`).Required().ExistingFile()
68+
config = kingpin.Flag("config", `Config file name (to add a config file you can repeat the argument)`).Required().ExistingFiles()
6969
http = kingpin.Flag("http", `HTTP listen addr eg. ":9000", "off" to disable`).Default(":9000").String()
7070
memLimitRatio = kingpin.Flag(
7171
"mem-limit-ratio",

Diff for: cmd/file.d/file.d_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const testTime = 10 * time.Minute
7070
// E.g. keep this test running while you are sleeping :)
7171
func TestEndToEnd(t *testing.T) {
7272
configFilename := "./../testdata/config/e2e.yaml"
73+
configOverrideFilename := "./../testdata/config/e2e.override.yaml"
7374
iterationInterval := time.Second * 10
7475
writerCount := 8
7576
fileCount := 8
@@ -85,7 +86,7 @@ func TestEndToEnd(t *testing.T) {
8586
filesDir := t.TempDir()
8687
offsetsDir := t.TempDir()
8788

88-
config := cfg.NewConfigFromFile(configFilename)
89+
config := cfg.NewConfigFromFile([]string{configFilename, configOverrideFilename})
8990
input := config.Pipelines["test"].Raw.Get("input")
9091
input.Set("watching_dir", filesDir)
9192
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

Diff for: docs/configuring.idoc.md

+42
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,48 @@ If `-http=':9090'` debug endpoints will be:
4444

4545
`http://127.0.0.1:9090/pipelines/http_file/2/sample` - for the join plugin
4646

47+
### Overriding configurations
48+
49+
You can use multiple configuration files. This allows you to define a base configuration (e.g., common.yaml) and override or extend it with additional configurations (e.g., local.yaml).
50+
51+
```
52+
./file.d
53+
--config=common.yaml
54+
--config=local.yaml
55+
```
56+
57+
```yaml
58+
# common.yaml
59+
pipelines:
60+
test1:
61+
input:
62+
...
63+
actions:
64+
- type: discard
65+
output:
66+
type: kafka
67+
68+
# local.yaml
69+
pipelines:
70+
test1:
71+
actions:
72+
- type: modify
73+
output:
74+
type: file
75+
76+
# result
77+
pipelines:
78+
test1:
79+
input:
80+
...
81+
actions:
82+
- type: modify
83+
output:
84+
type: file
85+
```
86+
87+
Arrays (or lists) are usually replaced entirely when merging configurations (e.g., actions). Dictionaries (or maps), on the other hand, are typically merged (e.g., output.type).
88+
4789
### Overriding by environment variables
4890

4991
`file.d` can override config fields if you specify environment variables with `FILED_` prefix.

Diff for: docs/configuring.md

+42
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,48 @@ If `-http=':9090'` debug endpoints will be:
4444

4545
`http://127.0.0.1:9090/pipelines/http_file/2/sample` - for the join plugin
4646

47+
### Overriding configurations
48+
49+
You can use multiple configuration files. This allows you to define a base configuration (e.g., common.yaml) and override or extend it with additional configurations (e.g., local.yaml).
50+
51+
```
52+
./file.d
53+
--config=common.yaml
54+
--config=local.yaml
55+
```
56+
57+
```yaml
58+
# common.yaml
59+
pipelines:
60+
test1:
61+
input:
62+
...
63+
actions:
64+
- type: discard
65+
output:
66+
type: kafka
67+
68+
# local.yaml
69+
pipelines:
70+
test1:
71+
actions:
72+
- type: modify
73+
output:
74+
type: file
75+
76+
# result
77+
pipelines:
78+
test1:
79+
input:
80+
...
81+
actions:
82+
- type: modify
83+
output:
84+
type: file
85+
```
86+
87+
Arrays (or lists) are usually replaced entirely when merging configurations (e.g., actions). Dictionaries (or maps), on the other hand, are typically merged (e.g., output.type).
88+
4789
### Overriding by environment variables
4890

4991
`file.d` can override config fields if you specify environment variables with `FILED_` prefix.

Diff for: e2e/start_work_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func TestE2EStabilityWorkCase(t *testing.T) {
174174
}
175175

176176
func startForTest(t *testing.T, test E2ETest, num int) *fd.FileD {
177-
conf := cfg.NewConfigFromFile(test.cfgPath)
177+
conf := cfg.NewConfigFromFile([]string{test.cfgPath})
178178
if _, ok := conf.Pipelines[test.name]; !ok {
179179
log.Fatalf("pipeline name must be named the same as the name of the test")
180180
}

0 commit comments

Comments
 (0)