Skip to content

Commit c2a78c1

Browse files
committed
feat: allow adding nested fields when refreshing schema
1 parent f98f6c4 commit c2a78c1

File tree

8 files changed

+873
-44
lines changed

8 files changed

+873
-44
lines changed

e2e_test/source_inline/kafka/protobuf/alter_source.slt

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ rpk topic delete pb_alter_source_test || true; \
1010
system ok
1111
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_test" 20 user
1212

13+
statement ok
14+
DROP SOURCE IF EXISTS src_user CASCADE;
15+
1316
statement ok
1417
CREATE SOURCE src_user
1518
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
@@ -24,6 +27,12 @@ FORMAT PLAIN ENCODE PROTOBUF(
2427
);
2528

2629
# Create source with generated column to test ALTER SOURCE REFRESH SCHEMA with generated columns
30+
statement ok
31+
DROP SOURCE IF EXISTS src_user_gen CASCADE;
32+
33+
statement ok
34+
DROP TABLE IF EXISTS t_user CASCADE;
35+
2736
statement ok
2837
CREATE SOURCE src_user_gen (*, t int as id+1)
2938
INCLUDE timestamp
@@ -168,5 +177,81 @@ DROP SOURCE src_user;
168177
statement ok
169178
DROP SOURCE src_user_gen;
170179

180+
# Test nested struct schema evolution (compatible changes)
181+
system ok
182+
rpk topic delete pb_nested_alter_test || true; \
183+
(rpk sr subject delete 'pb_nested_alter_test-value' && rpk sr subject delete 'pb_nested_alter_test-value' --permanent) || true;
184+
185+
system ok
186+
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_nested_alter_test" 10 user_with_nested_struct
187+
188+
statement ok
189+
DROP SOURCE IF EXISTS src_user_nested CASCADE;
190+
191+
statement ok
192+
CREATE SOURCE src_user_nested
193+
WITH (
194+
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
195+
topic = 'pb_nested_alter_test',
196+
scan.startup.mode = 'earliest'
197+
)
198+
FORMAT PLAIN ENCODE PROTOBUF(
199+
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
200+
message = 'test.User'
201+
);
202+
203+
# Verify original nested struct schema
204+
query T
205+
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user_nested';
206+
----
207+
CREATE SOURCE src_user_nested (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT, address_detail STRUCT<street CHARACTER VARYING, city CHARACTER VARYING, location_detail STRUCT<latitude DOUBLE, longitude DOUBLE>>)
208+
209+
statement ok
210+
CREATE MATERIALIZED VIEW mv_user_nested AS SELECT * FROM src_user_nested;
211+
212+
# Verify we can query the nested fields
213+
query I retry 3 backoff 5s
214+
SELECT COUNT(*) FROM mv_user_nested WHERE (address_detail).city IS NOT NULL;
215+
----
216+
10
217+
218+
# Push events with extended nested fields (compatible: adds country to Address, altitude to Location, and industry at top level)
219+
system ok
220+
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_nested_alter_test" 5 user_with_nested_struct_extended
221+
222+
# Change the source schema - this should succeed because adding fields to nested structs is compatible
223+
statement ok
224+
ALTER SOURCE src_user_nested REFRESH SCHEMA;
225+
226+
# Verify the schema now includes the new nested fields
227+
query T
228+
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user_nested';
229+
----
230+
CREATE SOURCE src_user_nested (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT, address_detail STRUCT<street CHARACTER VARYING, city CHARACTER VARYING, location_detail STRUCT<latitude DOUBLE, longitude DOUBLE, altitude DOUBLE>, country CHARACTER VARYING>)
231+
232+
statement ok
233+
CREATE MATERIALIZED VIEW mv_user_nested_extended AS SELECT * FROM src_user_nested;
234+
235+
# Verify the new nested fields are accessible
236+
query I retry 3 backoff 5s
237+
SELECT (address_detail).country FROM mv_user_nested_extended WHERE (address_detail).country IS NOT NULL;
238+
----
239+
Country_0
240+
Country_1
241+
Country_2
242+
Country_3
243+
Country_4
244+
245+
query I
246+
SELECT COUNT(*) FROM mv_user_nested_extended WHERE ((address_detail).location_detail).altitude IS NOT NULL;
247+
----
248+
5
249+
250+
statement ok
251+
DROP MATERIALIZED VIEW mv_user_nested_extended;
252+
253+
statement ok
254+
DROP MATERIALIZED VIEW mv_user_nested;
255+
171256
statement ok
172-
SET streaming_use_shared_source TO true;
257+
DROP SOURCE src_user_nested;

e2e_test/source_inline/kafka/protobuf/pb.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,33 +39,73 @@ def get_user_with_more_fields(i):
3939
)
4040

4141

