Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ rpk topic delete pb_alter_source_test || true; \
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_test" 20 user

statement ok
DROP SOURCE IF EXISTS src_user CASCADE;

statement ok
CREATE SOURCE src_user
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
Expand All @@ -24,6 +27,12 @@ FORMAT PLAIN ENCODE PROTOBUF(
);

# Create source with generated column to test ALTER SOURCE REFRESH SCHEMA with generated columns
statement ok
DROP SOURCE IF EXISTS src_user_gen CASCADE;

statement ok
DROP TABLE IF EXISTS t_user CASCADE;

statement ok
CREATE SOURCE src_user_gen (*, t int as id+1)
INCLUDE timestamp
Expand Down Expand Up @@ -168,5 +177,81 @@ DROP SOURCE src_user;
statement ok
DROP SOURCE src_user_gen;

# Test nested struct schema evolution (compatible changes)
# Start from a clean slate: remove the topic and its schema-registry subject.
# The subject delete runs twice, the second time with --permanent (presumably a
# soft delete followed by a hard delete -- confirm against the rpk docs).
# `|| true` keeps this step passing when nothing exists yet.
system ok
rpk topic delete pb_nested_alter_test || true; \
(rpk sr subject delete 'pb_nested_alter_test-value' && rpk sr subject delete 'pb_nested_alter_test-value' --permanent) || true;

# Produce 10 messages using the `user_with_nested_struct` generator of pb.py.
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_nested_alter_test" 10 user_with_nested_struct

statement ok
DROP SOURCE IF EXISTS src_user_nested CASCADE;

# No column list is given here; the columns are expected to be derived from the
# `test.User` message registered in the schema registry.
statement ok
CREATE SOURCE src_user_nested
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_nested_alter_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

# Verify original nested struct schema
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user_nested';
----
CREATE SOURCE src_user_nested (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT, address_detail STRUCT<street CHARACTER VARYING, city CHARACTER VARYING, location_detail STRUCT<latitude DOUBLE, longitude DOUBLE>>)

statement ok
CREATE MATERIALIZED VIEW mv_user_nested AS SELECT * FROM src_user_nested;

# Verify we can query the nested fields
query I retry 3 backoff 5s
SELECT COUNT(*) FROM mv_user_nested WHERE (address_detail).city IS NOT NULL;
----
10

# Push events with extended nested fields (compatible: adds country to AddressDetail and altitude to LocationDetail)
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_nested_alter_test" 5 user_with_nested_struct_extended

# Change the source schema - this should succeed because adding fields to nested structs is compatible
statement ok
ALTER SOURCE src_user_nested REFRESH SCHEMA;

# Verify the schema now includes the new nested fields
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user_nested';
----
CREATE SOURCE src_user_nested (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT, address_detail STRUCT<street CHARACTER VARYING, city CHARACTER VARYING, location_detail STRUCT<latitude DOUBLE, longitude DOUBLE, altitude DOUBLE>, country CHARACTER VARYING>)

statement ok
CREATE MATERIALIZED VIEW mv_user_nested_extended AS SELECT * FROM src_user_nested;

# Verify the new nested fields are accessible.
# The selected column is VARCHAR, hence type string `T`; ORDER BY id pins the
# row order so the comparison does not depend on scan order.
query T retry 3 backoff 5s
SELECT (address_detail).country FROM mv_user_nested_extended WHERE (address_detail).country IS NOT NULL ORDER BY id;
----
Country_0
Country_1
Country_2
Country_3
Country_4

query I
SELECT COUNT(*) FROM mv_user_nested_extended WHERE ((address_detail).location_detail).altitude IS NOT NULL;
----
5

statement ok
DROP MATERIALIZED VIEW mv_user_nested_extended;

statement ok
DROP MATERIALIZED VIEW mv_user_nested;

# One statement per record, matching the rest of this file, so that a failure
# is attributed to the statement that caused it.
statement ok
SET streaming_use_shared_source TO true;

statement ok
DROP SOURCE src_user_nested;
62 changes: 56 additions & 6 deletions e2e_test/source_inline/kafka/protobuf/pb.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,73 @@ def get_user_with_more_fields(i):
)


def get_user_with_nested_struct(i):
    """Build the ``i``-th test ``User`` with a two-level nested ``address_detail``.

    All string fields embed ``i``; the coordinates are fixed base values shifted
    by ``i * 0.01``. ``age`` is not set by this generator.
    """
    # Even indices are MALE, odd indices are FEMALE.
    if i % 2 == 0:
        gender = user_pb2.MALE
    else:
        gender = user_pb2.FEMALE

    # Innermost level of the nested struct.
    location = user_pb2.LocationDetail(
        latitude=37.7749 + i * 0.01,
        longitude=-122.4194 + i * 0.01,
    )
    # Middle level: wraps the location.
    detail = user_pb2.AddressDetail(
        street="Street_{}".format(i),
        city="City_{}".format(i),
        location_detail=location,
    )
    source_context = SourceContext(
        file_name="source/context_{:03}.proto".format(i)
    )

    return user_pb2.User(
        id=i,
        name="User_{}".format(i),
        address="Address_{}".format(i),
        city="City_{}".format(i),
        gender=gender,
        sc=source_context,
        address_detail=detail,
    )


