Skip to content

Commit 353fded

Browse files
craig[bot]wenyihu6
andcommitted
Merge #118643
118643: changefeedccl: allow per changefeed kafka quota config r=rharding6373 a=wenyihu6 Previously, users were limited to setting a single kafka quota configuration for cockroachdb which was then applied and restricting all changefeeds. This patch introduces a new changefeed configuration option, allowing users to define client id for different changefeeds, allowing users to specify different kafka quota configurations for different changefeeds. To use it, users can specify a unique client ID using `kafka_sink_config` and configure different quota settings on kafka server based on https://kafka.apache.org/documentation/#quotas. ``` CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}' ``` Fixes: #92290 Release note: `kafka_sink_config` now supports specifying a different client ID for different changefeeds, enabling users to define distinct kafka quota configurations for various changefeeds. For any kafka versions >= V1_0_0_0 ([KIP-190: Handle client-ids consistently between clients and brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)), any string can be used as client ID. For earlier kafka versions, clientID can only contain characters [A-Za-z0-9._-] are acceptable. For example, ``` CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}' ``` Co-authored-by: Wenyi Hu <[email protected]>
2 parents f8a5841 + 9a05851 commit 353fded

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
186186
// from sarama.Config. This facilitates users with limited sarama
187187
// configurations.
188188
type saramaConfig struct {
189+
ClientID string `json:",omitempty"`
189190
// These settings mirror ones in sarama config.
190191
// We just tag them w/ JSON annotations.
191192
// Flush describes settings specific to producer flushing.
@@ -250,7 +251,7 @@ func defaultSaramaConfig() *saramaConfig {
250251
// this workaround is the one that's been running in roachtests and I'd want
251252
// to test this one more before changing it.
252253
config.Flush.MaxMessages = 1000
253-
254+
config.ClientID = "CockroachDB"
254255
return config
255256
}
256257

@@ -837,6 +838,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error {
837838
kafka.Producer.Flush.Messages = c.Flush.Messages
838839
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
839840
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
841+
kafka.ClientID = c.ClientID
842+
840843
if c.Version != "" {
841844
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
842845
if err != nil {
@@ -1092,7 +1095,6 @@ func buildKafkaConfig(
10921095
return nil, err
10931096
}
10941097
config := sarama.NewConfig()
1095-
config.ClientID = `CockroachDB`
10961098
config.Producer.Return.Successes = true
10971099
config.Producer.Partitioner = newChangefeedPartitioner
10981100
// Do not fetch metadata for all topics but just for the necessary ones.

pkg/ccl/changefeedccl/sink_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,22 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
632632
cfg, err = getSaramaConfig(opts)
633633
require.NoError(t, err)
634634
require.NoError(t, cfg.Validate())
635+
636+
saramaCfg := sarama.NewConfig()
637+
opts = `{"ClientID": "clientID1"}`
638+
cfg, _ = getSaramaConfig(opts)
639+
err = cfg.Apply(saramaCfg)
640+
require.NoError(t, err)
641+
require.NoError(t, cfg.Validate())
642+
require.NoError(t, saramaCfg.Validate())
643+
644+
opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}`
645+
cfg, _ = getSaramaConfig(opts)
646+
err = cfg.Apply(saramaCfg)
647+
require.NoError(t, err)
648+
require.NoError(t, cfg.Validate())
649+
require.NoError(t, saramaCfg.Validate())
650+
require.True(t, cfg.ClientID == "clientID1")
635651
})
636652
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
637653
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)
@@ -644,6 +660,14 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
644660
cfg, err = getSaramaConfig(opts)
645661
require.NoError(t, err)
646662
require.Error(t, cfg.Validate())
663+
664+
opts = `{"Version": "0.8.2.0", "ClientID": "bad_client_id*"}`
665+
saramaCfg := sarama.NewConfig()
666+
cfg, _ = getSaramaConfig(opts)
667+
err = cfg.Apply(saramaCfg)
668+
require.NoError(t, err)
669+
require.NoError(t, cfg.Validate())
670+
require.Error(t, saramaCfg.Validate())
647671
})
648672
t.Run("apply parses valid version", func(t *testing.T) {
649673
opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)

0 commit comments

Comments
 (0)