42+
def get_user_with_nested_struct(i):
43+
return user_pb2.User(
44+
id=i,
45+
name="User_{}".format(i),
46+
address="Address_{}".format(i),
47+
city="City_{}".format(i),
48+
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
49+
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
50+
address_detail=user_pb2.AddressDetail(
51+
street="Street_{}".format(i),
52+
city="City_{}".format(i),
53+
location_detail=user_pb2.LocationDetail(
54+
latitude=37.7749 + i * 0.01,
55+
longitude=-122.4194 + i * 0.01,
56+
),
57+
),
58+
)
59+
60+
61+
def get_user_with_nested_struct_extended(i):
62+
return user_pb2.User(
63+
id=i,
64+
name="User_{}".format(i),
65+
address="Address_{}".format(i),
66+
city="City_{}".format(i),
67+
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
68+
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
69+
address_detail=user_pb2.AddressDetail(
70+
street="Street_{}".format(i),
71+
city="City_{}".format(i),
72+
location_detail=user_pb2.LocationDetail(
73+
latitude=37.7749 + i * 0.01,
74+
longitude=-122.4194 + i * 0.01,
75+
altitude=100.0 + i * 10.0,
76+
),
77+
country="Country_{}".format(i),
78+
),
79+
)
80+
81+
4282
def send_to_kafka(
43-
producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message
83+
producer_conf, schema_registry_conf, topic, num_records, get_message_fn, pb_message_class
4484
):
4585
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
4686
serializer = ProtobufSerializer(
47-
pb_message,
87+
pb_message_class,
4888
schema_registry_client,
4989
{"use.deprecated.format": False, "skip.known.types": True},
5090
)
5191

5292
producer = Producer(producer_conf)
5393
for i in range(num_records):
54-
user = get_user_fn(i)
94+
message = get_message_fn(i)
5595

5696
producer.produce(
5797
topic=topic,
5898
partition=0,
5999
key=json.dumps({"id": i}), # RisingWave does not handle key schema, so we use json
60-
value=serializer(user, SerializationContext(topic, MessageField.VALUE)),
100+
value=serializer(message, SerializationContext(topic, MessageField.VALUE)),
61101
on_delivery=delivery_report,
62102
)
63103
producer.flush()
64104
print("Send {} records to kafka\n".format(num_records))
65105

66106

67107
if __name__ == "__main__":
68-
if len(sys.argv) < 6:
108+
if len(sys.argv) < 5:
69109
print(
70110
"pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>"
71111
)
@@ -82,6 +122,8 @@ def send_to_kafka(
82122
all_pb_messages = {
83123
"user": get_user,
84124
"user_with_more_fields": get_user_with_more_fields,
125+
"user_with_nested_struct": get_user_with_nested_struct,
126+
"user_with_nested_struct_extended": get_user_with_nested_struct_extended,
85127
}
86128

87129
assert (
@@ -91,14 +133,22 @@ def send_to_kafka(
91133
schema_registry_conf = {"url": schema_registry_url}
92134
producer_conf = {"bootstrap.servers": broker_list}
93135

136+
# Determine the protobuf message class based on the message name
137+
pb_message_classes = {
138+
"user": user_pb2.User,
139+
"user_with_more_fields": user_pb2.User,
140+
"user_with_nested_struct": user_pb2.User,
141+
"user_with_nested_struct_extended": user_pb2.User,
142+
}
143+
94144
try:
95145
send_to_kafka(
96146
producer_conf,
97147
schema_registry_conf,
98148
topic,
99149
num_records,
100150
all_pb_messages[pb_message],
101-
user_pb2.User,
151+
pb_message_classes[pb_message],
102152
)
103153
except Exception as e:
104154
print("Send Protobuf data to schema registry and kafka failed {}", e)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
syntax = "proto3";
2+
3+
package test;
4+
5+
import "google/protobuf/source_context.proto";
6+
7+
// Extended from user_with_more_fields.proto
8+
9+
message User {
10+
int32 id = 1;
11+
string name = 2;
12+
string address = 3;
13+
string city = 4;
14+
Gender gender = 5;
15+
google.protobuf.SourceContext sc = 6;
16+
int32 age = 7;
17+
AddressDetail address_detail = 8; // Added for nested struct testing
18+
}
19+
20+
message AddressDetail {
21+
string street = 1;
22+
string city = 2;
23+
LocationDetail location_detail = 3;
24+
}
25+
26+
message LocationDetail {
27+
double latitude = 1;
28+
double longitude = 2;
29+
}
30+
31+
enum Gender {
32+
MALE = 0;
33+
FEMALE = 1;
34+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
syntax = "proto3";
2+
3+
package test;
4+
5+
import "google/protobuf/source_context.proto";
6+
7+
// Extended from user_with_nested_struct.proto
8+
9+
message User {
10+
int32 id = 1;
11+
string name = 2;
12+
string address = 3;
13+
string city = 4;
14+
Gender gender = 5;
15+
google.protobuf.SourceContext sc = 6;
16+
int32 age = 7;
17+
optional AddressDetail address_detail = 8; // Added for nested struct testing
18+
}
19+
20+
message AddressDetail {
21+
string street = 1;
22+
string city = 2;
23+
LocationDetail location_detail = 3;
24+
optional string country = 4; // Added for nested struct testing
25+
}
26+
27+
message LocationDetail {
28+
double latitude = 1;
29+
double longitude = 2;
30+
optional double altitude = 3; // Added for nested struct testing
31+
}
32+
33+
enum Gender {
34+
MALE = 0;
35+
FEMALE = 1;
36+
}

e2e_test/source_inline/kafka/protobuf/user_with_nested_struct_extended_pb2.py

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

e2e_test/source_inline/kafka/protobuf/user_with_nested_struct_pb2.py

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)