Skip to content

Commit

Permalink
storage/adapter: Opt-in migration of sources to the new table model (#…
Browse files Browse the repository at this point in the history
…30483)



---------

Co-authored-by: Roshan Jobanputra <[email protected]>
Co-authored-by: Rainer Niedermayr <[email protected]>
Co-authored-by: Dennis Felsing <[email protected]>
  • Loading branch information
4 people authored Jan 30, 2025
1 parent e280c78 commit 437ed13
Show file tree
Hide file tree
Showing 32 changed files with 1,731 additions and 137 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

# Nightly step: runs the `migration` workflow of the `mysql-cdc-old-syntax`
# mzcompose composition after `build-aarch64`, with a 360-minute timeout.
- id: mysql-cdc-migration
label: MySQL CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-resumption-old-syntax
label: MySQL CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -646,6 +657,17 @@ steps:
queue: hetzner-aarch64-4cpu-8gb
# the mzbuild postgres version will be used, which depends on the Dockerfile specification

# Nightly step: runs the `migration` workflow of the `pg-cdc-old-syntax`
# mzcompose composition after `build-aarch64`, with a 360-minute timeout.
- id: pg-cdc-migration
label: Postgres CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-resumption-old-syntax
label: Postgres CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -677,6 +699,16 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

# Nightly step: runs the `migration` workflow of the
# `testdrive-old-kafka-src-syntax` mzcompose composition after
# `build-aarch64`, with a 180-minute timeout.
- id: testdrive-kafka-migration
label: "Testdrive (before Kafka source versioning) migration tests"
depends_on: build-aarch64
timeout_in_minutes: 180
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive-old-kafka-src-syntax
run: migration
agents:
queue: hetzner-aarch64-16cpu-32gb

- group: AWS
key: aws
Expand Down
1 change: 1 addition & 0 deletions ci/test/lint-main/checks/check-mzcompose-files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ check_default_workflow_references_others() {
-not -wholename "./test/canary-environment/mzcompose.py" `# Only run manually` \
-not -wholename "./test/ssh-connection/mzcompose.py" `# Handled differently` \
-not -wholename "./test/scalability/mzcompose.py" `# Other workflows are for manual usage` \
-not -wholename "./test/testdrive-old-kafka-src-syntax/mzcompose.py" `# Other workflow is run separately` \
-not -wholename "./test/terraform/mzcompose.py" `# Handled differently` \
)

