Skip to content

Commit 29f256d

Browse files
committed
changefeedccl: better validation check for sarama config
Previously, we implement our own validation check for sarama config which may have missed certain cases. This patch changes an additional check by adding sarama's own exported validation function. Release note: none Epic: none
1 parent 3ff9177 commit 29f256d

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
182182
return nil
183183
}
184184

185+
// saramaConfig is a custom struct which contains a selection of options chosen
186+
// from sarama.Config. This facilitates users with limited sarama
187+
// configurations.
185188
type saramaConfig struct {
186189
// These settings mirror ones in sarama config.
187190
// We just tag them w/ JSON annotations.
@@ -1157,13 +1160,21 @@ func buildKafkaConfig(
11571160
"failed to parse sarama config; check %s option", changefeedbase.OptKafkaSinkConfig)
11581161
}
11591162

1163+
// Note that the sarama.Config.Validate() below only logs an error in some
1164+
// cases, so we explicitly validate sarama config from our side.
11601165
if err := saramaCfg.Validate(); err != nil {
11611166
return nil, errors.Wrap(err, "invalid sarama configuration")
11621167
}
11631168

1169+
// Apply configures config based on saramaCfg.
11641170
if err := saramaCfg.Apply(config); err != nil {
11651171
return nil, errors.Wrap(err, "failed to apply kafka client configuration")
11661172
}
1173+
1174+
// Validate sarama.Config using sarama's own exported validation function.
1175+
if err := config.Validate(); err != nil {
1176+
return nil, errors.Wrap(err, "invalid sarama configuration")
1177+
}
11671178
return config, nil
11681179
}
11691180

0 commit comments

Comments
 (0)