|
21 | 21 | from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
|
22 | 22 | from materialize.mzcompose.service import Service, ServiceConfig
|
23 | 23 | from materialize.mzcompose.services.materialized import Materialized
|
24 |
| -from materialize.mzcompose.services.postgres import Postgres |
| 24 | +from materialize.mzcompose.services.postgres import ( |
| 25 | + CockroachOrPostgresMetadata, |
| 26 | + Postgres, PostgresMetadata, |
| 27 | +) |
25 | 28 | from materialize.mzcompose.services.test_certs import TestCerts
|
26 | 29 | from materialize.mzcompose.services.testdrive import Testdrive
|
27 | 30 | from materialize.mzcompose.services.toxiproxy import Toxiproxy
|
@@ -90,6 +93,7 @@ def create_postgres(
|
90 | 93 | },
|
91 | 94 | ),
|
92 | 95 | Testdrive(),
|
| 96 | + PostgresMetadata(), |
93 | 97 | TestCerts(),
|
94 | 98 | Toxiproxy(),
|
95 | 99 | create_postgres(pg_version=None),
|
@@ -354,3 +358,86 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
|
354 | 358 |
|
355 | 359 | with c.test_case(name):
|
356 | 360 | c.workflow(name)
|
| 361 | + |
| 362 | + |
| 363 | +def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: |
| 364 | + matching_files = glob.glob("*.td", root_dir="test/pg-cdc-old-syntax") |
| 365 | + sharded_files: list[str] = sorted( |
| 366 | + buildkite.shard_list(matching_files, lambda file: file) |
| 367 | + ) |
| 368 | + print(f"Files: {sharded_files}") |
| 369 | + |
| 370 | + ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout |
| 371 | + ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout |
| 372 | + ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout |
| 373 | + ssl_wrong_cert = c.run( |
| 374 | + "test-certs", "cat", "/secrets/postgres.crt", capture=True |
| 375 | + ).stdout |
| 376 | + ssl_wrong_key = c.run( |
| 377 | + "test-certs", "cat", "/secrets/postgres.key", capture=True |
| 378 | + ).stdout |
| 379 | + |
| 380 | + pg_version = get_targeted_pg_version(parser) |
| 381 | + |
| 382 | + for file in sharded_files: |
| 383 | + |
| 384 | + mz_old = Materialized( |
| 385 | + name="materialized", |
| 386 | + image="materialize/materialized:v0.122.0", |
| 387 | + volumes_extra=["secrets:/share/secrets"], |
| 388 | + metadata_store="postgres-metadata", |
| 389 | + external_metadata_store=True, |
| 390 | + additional_system_parameter_defaults={ |
| 391 | + "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error" |
| 392 | + }, |
| 393 | + ) |
| 394 | + |
| 395 | + mz_new = Materialized( |
| 396 | + name="materialized", |
| 397 | + image=None, |
| 398 | + volumes_extra=["secrets:/share/secrets"], |
| 399 | + metadata_store="postgres-metadata", |
| 400 | + external_metadata_store=True, |
| 401 | + additional_system_parameter_defaults={ |
| 402 | + "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error", |
| 403 | + "force_source_table_syntax": "true", |
| 404 | + }, |
| 405 | + ) |
| 406 | + with c.override(mz_old, create_postgres(pg_version=pg_version)): |
| 407 | + c.up("materialized", "test-certs", "postgres") |
| 408 | + |
| 409 | + print(f"Running {file} with mz_old") |
| 410 | + |
| 411 | + c.run_testdrive_files( |
| 412 | + f"--var=ssl-ca={ssl_ca}", |
| 413 | + f"--var=ssl-cert={ssl_cert}", |
| 414 | + f"--var=ssl-key={ssl_key}", |
| 415 | + f"--var=ssl-wrong-cert={ssl_wrong_cert}", |
| 416 | + f"--var=ssl-wrong-key={ssl_wrong_key}", |
| 417 | + f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}", |
| 418 | + f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1", |
| 419 | + "--no-reset", |
| 420 | + file, |
| 421 | + ) |
| 422 | + c.kill("materialized", wait=True) |
| 423 | + |
| 424 | + with c.override(mz_new): |
| 425 | + c.up("materialized") |
| 426 | + |
| 427 | + print("Running mz_new") |
| 428 | + |
| 429 | + source_names = c.sql_query( |
| 430 | + "SELECT name FROM mz_sources WHERE id LIKE 'u%';" |
| 431 | + ) |
| 432 | + |
| 433 | + for row in source_names: |
| 434 | + source_name = row[0] |
| 435 | + print(f"Checking source: {source_name}") |
| 436 | + c.sql_query(f"SELECT count(*) FROM {source_name};") |
| 437 | + else: |
| 438 | + print("No sources found") |
| 439 | + |
| 440 | + c.kill("materialized", wait=True) |
| 441 | + c.kill("postgres", wait=True) |
| 442 | + c.rm("materialized") |
| 443 | + c.rm("postgres") |
0 commit comments