Expand Down
56 changes: 56 additions & 0 deletions misc/python/materialize/checks/all_checks/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def validate(self) -> Testdrive:
"""
)
)


class UpsertLegacy(Check):
"""
An upsert source check that always creates its source with the legacy
`CREATE SOURCE ... ENVELOPE UPSERT` syntax, on every version, so that the
ActivateSourceVersioningMigration scenario can verify that such a source
is migrated properly.
"""

# Creates the Kafka topic, ingests 10000 Avro records keyed
# "A<iteration>", then creates the legacy-syntax upsert source and a
# materialized view counting distinct key/value combinations.
def initialize(self) -> Testdrive:
return Testdrive(
schemas()
+ dedent(
"""
$ kafka-create-topic topic=upsert-legacy-syntax
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
> CREATE SOURCE upsert_insert_legacy
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT
> CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
"""
)
)

# Returns two Testdrive fragments; each one re-ingests the same 10000
# key/value pairs that `initialize` wrote, so the set of keys does not
# change between phases.
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
]
]

# Expects the source to hold exactly 10000 rows with 10000 distinct keys
# and 10000 distinct values, and the materialized view to report 10000.
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
10000 10000 10000
> SELECT * FROM upsert_insert_legacy_view;
10000
"""
)
)
46 changes: 46 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,49 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class ActivateSourceVersioningMigration(Scenario):
    """
    Starts MZ at `base_version()`, initializes and manipulates, then
    restarts it with `force_source_table_syntax` enabled to force the
    migration of sources to the new table model (introducing Source
    Versioning). Validates both right after the migration and after one
    more restart with the flag still set.
    """

    def base_version(self) -> MzVersion:
        return get_last_version()

    def actions(self) -> list[Action]:
        print(f"Upgrading from tag {self.base_version()}")

        def start_with_forced_migration():
            # Activate the `force_source_table_syntax` flag
            # which should trigger the migration of sources
            # using the old syntax to the new table model.
            # A fresh dict is built per call so the two StartMz
            # actions never share a parameter mapping.
            return StartMz(
                self,
                tag=None,
                additional_system_parameter_defaults={
                    "force_source_table_syntax": "true",
                },
            )

        steps: list[Action] = []

        # Phase 1: run on the base version.
        steps.append(StartMz(self, tag=self.base_version()))
        steps.append(Initialize(self))
        steps.append(Manipulate(self, phase=1))

        # We always use True here otherwise docker-compose will lose the
        # pre-upgrade logs
        steps.append(KillMz(capture_logs=True))

        # Phase 2: restart with the migration flag forced on.
        steps.append(start_with_forced_migration())
        steps.append(Manipulate(self, phase=2))
        steps.append(Validate(self))

        # A second restart while already on the new version
        steps.append(KillMz(capture_logs=True))
        steps.append(start_with_forced_migration())
        steps.append(Validate(self))

        return steps
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
| (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
# parallel-workload
| worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
# source-table migration
| source-table-migration\ issue
)
.* $
""",
Expand Down
7 changes: 5 additions & 2 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@


def get_default_system_parameters(
version: MzVersion | None = None, zero_downtime: bool = False
version: MzVersion | None = None,
zero_downtime: bool = False,
force_source_table_syntax: bool = False,
) -> dict[str, str]:
"""For upgrade tests we only want parameters set when all environmentd /
clusterd processes have reached a specific version (or higher)
Expand Down Expand Up @@ -89,7 +91,7 @@ def get_default_system_parameters(
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
"enable_0dt_deployment_sources": (
"true" if version >= MzVersion.parse_mz("v0.125.0-dev") else "false"
"true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
),
"enable_alter_swap": "true",
"enable_columnation_lgalloc": "true",
Expand Down Expand Up @@ -125,6 +127,7 @@ def get_default_system_parameters(
"persist_record_schema_id": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"force_source_table_syntax": "true" if force_source_table_syntax else "false",
"persist_batch_columnar_format": "both_v2",
"persist_batch_delete_enabled": "true",
"persist_batch_structured_order": "true",
Expand Down
69 changes: 69 additions & 0 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Verify every user source after the source-table migration.

    Lists all sources whose id starts with ``u`` as ``<schema>.<name>``,
    sets ``statement_timeout`` to 20s, and then checks each source with
    `_verify_source`.

    Args:
        c: Composition used to run the SQL statements.
        file: Label included in the printed messages.
            NOTE(review): presumably the test file that created the
            sources -- confirm with callers.
        fail: When true, the first failing check is re-raised; otherwise
            failures are only printed.
    """
    rows = c.sql_query(
        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    )
    source_names = [row[0] for row in rows]

    print(f"Sources created in {file} are: {source_names}")

    c.sql("SET statement_timeout = '20s'")

    for source_name in source_names:
        _verify_source(c, file, source_name, fail=fail)


def _verify_source(
    c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
    """Check that one source is queryable and no longer uses legacy syntax.

    Runs ``SELECT count(*)`` against the source, then inspects the second
    column of ``SHOW CREATE SOURCE`` and asserts it contains neither
    ``FOR TABLE`` nor ``FOR ALL TABLES``. For names not ending in
    ``_progress`` it also asserts the absence of ``CREATE SUBSOURCE``.

    Any exception is printed with a ``source-table-migration issue``
    prefix. It is re-raised unchanged only when ``fail`` is true.
    """
    try:
        print(f"Checking source: {source_name}")

        # must not crash; the returned count itself is not inspected
        c.sql_query(f"SELECT count(*) FROM {source_name};")

        show_create_rows = c.sql_query(f"SHOW CREATE SOURCE {source_name};")
        sql = show_create_rows[0][1]
        assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
        assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

        if not source_name.endswith("_progress"):
            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

        print("OK.")
    except Exception as e:
        # Keep the "source-table-migration issue" prefix stable: the CI
        # error annotation regex matches on this exact text.
        print(f"source-table-migration issue in {file}: {e}")

        if fail:
            # Bare `raise` re-raises the active exception without adding a
            # redundant traceback entry for this frame.
            raise


def check_source_table_migration_test_sensible() -> None:
    """Assert that `MzVersion.parse_cargo()` is still below v0.139.0.

    Fails with "migration test probably no longer needed" once the parsed
    cargo version reaches that cutoff.
    """
    current_version = MzVersion.parse_cargo()
    cutoff_version = MzVersion.parse_mz("v0.139.0")
    assert (
        current_version < cutoff_version
    ), "migration test probably no longer needed"


def get_old_image_for_source_table_migration_test() -> str:
    """Return the fixed image reference ``materialize/materialized:v0.131.0``.

    NOTE(review): presumably the pre-migration image the migration tests
    start from -- confirm with callers.
    """
    old_image = "materialize/materialized:v0.131.0"
    return old_image


def get_new_image_for_source_table_migration_test() -> str | None:
return None
6 changes: 6 additions & 0 deletions src/adapter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rust_library(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -119,6 +120,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -165,6 +167,7 @@ rust_doc_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -231,6 +234,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -297,6 +301,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -363,6 +368,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
http = "1.1.0"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -53,6 +54,7 @@ mz-pgcopy = { path = "../pgcopy" }
mz-pgrepr = { path = "../pgrepr" }
mz-pgwire-common = { path = "../pgwire-common" }
mz-postgres-util = { path = "../postgres-util" }
mz-proto = { path = "../proto" }
mz-repr = { path = "../repr", features = ["tracing_"] }
mz-rocksdb-types = { path = "../rocksdb-types" }
mz-secrets = { path = "../secrets" }
Expand All @@ -68,6 +70,7 @@ mz-transform = { path = "../transform" }
mz-timestamp-oracle = { path = "../timestamp-oracle" }
opentelemetry = { version = "0.24.0", features = ["trace"] }
prometheus = { version = "0.13.3", default-features = false }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
qcell = "0.5"
rand = "0.8.5"
rand_chacha = "0.3"
Expand Down
Loading

0 comments on commit 437ed13

Please sign in to comment.