def get_user_with_nested_struct_extended(i):
    """Build the ``i``-th test ``User`` using the extended nested fields.

    Same shape as the non-extended generator, plus ``altitude`` on the location
    and ``country`` on the address detail. ``age`` is not set by this generator.
    """
    # Even indices are MALE, odd indices are FEMALE.
    if i % 2 == 0:
        gender = user_pb2.MALE
    else:
        gender = user_pb2.FEMALE

    # Innermost level, including the added `altitude` field.
    location = user_pb2.LocationDetail(
        latitude=37.7749 + i * 0.01,
        longitude=-122.4194 + i * 0.01,
        altitude=100.0 + i * 10.0,
    )
    # Middle level, including the added `country` field.
    detail = user_pb2.AddressDetail(
        street="Street_{}".format(i),
        city="City_{}".format(i),
        location_detail=location,
        country="Country_{}".format(i),
    )
    source_context = SourceContext(
        file_name="source/context_{:03}.proto".format(i)
    )

    return user_pb2.User(
        id=i,
        name="User_{}".format(i),
        address="Address_{}".format(i),
        city="City_{}".format(i),
        gender=gender,
        sc=source_context,
        address_detail=detail,
    )


def send_to_kafka(
    producer_conf, schema_registry_conf, topic, num_records, get_message_fn, pb_message_class
):
    """Serialize ``num_records`` protobuf messages and publish them to ``topic``.

    Args:
        producer_conf: Configuration dict passed to ``Producer``.
        schema_registry_conf: Configuration dict passed to ``SchemaRegistryClient``.
        topic: Kafka topic to produce to; also used for the serialization context.
        num_records: Number of messages to send; indices ``0 .. num_records - 1``
            are passed to ``get_message_fn``.
        get_message_fn: Callable taking the record index and returning the
            protobuf message to send.
        pb_message_class: Protobuf message class handed to ``ProtobufSerializer``.
    """
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    serializer = ProtobufSerializer(
        pb_message_class,
        schema_registry_client,
        {"use.deprecated.format": False, "skip.known.types": True},
    )

    producer = Producer(producer_conf)
    for i in range(num_records):
        message = get_message_fn(i)

        producer.produce(
            topic=topic,
            partition=0,  # every record goes to partition 0
            key=json.dumps({"id": i}),  # RisingWave does not handle key schema, so we use json
            value=serializer(message, SerializationContext(topic, MessageField.VALUE)),
            on_delivery=delivery_report,
        )
    # NOTE(review): indentation was lost in this view; flush is assumed to run
    # once after the loop rather than per record -- confirm against the file.
    producer.flush()
    print("Send {} records to kafka\n".format(num_records))


if __name__ == "__main__":
if len(sys.argv) < 6:
if len(sys.argv) < 5:
print(
"pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>"
)
Expand All @@ -82,6 +122,8 @@ def send_to_kafka(
all_pb_messages = {
"user": get_user,
"user_with_more_fields": get_user_with_more_fields,
"user_with_nested_struct": get_user_with_nested_struct,
"user_with_nested_struct_extended": get_user_with_nested_struct_extended,
}

assert (
Expand All @@ -91,14 +133,22 @@ def send_to_kafka(
schema_registry_conf = {"url": schema_registry_url}
producer_conf = {"bootstrap.servers": broker_list}

# Determine the protobuf message class based on the message name
pb_message_classes = {
"user": user_pb2.User,
"user_with_more_fields": user_pb2.User,
"user_with_nested_struct": user_pb2.User,
"user_with_nested_struct_extended": user_pb2.User,
}

try:
send_to_kafka(
producer_conf,
schema_registry_conf,
topic,
num_records,
all_pb_messages[pb_message],
user_pb2.User,
pb_message_classes[pb_message],
)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
syntax = "proto3";

package test;

import "google/protobuf/source_context.proto";

// Extended from user_with_more_fields.proto

// Top-level test message. Field 8 nests an AddressDetail, which in turn nests
// a LocationDetail, giving a two-level nested struct.
message User {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7;
AddressDetail address_detail = 8; // Added for nested struct testing
}

// First nesting level, referenced by User.address_detail.
message AddressDetail {
string street = 1;
string city = 2;
LocationDetail location_detail = 3;
}

// Second nesting level, referenced by AddressDetail.location_detail.
message LocationDetail {
double latitude = 1;
double longitude = 2;
}

// Enum type of User.gender.
enum Gender {
MALE = 0;
FEMALE = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
syntax = "proto3";

package test;

import "google/protobuf/source_context.proto";

// Extended from user_with_nested_struct.proto

// Top-level test message of the extended schema. The top-level field numbers
// and types are the same as in the non-extended schema; address_detail is
// additionally declared `optional` here.
message User {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7;
optional AddressDetail address_detail = 8; // Added for nested struct testing
}

// First nesting level, referenced by User.address_detail.
// Fields 1-3 match the non-extended schema; field 4 is the addition.
message AddressDetail {
string street = 1;
string city = 2;
LocationDetail location_detail = 3;
optional string country = 4; // Added for nested struct testing
}

// Second nesting level, referenced by AddressDetail.location_detail.
// Fields 1-2 match the non-extended schema; field 3 is the addition.
message LocationDetail {
double latitude = 1;
double longitude = 2;
optional double altitude = 3; // Added for nested struct testing
}

// Enum type of User.gender.
enum Gender {
MALE = 0;
FEMALE = 1;
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading