Skip to content

Commit d4dfbf8

Browse files
committed
changefeedccl: fix avro schema bug
Fix bug in avro schema generation introduced by #139655, which caused extraneous fields to appear in the schema. Epic: none Release note: None
1 parent 2e7d5a6 commit d4dfbf8

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

pkg/ccl/changefeedccl/avro/avro.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,10 @@ type EnvelopeOpts struct {
180180
type EnvelopeRecord struct {
181181
Record
182182

183-
// TODO: opts should be private
184-
Opts EnvelopeOpts
185-
Before, After, Rec *DataRecord
183+
// NOTE: this struct gets serialized to json, but we still need to be able
184+
// to access these fields outside this package, so we hide them.
185+
Opts EnvelopeOpts `json:"-"`
186+
Before, After, Rec *DataRecord `json:"-"`
186187
}
187188

188189
// typeToSchema converts a database type to an avro field

pkg/ccl/changefeedccl/encoder_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
gosql "database/sql"
1111
"encoding/base64"
12+
gojson "encoding/json"
1213
"fmt"
1314
"math/rand"
1415
"net/url"
@@ -770,6 +771,41 @@ func TestAvroSchemaNamespace(t *testing.T) {
770771
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
771772
}
772773

774+
func TestAvroSchemaHasExpectedTopLevelFields(t *testing.T) {
775+
defer leaktest.AfterTest(t)()
776+
defer log.Scope(t).Close(t)
777+
778+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
779+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
780+
sqlDB.Exec(t, `CREATE DATABASE movr`)
781+
sqlDB.Exec(t, `CREATE TABLE movr.drivers (id INT PRIMARY KEY, name STRING)`)
782+
sqlDB.Exec(t,
783+
`INSERT INTO movr.drivers VALUES (1, 'Alice')`,
784+
)
785+
786+
foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+
787+
`WITH format=%s`, changefeedbase.OptFormatAvro))
788+
defer closeFeed(t, foo)
789+
790+
assertPayloads(t, foo, []string{
791+
`drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"name":{"string":"Alice"}}}}`,
792+
})
793+
794+
reg := foo.(*kafkaFeed).registry
795+
796+
schemaJSON := reg.SchemaForSubject(`drivers-value`)
797+
var schema map[string]any
798+
require.NoError(t, gojson.Unmarshal([]byte(schemaJSON), &schema))
799+
var keys []string
800+
for k := range schema {
801+
keys = append(keys, k)
802+
}
803+
require.ElementsMatch(t, keys, []string{"type", "name", "fields"})
804+
}
805+
806+
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
807+
}
808+
773809
func TestTableNameCollision(t *testing.T) {
774810
defer leaktest.AfterTest(t)()
775811
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)