diff --git a/Cargo.lock b/Cargo.lock index bcfe593a47693..db1313f558bca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5470,7 +5470,7 @@ dependencies = [ [[package]] name = "mz-balancerd" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "async-trait", @@ -5611,7 +5611,7 @@ dependencies = [ [[package]] name = "mz-catalog-debug" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "clap", @@ -5773,7 +5773,7 @@ dependencies = [ [[package]] name = "mz-clusterd" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "axum", @@ -6074,7 +6074,7 @@ dependencies = [ [[package]] name = "mz-environmentd" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "askama", @@ -6584,7 +6584,7 @@ dependencies = [ [[package]] name = "mz-materialized" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "mz-clusterd", "mz-environmentd", @@ -6781,7 +6781,7 @@ dependencies = [ [[package]] name = "mz-orchestratord" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "async-trait", @@ -6967,7 +6967,7 @@ dependencies = [ [[package]] name = "mz-persist-client" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "arrayvec 0.7.6", @@ -8073,7 +8073,7 @@ dependencies = [ [[package]] name = "mz-testdrive" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" dependencies = [ "anyhow", "arrow", diff --git a/bin/bump-version b/bin/bump-version index 2096e6ebbd875..34a76eba7e9dd 100755 --- a/bin/bump-version +++ b/bin/bump-version @@ -69,7 +69,7 @@ rm -f src/{clusterd,environmentd,materialized,persist-client,testdrive,catalog-d cargo update --workspace -bin/helm-chart-version-bump --bump-weekly-version --bump-orchestratord-version "v$version" +bin/helm-chart-version-bump --bump-orchestratord-version "v$version" helm_docs_version=1.14.2 if ! helm-docs --help > /dev/null; then diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 2727859a097be..176dde4281f5e 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1148,10 +1148,10 @@ steps: - id: checks-self-managed-random-upgrade-path-1 - label: "Checks Self-Managed first random upgrade path" + label: "Checks Self-Managed random upgrade path" depends_on: build-x86_64 timeout_in_minutes: 180 - parallelism: 3 + parallelism: 2 agents: queue: hetzner-x86-64-8cpu-16gb plugins: @@ -1161,10 +1161,10 @@ steps: - id: checks-self-managed-random-upgrade-path-2 - label: "Checks Self-Managed second random upgrade path" + label: "Checks Self-Managed random upgrade path" depends_on: build-x86_64 timeout_in_minutes: 180 - parallelism: 3 + parallelism: 2 agents: queue: hetzner-x86-64-8cpu-16gb plugins: @@ -1173,10 +1173,217 @@ steps: args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] - id: checks-self-managed-random-upgrade-path-3 - label: "Checks Self-Managed third random upgrade path" + label: "Checks Self-Managed random upgrade path" depends_on: build-x86_64 timeout_in_minutes: 180 - parallelism: 3 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-4 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=$BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-5 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-6 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-7 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-8 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=$BUILDKITE_JOB_ID"] + + + - id: checks-self-managed-random-upgrade-path-9 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-10 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + - id: checks-self-managed-random-upgrade-path-11 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=$BUILDKITE_JOB_ID"] + + + - id: checks-self-managed-random-upgrade-path-12 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-13 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-14 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=$BUILDKITE_JOB_ID"] + + + - id: checks-self-managed-random-upgrade-path-15 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-16 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-17 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-18 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=$BUILDKITE_JOB_ID"] + + + - id: checks-self-managed-random-upgrade-path-19 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 + agents: + queue: hetzner-x86-64-8cpu-16gb + plugins: + - ./ci/plugins/mzcompose: + composition: platform-checks + args: [--scenario=SelfManagedRandomUpgradePath, "--seed=BUILDKITE_JOB_ID"] + + - id: checks-self-managed-random-upgrade-path-20 + label: "Checks Self-Managed random upgrade path" + depends_on: build-x86_64 + timeout_in_minutes: 180 + parallelism: 2 agents: queue: hetzner-x86-64-8cpu-16gb plugins: diff --git a/doc/user/content/releases/v0.165.md b/doc/user/content/releases/v26.0.md similarity index 69% rename from doc/user/content/releases/v0.165.md rename to doc/user/content/releases/v26.0.md index 08b1b262a7f65..82813f22c2984 100644 --- a/doc/user/content/releases/v0.165.md +++ b/doc/user/content/releases/v26.0.md @@ -1,5 +1,5 @@ --- -title: "Materialize v0.165" +title: "Materialize v26.0" date: 2025-11-19 released: false _build: diff --git a/misc/helm-charts/operator/Chart.yaml b/misc/helm-charts/operator/Chart.yaml index 54abbef790f65..a35a5368d7e5d 100644 --- a/misc/helm-charts/operator/Chart.yaml +++ b/misc/helm-charts/operator/Chart.yaml @@ -11,7 +11,7 @@ apiVersion: v2 name: materialize-operator description: Materialize Kubernetes Operator Helm Chart type: application -version: v25.3.0-beta.1 -appVersion: v0.165.0-dev.0 +version: v26.0.0-beta.1 +appVersion: v26.0.0-dev.0 icon: https://materialize.com/favicon.ico home: https://materialize.com diff --git a/misc/helm-charts/operator/README.md b/misc/helm-charts/operator/README.md index 6a0d8ac0028f0..77e275cd9384a 100644 --- a/misc/helm-charts/operator/README.md +++ b/misc/helm-charts/operator/README.md @@ -1,6 +1,6 @@ # Materialize Kubernetes Operator Helm Chart -![Version: v25.3.0-beta.1](https://img.shields.io/badge/Version-v25.3.0--beta.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: v0.165.0-dev.0](https://img.shields.io/badge/AppVersion-v0.165.0--dev.0-informational?style=flat-square) +![Version: v26.0.0-beta.1](https://img.shields.io/badge/Version-v26.0.0--beta.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: v26.0.0-dev.0](https://img.shields.io/badge/AppVersion-v26.0.0--dev.0-informational?style=flat-square) Materialize Kubernetes Operator Helm Chart @@ -159,7 +159,7 @@ The following table lists the configurable parameters of the Materialize operato | `operator.clusters.swap_enabled` | | ``true`` | | `operator.image.pullPolicy` | Policy for pulling the image: "IfNotPresent" avoids unnecessary re-pulling of images | ``"IfNotPresent"`` | | `operator.image.repository` | The Docker repository for the operator image | ``"materialize/orchestratord"`` | -| `operator.image.tag` | The tag/version of the operator image to be used | ``"v0.164.0"`` | +| `operator.image.tag` | The tag/version of the operator image to be used | ``"v26.0.0-dev.0"`` | | `operator.nodeSelector` | Node selector to use for the operator pod | ``nil`` | | `operator.resources.limits` | Resource limits for the operator's CPU and memory | ``{"memory":"512Mi"}`` | | `operator.resources.requests` | Resources requested by the operator for CPU and memory | ``{"cpu":"100m","memory":"512Mi"}`` | @@ -185,7 +185,7 @@ Specify each parameter using the `--set key=value[,key=value]` argument to `helm ```shell helm install my-materialize-operator \ - --set operator.image.tag=v0.165.0-dev.0 \ + --set operator.image.tag=v26.0.0-dev.0 \ materialize/materialize-operator ``` @@ -220,7 +220,7 @@ metadata: name: 12345678-1234-1234-1234-123456789012 namespace: materialize-environment spec: - environmentdImageRef: materialize/environmentd:v0.165.0-dev.0 + environmentdImageRef: materialize/environmentd:v26.0.0-dev.0 backendSecretName: materialize-backend environmentdResourceRequirements: limits: @@ -305,7 +305,7 @@ Or check the `Chart.yaml` file in the `misc/helm-charts/operator` directory: apiVersion: v2 name: materialize-operator # ... -version: v25.3.0-beta-1 +version: v26.0.0-dev.0 appVersion: v0.147.0 # Use this version for your Materialize instances ``` diff --git a/misc/helm-charts/operator/README.md.gotmpl b/misc/helm-charts/operator/README.md.gotmpl index 3c9343e94c891..a69d25b3ab480 100644 --- a/misc/helm-charts/operator/README.md.gotmpl +++ b/misc/helm-charts/operator/README.md.gotmpl @@ -246,7 +246,7 @@ Or check the `Chart.yaml` file in the `misc/helm-charts/operator` directory: apiVersion: v2 name: materialize-operator # ... -version: v25.3.0-beta-1 +version: v26.0.0-dev.0 appVersion: v0.147.0 # Use this version for your Materialize instances ``` diff --git a/misc/helm-charts/operator/tests/deployment_test.yaml b/misc/helm-charts/operator/tests/deployment_test.yaml index d91a34e5d4ffc..38d76a7e69aab 100644 --- a/misc/helm-charts/operator/tests/deployment_test.yaml +++ b/misc/helm-charts/operator/tests/deployment_test.yaml @@ -17,7 +17,7 @@ tests: of: Deployment - equal: path: spec.template.spec.containers[0].image - value: materialize/orchestratord:v0.164.0 + value: materialize/orchestratord:v26.0.0-dev.0 - equal: path: spec.template.spec.containers[0].imagePullPolicy value: IfNotPresent diff --git a/misc/helm-charts/operator/values.yaml b/misc/helm-charts/operator/values.yaml index d4e1c705a0d94..48e0fd744cf06 100644 --- a/misc/helm-charts/operator/values.yaml +++ b/misc/helm-charts/operator/values.yaml @@ -13,7 +13,7 @@ operator: # -- The Docker repository for the operator image repository: materialize/orchestratord # -- The tag/version of the operator image to be used - tag: v0.164.0 + tag: v26.0.0-dev.0 # -- Policy for pulling the image: "IfNotPresent" avoids unnecessary re-pulling of images pullPolicy: IfNotPresent diff --git a/misc/helm-charts/testing/materialize.yaml b/misc/helm-charts/testing/materialize.yaml index f097101ccdc3b..e983f2f9d5e9f 100644 --- a/misc/helm-charts/testing/materialize.yaml +++ b/misc/helm-charts/testing/materialize.yaml @@ -28,7 +28,7 @@ metadata: name: 12345678-1234-1234-1234-123456789012 namespace: materialize-environment spec: - environmentdImageRef: materialize/environmentd:v0.165.0-dev.0 + environmentdImageRef: materialize/environmentd:v26.0.0-dev.0 backendSecretName: materialize-backend authenticatorKind: None #balancerdExternalCertificateSpec: diff --git a/misc/python/materialize/cli/helm_chart_version_bump.py b/misc/python/materialize/cli/helm_chart_version_bump.py index daecbf2b9bb22..29be34945d5ad 100644 --- a/misc/python/materialize/cli/helm_chart_version_bump.py +++ b/misc/python/materialize/cli/helm_chart_version_bump.py @@ -22,11 +22,6 @@ def main() -> int: prog="helm-chart-version-bump", description="Bump environmentd (appVersion), orchestratord and helm-chart versions in helm chart.", ) - parser.add_argument( - "--bump-weekly-version", - action="store_true", - help="Also bump the weekly Materialize operator release instead of the bi-yearly one", - ) parser.add_argument( "--helm-chart-version", type=str, @@ -49,7 +44,12 @@ def main() -> int: mods = [ ( MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml", - lambda docs: docs[0].update({"appVersion": args.environmentd_version}), + lambda docs: docs[0].update( + { + "version": args.environmentd_version, + "appVersion": args.environmentd_version, + } + ), ), ( MZ_ROOT / "misc" / "helm-charts" / "testing" / "materialize.yaml", @@ -61,19 +61,6 @@ def main() -> int: ), ] - if args.bump_weekly_version: - mods += [ - ( - MZ_ROOT / "misc" / "helm-charts" / "operator-weekly" / "Chart.yaml", - lambda docs: docs[0].update( - { - "version": args.environmentd_version, - "appVersion": args.environmentd_version, - } - ), - ), - ] - if args.bump_orchestratord_version: # There are two cases that bump the version: # 1. Bump to new unreleased dev version: Use the latest released orchestratord version diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py index 2e330075aa0fa..7505aa4fd7e40 100644 --- a/misc/python/materialize/version_list.py +++ b/misc/python/materialize/version_list.py @@ -83,7 +83,8 @@ def get_supported_self_managed_versions() -> list[MzVersion]: { v.version for v in self_managed_versions - if v.helm_version.major == 25 and v.helm_version.minor == 2 + if v.helm_version.major == 25 + and (v.helm_version.minor == 2 or v.helm_version.minor == 1) } ) diff --git a/src/balancerd/Cargo.toml b/src/balancerd/Cargo.toml index 4e60193e6a960..46364f101d1fa 100644 --- a/src/balancerd/Cargo.toml +++ b/src/balancerd/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-balancerd" description = "Balancer service." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/catalog-debug/Cargo.toml b/src/catalog-debug/Cargo.toml index 3f3d9a5f8fa9a..44a1374c84509 100644 --- a/src/catalog-debug/Cargo.toml +++ b/src/catalog-debug/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-catalog-debug" description = "Durable metadata storage debug tool." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs index e3828c17b6d75..fcb216e41bf09 100644 --- a/src/catalog/src/durable.rs +++ b/src/catalog/src/durable.rs @@ -450,7 +450,7 @@ impl TestCatalogStateBuilder { Self { persist_client, organization_id: Uuid::new_v4(), - version: semver::Version::new(0, 0, 0), + version: mz_build_info::DUMMY_BUILD_INFO.semver_version(), deploy_generation: None, metrics: Arc::new(Metrics::new(&MetricsRegistry::new())), } diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index f8a14ca306040..b869b479e7a0d 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -395,36 +395,27 @@ impl> PersistHandle { /// Increment the version in the catalog upgrade shard to the code's current version. async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) { let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED); - let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self + + let () = self .persist_client - .open_writer( + .upgrade_version::<(), (), Timestamp, StorageDiff>( upgrade_shard_id, - Arc::new(UnitSchema::default()), - Arc::new(UnitSchema::default()), Diagnostics { shard_name: UPGRADE_SHARD_NAME.to_string(), - handle_purpose: "increment durable catalog upgrade shard version".to_string(), + handle_purpose: "durable catalog state upgrade".to_string(), }, ) .await .expect("invalid usage"); - const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[]; - let mut upper = write_handle.fetch_recent_upper().await.clone(); - loop { - let next_upper = upper - .iter() - .map(|timestamp| timestamp.step_forward()) - .collect(); - match write_handle - .compare_and_append(EMPTY_UPDATES, upper, next_upper) - .await - .expect("invalid usage") - { - Ok(()) => break, - Err(upper_mismatch) => { - upper = upper_mismatch.current; - } - } + + if cfg!(debug_assertions) { + let fetched_version = + fetch_catalog_upgrade_shard_version(&self.persist_client, upgrade_shard_id).await; + assert_eq!( + Some(&self.catalog_content_version), + fetched_version.as_ref(), + "code version should match the upgraded data version" + ); } } @@ -998,16 +989,12 @@ impl UnopenedPersistCatalogState { // If this is `None`, no version was found in the upgrade shard. This is a brand-new // environment, and we don't need to worry about fencing existing users. if let Some(version_in_upgrade_shard) = version_in_upgrade_shard { - // IMPORTANT: We swap the order of arguments here! Normally it's - // `code_version, data_version`, and we check whether a given code - // version, which is usually _older_, is allowed to touch a shard - // that has been touched by a _future_ version. - // - // By inverting argument order, we check if our version is too far - // ahead of the version in the shard. - if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version) - .is_err() - { + // Check that the current version of the code can handle data from the shard. + // (We used to reverse this check, to confirm that whatever code wrote the data + // in the shard would be able to read data written by the current version... but + // we now require the current code to be able to maintain compat with whatever + // data format versions pass this check.) + if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) { return Err(DurableCatalogError::IncompatiblePersistVersion { found_version: version_in_upgrade_shard, catalog_version: version, diff --git a/src/catalog/src/durable/persist/tests.rs b/src/catalog/src/durable/persist/tests.rs index 8c610144140e0..fe8d66fcb0d33 100644 --- a/src/catalog/src/durable/persist/tests.rs +++ b/src/catalog/src/durable/persist/tests.rs @@ -12,20 +12,18 @@ use mz_persist_client::PersistLocation; use mz_persist_client::cache::PersistClientCache; use uuid::Uuid; -use crate::durable::persist::{ - CATALOG_SEED, UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id, -}; +use crate::durable::persist::{UPGRADE_SEED, fetch_catalog_upgrade_shard_version, shard_id}; use crate::durable::{DurableCatalogError, TestCatalogStateBuilder, test_bootstrap_args}; /// Test that the catalog forces users to upgrade one version at a time. #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_upgrade_shard() { - let first_version = semver::Version::parse("0.10.0").expect("failed to parse version"); - let second_version = semver::Version::parse("0.11.0").expect("failed to parse version"); + let first_version = semver::Version::parse("0.147.0").expect("failed to parse version"); + let second_version = semver::Version::parse("26.0.0").expect("failed to parse version"); let second_dev_version = - semver::Version::parse("0.11.0-dev.0").expect("failed to parse version"); - let third_version = semver::Version::parse("0.12.0").expect("failed to parse version"); + semver::Version::parse("26.0.0-dev.0").expect("failed to parse version"); + let third_version = semver::Version::parse("27.1.0").expect("failed to parse version"); let organization_id = Uuid::new_v4(); let deploy_generation = 0; let mut persist_cache = PersistClientCache::new_no_metrics(); @@ -156,12 +154,12 @@ async fn test_upgrade_shard() { #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_version_regression() { - let first_version = semver::Version::parse("0.10.0").expect("failed to parse version"); - let second_version = semver::Version::parse("0.11.0").expect("failed to parse version"); + let first_version = semver::Version::parse("0.147.0").expect("failed to parse version"); + let second_version = semver::Version::parse("26.1.0").expect("failed to parse version"); let organization_id = Uuid::new_v4(); let deploy_generation = 0; let mut persist_cache = PersistClientCache::new_no_metrics(); - let catalog_shard_id = shard_id(organization_id, CATALOG_SEED); + let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED); persist_cache.cfg.build_version = first_version.clone(); let persist_client = persist_cache @@ -171,7 +169,7 @@ async fn test_version_regression() { assert_eq!( None, - fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await + fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await ); let persist_openable_state = TestCatalogStateBuilder::new(persist_client.clone()) @@ -188,7 +186,7 @@ async fn test_version_regression() { assert_eq!( Some(first_version.clone()), - fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await + fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await ); persist_cache.cfg.build_version = second_version.clone(); @@ -210,30 +208,32 @@ async fn test_version_regression() { assert_eq!( Some(second_version.clone()), - fetch_catalog_upgrade_shard_version(&persist_client, catalog_shard_id).await + fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await ); - persist_cache.cfg.build_version = first_version.clone(); - let persist_client = persist_cache - .open(PersistLocation::new_in_mem()) - .await - .expect("in-mem location is valid"); - let err = TestCatalogStateBuilder::new(persist_client.clone()) - .with_organization_id(organization_id) - .with_deploy_generation(deploy_generation) - .with_version(first_version.clone()) - .build() - .await - .expect_err("skipping versions should error"); - assert!( - matches!( - &err, - DurableCatalogError::IncompatiblePersistVersion { - found_version, - catalog_version - } - if found_version == &second_version && catalog_version == &first_version - ), - "Unexpected error: {err:?}" - ); + // NB: running the following is expected to halt the process, but we can't catch halts in tests. + // + // persist_cache.cfg.build_version = first_version.clone(); + // let persist_client = persist_cache + // .open(PersistLocation::new_in_mem()) + // .await + // .expect("in-mem location is valid"); + // let err = TestCatalogStateBuilder::new(persist_client.clone()) + // .with_organization_id(organization_id) + // .with_deploy_generation(deploy_generation) + // .with_version(first_version.clone()) + // .build() + // .await + // .expect_err("skipping versions should error"); + // assert!( + // matches!( + // &err, + // DurableCatalogError::IncompatiblePersistVersion { + // found_version, + // catalog_version + // } + // if found_version == &second_version && catalog_version == &first_version + // ), + // "Unexpected error: {err:?}" + // ); } diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 26a3d131fe7d7..3f8422f22c4ad 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -3407,12 +3407,13 @@ where mod tests { use super::*; use mz_ore::{assert_none, assert_ok}; - - use mz_ore::now::SYSTEM_TIME; - use mz_persist_client::PersistClient; + use semver::Version; use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args}; use crate::memory; + use mz_ore::now::SYSTEM_TIME; + use mz_persist_client::cache::PersistClientCache; + use mz_persist_types::PersistLocation; #[mz_ore::test] fn test_table_transaction_simple() { @@ -3942,9 +3943,16 @@ mod tests { #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_savepoint() { - let persist_client = PersistClient::new_for_tests().await; - let state_builder = - TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation(); + const VERSION: Version = Version::new(26, 0, 0); + let mut persist_cache = PersistClientCache::new_no_metrics(); + persist_cache.cfg.build_version = VERSION; + let persist_client = persist_cache + .open(PersistLocation::new_in_mem()) + .await + .unwrap(); + let state_builder = TestCatalogStateBuilder::new(persist_client) + .with_default_deploy_generation() + .with_version(VERSION); // Initialize catalog. let _ = state_builder diff --git a/src/catalog/tests/open.rs b/src/catalog/tests/open.rs index 94320c730c6f4..28ed7c9214524 100644 --- a/src/catalog/tests/open.rs +++ b/src/catalog/tests/open.rs @@ -571,12 +571,14 @@ async fn test_open(state_builder: TestCatalogStateBuilder) { #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` async fn test_persist_unopened_deploy_generation_fencing() { - let persist_client = PersistClient::new_for_tests().await; + let mut persist_cache = PersistClientCache::new_no_metrics(); + persist_cache.cfg.build_version = semver::Version::new(0, 1, 0); + let persist_client = persist_cache + .open(PersistLocation::new_in_mem()) + .await + .unwrap(); let state_builder = TestCatalogStateBuilder::new(persist_client); - test_unopened_deploy_generation_fencing(state_builder).await; -} -async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogStateBuilder) { // Initialize catalog. let deploy_generation = 0; let version = semver::Version::new(0, 1, 0); @@ -667,10 +669,7 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState .await .unwrap_err(); assert!( - matches!( - err, - DurableCatalogError::Fence(FenceError::DeployGeneration { .. }) - ), + matches!(err, DurableCatalogError::IncompatiblePersistVersion { .. }), "unexpected err: {err:?}" ); } @@ -906,8 +905,14 @@ async fn test_persist_version_fencing() { } testcase("0.10.0", "0.10.0", Ok(())).await; - testcase("0.10.0", "0.11.0", Ok(())).await; - testcase("0.10.0", "0.12.0", Err(())).await; + testcase("0.147.0", "0.148.1", Ok(())).await; + testcase("0.10.0", "0.148.1", Err(())).await; + testcase("0.147.0", "0.158.0", Ok(())).await; + testcase("0.147.0", "26.0.0", Ok(())).await; + testcase("0.160.0", "26.0.0", Ok(())).await; + testcase("26.0.0", "26.10.0", Ok(())).await; + testcase("26.1.0", "27.0.0", Ok(())).await; + testcase("0.147.0", "27.0.0", Err(())).await; } #[mz_ore::test(tokio::test)] diff --git a/src/catalog/tests/snapshots/debug__opened_trace.snap b/src/catalog/tests/snapshots/debug__opened_trace.snap index d4a351212007b..197566f46e153 100644 --- a/src/catalog/tests/snapshots/debug__opened_trace.snap +++ b/src/catalog/tests/snapshots/debug__opened_trace.snap @@ -2386,7 +2386,7 @@ Trace { name: "catalog_content_version", }, SettingValue { - value: "0.0.0", + value: "0.0.0+dummy", }, ), 2, diff --git a/src/catalog/tests/snapshots/open__initial_snapshot.snap b/src/catalog/tests/snapshots/open__initial_snapshot.snap index ac72663dc1b5b..0ab7b4f54b610 100644 --- a/src/catalog/tests/snapshots/open__initial_snapshot.snap +++ b/src/catalog/tests/snapshots/open__initial_snapshot.snap @@ -1,6 +1,5 @@ --- source: src/catalog/tests/open.rs -assertion_line: 510 expression: test_snapshot --- Snapshot { @@ -1461,7 +1460,7 @@ Snapshot { SettingKey { name: "catalog_content_version", }: SettingValue { - value: "0.0.0", + value: "0.0.0+dummy", }, }, source_references: {}, diff --git a/src/clusterd/Cargo.toml b/src/clusterd/Cargo.toml index 7179f2e571a8d..6bf5b60d330bb 100644 --- a/src/clusterd/Cargo.toml +++ b/src/clusterd/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-clusterd" description = "Materialize's cluster server." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index 93225b25c36b8..134421f68da65 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-environmentd" description = "Manages a single Materialize environment." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" authors = ["Materialize, Inc."] license = "proprietary" edition.workspace = true diff --git a/src/materialized/Cargo.toml b/src/materialized/Cargo.toml index 7442f2bfb26fc..a4a2546c289e1 100644 --- a/src/materialized/Cargo.toml +++ b/src/materialized/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-materialized" description = "Materialize's unified binary." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/orchestratord/Cargo.toml b/src/orchestratord/Cargo.toml index db0f87ebde25f..fbe01cf17217d 100644 --- a/src/orchestratord/Cargo.toml +++ b/src/orchestratord/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-orchestratord" description = "Kubernetes operator for Materialize regions" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 9b602e3c3b2bf..a078ccb340d0c 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-persist-client" description = "Client for Materialize pTVC durability system" -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index ff06d4de02c13..39ed08c67b51e 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -37,7 +37,7 @@ use crate::operators::STORAGE_SOURCE_DECODE_FUEL; use crate::read::READER_LEASE_DURATION; // Ignores the patch version -const SELF_MANAGED_VERSIONS: &[Version] = &[ +const SELF_MANAGED_VERSIONS: &[Version; 2] = &[ // 25.1 Version::new(0, 130, 0), // 25.2 @@ -99,7 +99,8 @@ const SELF_MANAGED_VERSIONS: &[Version] = &[ pub struct PersistConfig { /// Info about which version of the code is running. pub build_version: Version, - /// Hostname of this persist user. Stored in state and used for debugging. + /// An opaque string describing the host of this persist client. + /// Stored in state and used for debugging. pub hostname: String, /// Whether this persist instance is running in a "cc" sized cluster. pub is_cc_active: bool, @@ -181,7 +182,13 @@ impl PersistConfig { // separate --log-prefix into --service-name and --enable-log-prefix // options, where the first is always provided and the second is // conditionally enabled by the process orchestrator. - hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()), + hostname: { + use std::fmt::Write; + let mut name = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()); + write!(&mut name, " {}", build_info.version) + .expect("writing to string should not fail"); + name + }, } } @@ -634,87 +641,57 @@ impl BlobKnobs for PersistConfig { } } -pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> { - check_data_version_with_self_managed_versions(code_version, data_version, SELF_MANAGED_VERSIONS) +/// If persist gets some encoded ProtoState from the future (e.g. two versions of +/// code are running simultaneously against the same shard), it might have a +/// field that the current code doesn't know about. This would be silently +/// discarded at proto decode time: our Proto library can't handle unknown fields, +/// and old versions of code might not be able to respect the semantics of the new +/// fields even if they did. +/// +/// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns +/// +/// To detect the bad situation and disallow it, we tag every version of state +/// written to consensus with the version of code it's compatible with. Then at +/// decode time, we're able to compare the current version against any we receive +/// and assert as necessary. The current version is typically the version of code +/// used to write the state, but it may be lower when code is intentionally emulating +/// an older version during eg. a graceful upgrade process. +/// +/// We could do the same for blob data, but it shouldn't be necessary. Any blob +/// data we read is going to be because we fetched it using a pointer stored in +/// some persist state. If we can handle the state, we can handle the blobs it +/// references, too. +pub fn code_can_read_data(code_version: &Version, data_version: &Version) -> bool { + // For now, Persist can read arbitrarily old state data. + // We expect to add a floor to this in future versions. + code_version.cmp_precedence(data_version).is_ge() } -// If persist gets some encoded ProtoState from the future (e.g. two versions of -// code are running simultaneously against the same shard), it might have a -// field that the current code doesn't know about. This would be silently -// discarded at proto decode time. Unknown Fields [1] are a tool we can use in -// the future to help deal with this, but in the short-term, it's best to keep -// the persist read-modify-CaS loop simple for as long as we can get away with -// it (i.e. until we have to offer the ability to do rollbacks). -// -// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns -// -// To detect the bad situation and disallow it, we tag every version of state -// written to consensus with the version of code used to encode it. Then at -// decode time, we're able to compare the current version against any we receive -// and assert as necessary. -// -// Initially we allow any from the past (permanent backward compatibility) and -// one minor version into the future (forward compatibility). This allows us to -// run two versions concurrently for rolling upgrades. We'll have to revisit -// this logic if/when we start using major versions other than 0. -// -// We could do the same for blob data, but it shouldn't be necessary. Any blob -// data we read is going to be because we fetched it using a pointer stored in -// some persist state. If we can handle the state, we can handle the blobs it -// references, too. -pub(crate) fn check_data_version_with_self_managed_versions( - code_version: &Version, - data_version: &Version, - self_managed_versions: &[Version], -) -> Result<(), String> { - // Allow upgrades specifically between consecutive Self-Managed releases. - let base_code_version = Version { - patch: 0, - ..code_version.clone() - }; - let base_data_version = Version { - patch: 0, - ..data_version.clone() - }; - if data_version >= code_version { - for window in self_managed_versions.windows(2) { - if base_code_version == window[0] && base_data_version <= window[1] { - return Ok(()); - } - } - - if let Some(last) = self_managed_versions.last() { - if base_code_version == *last - // kind of arbitrary, but just ensure we don't accidentally - // upgrade too far (the previous check should ensure that a - // new version won't take over from a too-old previous - // version, but we want to make sure the other side also - // doesn't get confused) - && base_data_version - .minor - .saturating_sub(base_code_version.minor) - < 40 - { - return Ok(()); - } - } +/// Can the given version of the code generate data that older versions can understand? +/// Imagine the case of eg. garbage collection after a version upgrade... we may need to read old +/// diffs to be able to find blobs to delete, even if we no longer have code to generate data in +/// that format. +pub fn code_can_write_data(code_version: &Version, data_version: &Version) -> bool { + if !code_can_read_data(code_version, data_version) { + return false; } - // Allow one minor version of forward compatibility. We could avoid the - // clone with some nested comparisons of the semver fields, but this code - // isn't particularly performance sensitive and I find this impl easier to - // reason about. - let max_allowed_data_version = Version::new( - code_version.major, - code_version.minor.saturating_add(1), - u64::MAX, - ); - - if &max_allowed_data_version < data_version { - Err(format!( - "{code_version} received persist state from the future {data_version}", - )) + if code_version.major == 0 && code_version.minor <= SELF_MANAGED_VERSIONS[1].minor { + // This code was added well after the last ad-hoc version was released, + // so we don't strictly model compatibility with earlier releases. + true + } else if code_version.major == 0 { + // Self-managed versions 25.2+ must be upgradeable from 25.1+. + SELF_MANAGED_VERSIONS[0] + .cmp_precedence(data_version) + .is_le() + } else if code_version.major <= 26 { + // Versions 26.x must be upgradeable from the last pre-1.0 release. + SELF_MANAGED_VERSIONS[1] + .cmp_precedence(data_version) + .is_le() } else { - Ok(()) + // Otherwise, the data must be from at earliest the _previous_ major version. + code_version.major - 1 <= data_version.major } } diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index 8d32cba8604fa..a024bec5c0734 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -590,9 +590,9 @@ where // code with this logic. let safe_version_change = match (commit, expected_version) { // We never actually write out state changes, so increasing the version is okay. - (false, _) => cfg.build_version >= state.applier_version, + (false, _) => cfg.build_version >= state.collections.version, // If the versions match that's okay because any commits won't change it. - (true, None) => cfg.build_version == state.applier_version, + (true, None) => cfg.build_version == state.collections.version, // !!DANGER ZONE!! (true, Some(expected)) => { // If we're not _extremely_ careful, the persistcli could make shards unreadable by @@ -602,7 +602,7 @@ where // We only allow a mismatch in version if we provided the expected version to the // command, and the expected version is less than the current build, which // indicates this is an old shard. - state.applier_version == expected && expected <= cfg.build_version + state.collections.version == expected && expected <= cfg.build_version } }; if !safe_version_change { @@ -610,7 +610,7 @@ where return Err(anyhow!( "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything", cfg.build_version, - state.applier_version + state.collections.version )); } break; diff --git a/src/persist-client/src/error.rs b/src/persist-client/src/error.rs index c2d2b6188dea7..e97fbf56b7a02 100644 --- a/src/persist-client/src/error.rs +++ b/src/persist-client/src/error.rs @@ -19,6 +19,11 @@ use crate::ShardId; #[derive(Debug)] #[cfg_attr(any(test, debug_assertions), derive(PartialEq))] pub enum InvalidUsage { + /// The data format of the shard is not compatible with the current code version. + IncompatibleVersion { + /// The version of the metadata. + version: semver::Version, + }, /// Append bounds were invalid InvalidBounds { /// The given lower bound @@ -84,6 +89,9 @@ pub enum InvalidUsage { impl std::fmt::Display for InvalidUsage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + InvalidUsage::IncompatibleVersion { version } => { + write!(f, "incompatible with data version {}", version) + } InvalidUsage::InvalidBounds { lower, upper } => { write!(f, "invalid bounds [{:?}, {:?})", lower, upper) } diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs index 163a9d554c201..5362f91babb12 100644 --- a/src/persist-client/src/internal/apply.rs +++ b/src/persist-client/src/internal/apply.rs @@ -15,15 +15,6 @@ use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::Instant; -use differential_dataflow::difference::Monoid; -use differential_dataflow::lattice::Lattice; -use mz_ore::cast::CastFrom; -use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData}; -use mz_persist_types::schema::SchemaId; -use mz_persist_types::{Codec, Codec64}; -use timely::progress::{Antichain, Timestamp}; -use tracing::debug; - use crate::cache::{LockingTypedState, StateCache}; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::gc::GcReq; @@ -43,7 +34,16 @@ use crate::internal::watch::StateWatch; use crate::read::LeasedReaderId; use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender}; use crate::schema::SchemaCache; -use crate::{Diagnostics, PersistConfig, ShardId}; +use crate::{Diagnostics, PersistConfig, ShardId, cfg}; +use differential_dataflow::difference::Monoid; +use differential_dataflow::lattice::Lattice; +use mz_ore::cast::CastFrom; +use mz_ore::soft_assert_or_log; +use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData}; +use mz_persist_types::schema::SchemaId; +use mz_persist_types::{Codec, Codec64}; +use timely::progress::{Antichain, Timestamp}; +use tracing::debug; /// An applier of persist commands. /// @@ -449,6 +449,15 @@ where work_ret, } = next_state; + { + let build_version = &cfg.build_version; + let state_version = &state.state.collections.version; + soft_assert_or_log!( + cfg::code_can_write_data(build_version, state_version), + "current version {build_version} does not support state format {state_version}" + ); + } + // SUBTLE! Unlike the other consensus and blob uses, we can't // automatically retry indeterminate ExternalErrors here. However, // if the state change itself is _idempotent_, then we're free to diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index d15f43f6d33df..320a031a1e81f 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -202,14 +202,14 @@ pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result< Ok(*uuid.as_bytes()) } -pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) { - if let Err(msg) = cfg::check_data_version(code_version, data_version) { +pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &Version) { + if !cfg::code_can_read_data(code_version, data_version) { // We can't catch halts, so panic in test, so we can get unit test // coverage. if cfg!(test) { - panic!("{msg}"); + panic!("code at version {code_version} cannot read data with version {data_version}"); } else { - halt!("{msg}"); + halt!("code at version {code_version} cannot read data with version {data_version}"); } } } @@ -324,7 +324,7 @@ impl StateDiff { // case, fail loudly. .expect("internal error: invalid encoded state"); let diff = Self::from_proto(proto).expect("internal error: invalid encoded state"); - check_data_version(build_version, &diff.applier_version); + assert_code_can_read_data(build_version, &diff.applier_version); diff } } @@ -736,7 +736,7 @@ impl UntypedState { let state = Rollup::from_proto(proto) .expect("internal error: invalid encoded state") .state; - check_data_version(build_version, &state.state.applier_version); + assert_code_can_read_data(build_version, &state.state.collections.version); state } } @@ -864,7 +864,7 @@ impl RustType for InlinedDiffs { impl RustType for Rollup { fn into_proto(&self) -> ProtoRollup { ProtoRollup { - applier_version: self.state.state.applier_version.to_string(), + applier_version: self.state.state.collections.version.to_string(), shard_id: self.state.state.shard_id.into_proto(), seqno: self.state.state.seqno.into_proto(), walltime_ms: self.state.state.walltime_ms.into_proto(), @@ -972,6 +972,7 @@ impl RustType for Rollup { .transpose()?; let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?; let collections = StateCollections { + version: applier_version.clone(), rollups, active_rollup, active_gc, @@ -983,7 +984,6 @@ impl RustType for Rollup { trace: x.trace.into_rust_if_some("trace")?, }; let state = State { - applier_version, shard_id: x.shard_id.into_rust()?, seqno: x.seqno.into_rust()?, walltime_ms: x.walltime_ms, @@ -2157,161 +2157,6 @@ mod tests { proptest!(|(state in any_state::(0..3))| testcase(state)); } - #[mz_ore::test] - fn check_data_versions_with_self_managed_versions() { - #[track_caller] - fn testcase( - code: &str, - data: &str, - self_managed_versions: &[Version], - expected: Result<(), ()>, - ) { - let code = Version::parse(code).unwrap(); - let data = Version::parse(data).unwrap(); - let actual = cfg::check_data_version_with_self_managed_versions( - &code, - &data, - self_managed_versions, - ) - .map_err(|_| ()); - assert_eq!(actual, expected); - } - - let none = []; - let one = [Version::new(0, 130, 0)]; - let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)]; - let three = [ - Version::new(0, 130, 0), - Version::new(0, 140, 0), - Version::new(0, 150, 0), - ]; - - testcase("0.130.0", "0.128.0", &none, Ok(())); - testcase("0.130.0", "0.129.0", &none, Ok(())); - testcase("0.130.0", "0.130.0", &none, Ok(())); - testcase("0.130.0", "0.130.1", &none, Ok(())); - testcase("0.130.1", "0.130.0", &none, Ok(())); - testcase("0.130.0", "0.131.0", &none, Ok(())); - testcase("0.130.0", "0.132.0", &none, Err(())); - - testcase("0.129.0", "0.127.0", &none, Ok(())); - testcase("0.129.0", "0.128.0", &none, Ok(())); - testcase("0.129.0", "0.129.0", &none, Ok(())); - testcase("0.129.0", "0.129.1", &none, Ok(())); - testcase("0.129.1", "0.129.0", &none, Ok(())); - testcase("0.129.0", "0.130.0", &none, Ok(())); - testcase("0.129.0", "0.131.0", &none, Err(())); - - testcase("0.130.0", "0.128.0", &one, Ok(())); - testcase("0.130.0", "0.129.0", &one, Ok(())); - testcase("0.130.0", "0.130.0", &one, Ok(())); - testcase("0.130.0", "0.130.1", &one, Ok(())); - testcase("0.130.1", "0.130.0", &one, Ok(())); - testcase("0.130.0", "0.131.0", &one, Ok(())); - testcase("0.130.0", "0.132.0", &one, Ok(())); - - testcase("0.129.0", "0.127.0", &one, Ok(())); - testcase("0.129.0", "0.128.0", &one, Ok(())); - testcase("0.129.0", "0.129.0", &one, Ok(())); - testcase("0.129.0", "0.129.1", &one, Ok(())); - testcase("0.129.1", "0.129.0", &one, Ok(())); - testcase("0.129.0", "0.130.0", &one, Ok(())); - testcase("0.129.0", "0.131.0", &one, Err(())); - - testcase("0.131.0", "0.129.0", &one, Ok(())); - testcase("0.131.0", "0.130.0", &one, Ok(())); - testcase("0.131.0", "0.131.0", &one, Ok(())); - testcase("0.131.0", "0.131.1", &one, Ok(())); - testcase("0.131.1", "0.131.0", &one, Ok(())); - testcase("0.131.0", "0.132.0", &one, Ok(())); - testcase("0.131.0", "0.133.0", &one, Err(())); - - testcase("0.130.0", "0.128.0", &two, Ok(())); - testcase("0.130.0", "0.129.0", &two, Ok(())); - testcase("0.130.0", "0.130.0", &two, Ok(())); - testcase("0.130.0", "0.130.1", &two, Ok(())); - testcase("0.130.1", "0.130.0", &two, Ok(())); - testcase("0.130.0", "0.131.0", &two, Ok(())); - testcase("0.130.0", "0.132.0", &two, Ok(())); - testcase("0.130.0", "0.135.0", &two, Ok(())); - testcase("0.130.0", "0.138.0", &two, Ok(())); - testcase("0.130.0", "0.139.0", &two, Ok(())); - testcase("0.130.0", "0.140.0", &two, Ok(())); - testcase("0.130.9", "0.140.0", &two, Ok(())); - testcase("0.130.0", "0.140.1", &two, Ok(())); - testcase("0.130.3", "0.140.1", &two, Ok(())); - testcase("0.130.3", "0.140.9", &two, Ok(())); - testcase("0.130.0", "0.141.0", &two, Err(())); - testcase("0.129.0", "0.133.0", &two, Err(())); - testcase("0.129.0", "0.140.0", &two, Err(())); - testcase("0.131.0", "0.133.0", &two, Err(())); - testcase("0.131.0", "0.140.0", &two, Err(())); - - testcase("0.130.0", "0.128.0", &three, Ok(())); - testcase("0.130.0", "0.129.0", &three, Ok(())); - testcase("0.130.0", "0.130.0", &three, Ok(())); - testcase("0.130.0", "0.130.1", &three, Ok(())); - testcase("0.130.1", "0.130.0", &three, Ok(())); - testcase("0.130.0", "0.131.0", &three, Ok(())); - testcase("0.130.0", "0.132.0", &three, Ok(())); - testcase("0.130.0", "0.135.0", &three, Ok(())); - testcase("0.130.0", "0.138.0", &three, Ok(())); - testcase("0.130.0", "0.139.0", &three, Ok(())); - testcase("0.130.0", "0.140.0", &three, Ok(())); - testcase("0.130.9", "0.140.0", &three, Ok(())); - testcase("0.130.0", "0.140.1", &three, Ok(())); - testcase("0.130.3", "0.140.1", &three, Ok(())); - testcase("0.130.3", "0.140.9", &three, Ok(())); - testcase("0.130.0", "0.141.0", &three, Err(())); - testcase("0.129.0", "0.133.0", &three, Err(())); - testcase("0.129.0", "0.140.0", &three, Err(())); - testcase("0.131.0", "0.133.0", &three, Err(())); - testcase("0.131.0", "0.140.0", &three, Err(())); - testcase("0.130.0", "0.150.0", &three, Err(())); - - testcase("0.140.0", "0.138.0", &three, Ok(())); - testcase("0.140.0", "0.139.0", &three, Ok(())); - testcase("0.140.0", "0.140.0", &three, Ok(())); - testcase("0.140.0", "0.140.1", &three, Ok(())); - testcase("0.140.1", "0.140.0", &three, Ok(())); - testcase("0.140.0", "0.141.0", &three, Ok(())); - testcase("0.140.0", "0.142.0", &three, Ok(())); - testcase("0.140.0", "0.145.0", &three, Ok(())); - testcase("0.140.0", "0.148.0", &three, Ok(())); - testcase("0.140.0", "0.149.0", &three, Ok(())); - testcase("0.140.0", "0.150.0", &three, Ok(())); - testcase("0.140.9", "0.150.0", &three, Ok(())); - testcase("0.140.0", "0.150.1", &three, Ok(())); - testcase("0.140.3", "0.150.1", &three, Ok(())); - testcase("0.140.3", "0.150.9", &three, Ok(())); - testcase("0.140.0", "0.151.0", &three, Err(())); - testcase("0.139.0", "0.143.0", &three, Err(())); - testcase("0.139.0", "0.150.0", &three, Err(())); - testcase("0.141.0", "0.143.0", &three, Err(())); - testcase("0.141.0", "0.150.0", &three, Err(())); - - testcase("0.150.0", "0.148.0", &three, Ok(())); - testcase("0.150.0", "0.149.0", &three, Ok(())); - testcase("0.150.0", "0.150.0", &three, Ok(())); - testcase("0.150.0", "0.150.1", &three, Ok(())); - testcase("0.150.1", "0.150.0", &three, Ok(())); - testcase("0.150.0", "0.151.0", &three, Ok(())); - testcase("0.150.0", "0.152.0", &three, Ok(())); - testcase("0.150.0", "0.155.0", &three, Ok(())); - testcase("0.150.0", "0.158.0", &three, Ok(())); - testcase("0.150.0", "0.159.0", &three, Ok(())); - testcase("0.150.0", "0.160.0", &three, Ok(())); - testcase("0.150.9", "0.160.0", &three, Ok(())); - testcase("0.150.0", "0.160.1", &three, Ok(())); - testcase("0.150.3", "0.160.1", &three, Ok(())); - testcase("0.150.3", "0.160.9", &three, Ok(())); - testcase("0.150.0", "0.161.0", &three, Ok(())); - testcase("0.149.0", "0.153.0", &three, Err(())); - testcase("0.149.0", "0.160.0", &three, Err(())); - testcase("0.151.0", "0.153.0", &three, Err(())); - testcase("0.151.0", "0.160.0", &three, Err(())); - } - #[mz_ore::test] fn check_data_versions() { #[track_caller] @@ -2319,48 +2164,55 @@ mod tests { let code = Version::parse(code).unwrap(); let data = Version::parse(data).unwrap(); #[allow(clippy::disallowed_methods)] - let actual = - std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ()); - assert_eq!(actual, expected); + let actual = cfg::code_can_write_data(&code, &data) + .then_some(()) + .ok_or(()); + assert_eq!(actual, expected, "data at {data} read by code {code}"); } - testcase("0.10.0-dev", "0.10.0-dev", Ok(())); - testcase("0.10.0-dev", "0.10.0", Ok(())); + testcase("0.160.0-dev", "0.160.0-dev", Ok(())); + testcase("0.160.0-dev", "0.160.0", Err(())); // Note: Probably useful to let tests use two arbitrary shas on main, at // the very least for things like git bisect. - testcase("0.10.0-dev", "0.11.0-dev", Ok(())); - testcase("0.10.0-dev", "0.11.0", Ok(())); - testcase("0.10.0-dev", "0.12.0-dev", Err(())); - testcase("0.10.0-dev", "0.12.0", Err(())); - testcase("0.10.0-dev", "0.13.0-dev", Err(())); - - testcase("0.10.0", "0.8.0-dev", Ok(())); - testcase("0.10.0", "0.8.0", Ok(())); - testcase("0.10.0", "0.9.0-dev", Ok(())); - testcase("0.10.0", "0.9.0", Ok(())); - testcase("0.10.0", "0.10.0-dev", Ok(())); - testcase("0.10.0", "0.10.0", Ok(())); - // Note: This is what it would look like to run a version of the catalog - // upgrade checker built from main. - testcase("0.10.0", "0.11.0-dev", Ok(())); - testcase("0.10.0", "0.11.0", Ok(())); - testcase("0.10.0", "0.11.1", Ok(())); - testcase("0.10.0", "0.11.1000000", Ok(())); - testcase("0.10.0", "0.12.0-dev", Err(())); - testcase("0.10.0", "0.12.0", Err(())); - testcase("0.10.0", "0.13.0-dev", Err(())); - - testcase("0.10.1", "0.9.0", Ok(())); - testcase("0.10.1", "0.10.0", Ok(())); - testcase("0.10.1", "0.11.0", Ok(())); - testcase("0.10.1", "0.11.1", Ok(())); - testcase("0.10.1", "0.11.100", Ok(())); - - // This is probably a bad idea (seems as if we've downgraded from - // running v0.10.1 to v0.10.0, an earlier patch version of the same - // minor version), but not much we can do, given the `state_version = - // max(code_version, prev_state_version)` logic we need to prevent - // rolling back an arbitrary number of versions. - testcase("0.10.0", "0.10.1", Ok(())); + testcase("0.160.0-dev", "0.161.0-dev", Err(())); + testcase("0.160.0-dev", "0.161.0", Err(())); + testcase("0.160.0-dev", "0.162.0-dev", Err(())); + testcase("0.160.0-dev", "0.162.0", Err(())); + testcase("0.160.0-dev", "0.163.0-dev", Err(())); + + testcase("0.160.0", "0.158.0-dev", Ok(())); + testcase("0.160.0", "0.158.0", Ok(())); + testcase("0.160.0", "0.159.0-dev", Ok(())); + testcase("0.160.0", "0.159.0", Ok(())); + testcase("0.160.0", "0.160.0-dev", Ok(())); + testcase("0.160.0", "0.160.0", Ok(())); + + testcase("0.160.0", "0.161.0-dev", Err(())); + testcase("0.160.0", "0.161.0", Err(())); + testcase("0.160.0", "0.161.1", Err(())); + testcase("0.160.0", "0.161.1000000", Err(())); + testcase("0.160.0", "0.162.0-dev", Err(())); + testcase("0.160.0", "0.162.0", Err(())); + testcase("0.160.0", "0.163.0-dev", Err(())); + + testcase("0.160.1", "0.159.0", Ok(())); + testcase("0.160.1", "0.160.0", Ok(())); + testcase("0.160.1", "0.161.0", Err(())); + testcase("0.160.1", "0.161.1", Err(())); + testcase("0.160.1", "0.161.100", Err(())); + testcase("0.160.0", "0.160.1", Err(())); + + testcase("0.160.1", "26.0.0", Err(())); + testcase("26.0.0", "0.160.1", Ok(())); + testcase("26.2.0", "0.160.1", Ok(())); + testcase("26.200.200", "0.160.1", Ok(())); + + testcase("27.0.0", "0.160.1", Err(())); + testcase("27.0.0", "0.16000.1", Err(())); + testcase("27.0.0", "26.0.1", Ok(())); + testcase("27.1000.100", "26.0.1", Ok(())); + testcase("28.0.0", "26.0.1", Err(())); + testcase("28.0.0", "26.1000.1", Err(())); + testcase("28.0.0", "27.0.0", Ok(())); } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 562804acdcb76..ddc66853e5a2f 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -10,7 +10,7 @@ //! Implementation of the persist state machine. use std::fmt::Debug; -use std::ops::ControlFlow::{self, Continue}; +use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -19,12 +19,12 @@ use differential_dataflow::lattice::Lattice; use futures::FutureExt; use futures::future::{self, BoxFuture}; use mz_dyncfg::{Config, ConfigSet}; -use mz_ore::assert_none; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. use mz_ore::fmt::FormatBuffer; use mz_ore::task::JoinHandle; +use mz_ore::{assert_none, soft_assert_no_log}; use mz_persist::location::{ExternalError, Indeterminate, SeqNo}; use mz_persist::retry::Retry; use mz_persist_types::schema::SchemaId; @@ -189,6 +189,35 @@ where (removed_rollup_seqnos, maintenance) } + /// Attempt to upgrade the state to the latest version. If that's not possible, return the + /// actual data version of the shard. + pub async fn upgrade_version(&self) -> Result { + let metrics = Arc::clone(&self.applier.metrics); + let (_seqno, upgrade_result, maintenance) = self + .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| { + if state.version <= cfg.build_version { + // This would be the place to remove any deprecated items from state, now + // that we're dropping compatibility with any previous versions. + state.version = cfg.build_version.clone(); + Continue(Ok(())) + } else { + Break(NoOpStateTransition(Err(state.version.clone()))) + } + }) + .await; + + match upgrade_result { + Ok(()) => Ok(maintenance), + Err(version) => { + soft_assert_no_log!( + maintenance.is_empty(), + "should not generate maintenance on failed upgrade" + ); + Err(version) + } + } + } + pub async fn register_leased_reader( &self, reader_id: &LeasedReaderId, @@ -2567,14 +2596,16 @@ pub mod tests { use mz_ore::task::spawn; use mz_persist::intercept::{InterceptBlob, InterceptHandle}; use mz_persist::location::SeqNo; + use mz_persist_types::PersistLocation; + use semver::Version; use timely::progress::Antichain; - use crate::ShardId; use crate::batch::BatchBuilderConfig; use crate::cache::StateCache; use crate::internal::gc::{GarbageCollector, GcReq}; use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD}; - use crate::tests::new_test_client; + use crate::tests::{new_test_client, new_test_client_cache}; + use crate::{Diagnostics, PersistClient, ShardId}; #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance` @@ -2722,4 +2753,59 @@ pub mod tests { // state after an upper mismatch then this call would (incorrectly) fail write2.expect_compare_and_append(&data[1..2], 2, 3).await; } + + #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] + async fn version_upgrade(dyncfgs: ConfigUpdates) { + let mut cache = new_test_client_cache(&dyncfgs); + cache.cfg.build_version = Version::new(26, 1, 0); + let shard_id = ShardId::new(); + + async fn fetch_catalog_upgrade_shard_version( + persist_client: &PersistClient, + upgrade_shard_id: ShardId, + ) -> Option { + let shard_state = persist_client + .inspect_shard::(&upgrade_shard_id) + .await + .ok()?; + let json_state = serde_json::to_value(shard_state).expect("state serialization error"); + let upgrade_version = json_state + .get("applier_version") + .cloned() + .expect("missing applier_version"); + let upgrade_version = + serde_json::from_value(upgrade_version).expect("version deserialization error"); + Some(upgrade_version) + } + + cache.cfg.build_version = Version::new(26, 1, 0); + let client = cache.open(PersistLocation::new_in_mem()).await.unwrap(); + let (_, mut reader) = client.expect_open::(shard_id).await; + reader.downgrade_since(&Antichain::from_elem(1)).await; + assert_eq!( + fetch_catalog_upgrade_shard_version(&client, shard_id).await, + Some(Version::new(26, 1, 0)), + ); + + // Merely opening and operating on the shard at a new version doesn't bump version... + cache.cfg.build_version = Version::new(27, 1, 0); + let client = cache.open(PersistLocation::new_in_mem()).await.unwrap(); + let (_, mut reader) = client.expect_open::(shard_id).await; + reader.downgrade_since(&Antichain::from_elem(2)).await; + assert_eq!( + fetch_catalog_upgrade_shard_version(&client, shard_id).await, + Some(Version::new(26, 1, 0)), + ); + + // ...but an explicit call will. + client + .upgrade_version::(shard_id, Diagnostics::for_tests()) + .await + .unwrap(); + assert_eq!( + fetch_catalog_upgrade_shard_version(&client, shard_id).await, + Some(Version::new(27, 1, 0)), + ); + } } diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index f2fe790f5faf5..fc76d66551135 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1304,6 +1304,11 @@ pub struct NoOpStateTransition(pub T); #[derive(Debug, Clone)] #[cfg_attr(any(test, debug_assertions), derive(PartialEq))] pub struct StateCollections { + /// The version of this state. This is typically identical to the version of the code + /// that wrote it, but may diverge during 0dt upgrades and similar operations when a + /// new version of code is intentionally interoperating with an older state format. + pub(crate) version: Version, + // - Invariant: `<= all reader.since` // - Invariant: Doesn't regress across state versions. pub(crate) last_gc_req: SeqNo, @@ -2221,7 +2226,6 @@ where #[derive(Debug)] #[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))] pub struct State { - pub(crate) applier_version: semver::Version, pub(crate) shard_id: ShardId, pub(crate) seqno: SeqNo, @@ -2251,10 +2255,9 @@ pub struct TypedState { impl TypedState { #[cfg(any(test, debug_assertions))] - pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self { + pub(crate) fn clone(&self, hostname: String) -> Self { TypedState { state: State { - applier_version, shard_id: self.shard_id.clone(), seqno: self.seqno.clone(), walltime_ms: self.walltime_ms, @@ -2268,7 +2271,6 @@ impl TypedState { pub(crate) fn clone_for_rollup(&self) -> Self { TypedState { state: State { - applier_version: self.applier_version.clone(), shard_id: self.shard_id.clone(), seqno: self.seqno.clone(), walltime_ms: self.walltime_ms, @@ -2335,12 +2337,12 @@ where walltime_ms: u64, ) -> Self { let state = State { - applier_version, shard_id, seqno: SeqNo::minimum(), walltime_ms, hostname, collections: StateCollections { + version: applier_version, last_gc_req: SeqNo::minimum(), rollups: BTreeMap::new(), active_rollup: None, @@ -2366,19 +2368,15 @@ where where WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections) -> ControlFlow, { - // Now that we support one minor version of forward compatibility, tag - // each version of state with the _max_ version of code that has ever - // contributed to it. Otherwise, we'd erroneously allow rolling back an - // arbitrary number of versions if they were done one-by-one. - let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version); + // We do not increment the version by default, though work_fn can if it chooses to. let mut new_state = State { - applier_version: new_applier_version.clone(), shard_id: self.shard_id, seqno: self.seqno.next(), walltime_ms: (cfg.now)(), hostname: cfg.hostname.clone(), collections: self.collections.clone(), }; + // Make sure walltime_ms is strictly increasing, in case clocks are // offset. if new_state.walltime_ms <= self.walltime_ms { @@ -2749,13 +2747,13 @@ fn serialize_diffs_sum(val: &Option<[u8; 8]>, s: S) -> Result Serialize for State { fn serialize(&self, s: S) -> Result { let State { - applier_version, shard_id, seqno, walltime_ms, hostname, collections: StateCollections { + version: applier_version, last_gc_req, rollups, active_rollup, @@ -2835,12 +2833,12 @@ pub(crate) mod tests { use proptest::strategy::ValueTree; use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval}; - use crate::PersistLocation; use crate::cache::PersistClientCache; use crate::internal::encoding::any_some_lazy_part_stats; use crate::internal::paths::RollupId; use crate::internal::trace::tests::any_trace; use crate::tests::new_test_client_cache; + use crate::{Diagnostics, PersistLocation}; use super::*; @@ -3137,12 +3135,12 @@ pub(crate) mod tests { (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup), (active_gc, leased_readers, critical_readers, writers, schemas, trace), )| State { - applier_version: semver::Version::new(1, 2, 3), shard_id, seqno, walltime_ms, hostname, collections: StateCollections { + version: Version::new(1, 2, 3), last_gc_req, rollups, active_rollup, @@ -4382,6 +4380,10 @@ pub(crate) mod tests { let client = clients.open(PersistLocation::new_in_mem()).await.unwrap(); // Run in a task so we can catch the panic. mz_ore::task::spawn(|| version.to_string(), async move { + let () = client + .upgrade_version::(shard_id, Diagnostics::for_tests()) + .await + .expect("valid usage"); let (mut write, _) = client.expect_open::(shard_id).await; let current = *write.upper().as_option().unwrap(); // Do a write so that we tag the state with the version. @@ -4400,9 +4402,9 @@ pub(crate) mod tests { let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await; assert_ok!(res); - // Downgrade to v0.10.0 is allowed. + // Downgrade to v0.10.0 is no longer allowed. let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await; - assert_ok!(res); + assert!(res.unwrap_err().is_panic()); // Downgrade to v0.9.0 is _NOT_ allowed. let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await; diff --git a/src/persist-client/src/internal/state_diff.rs b/src/persist-client/src/internal/state_diff.rs index 4f542e06b6224..a3ae09485e7de 100644 --- a/src/persist-client/src/internal/state_diff.rs +++ b/src/persist-client/src/internal/state_diff.rs @@ -146,13 +146,13 @@ impl StateDiff { // Deconstruct from and to so we get a compile failure if new // fields are added. let State { - applier_version: _, shard_id: from_shard_id, seqno: from_seqno, hostname: from_hostname, walltime_ms: _, // Intentionally unused collections: StateCollections { + version: _, last_gc_req: from_last_gc_req, rollups: from_rollups, active_rollup: from_active_rollup, @@ -165,13 +165,13 @@ impl StateDiff { }, } = from; let State { - applier_version: to_applier_version, shard_id: to_shard_id, seqno: to_seqno, walltime_ms: to_walltime_ms, hostname: to_hostname, collections: StateCollections { + version: to_applier_version, last_gc_req: to_last_gc_req, rollups: to_rollups, active_rollup: to_active_rollup, @@ -319,10 +319,7 @@ impl StateDiff { use crate::internal::state::ProtoStateDiff; - let mut roundtrip_state = from_state.clone( - from_state.applier_version.clone(), - from_state.hostname.clone(), - ); + let mut roundtrip_state = from_state.clone(from_state.hostname.clone()); roundtrip_state.apply_diff(metrics, diff.clone())?; if &roundtrip_state != to_state { @@ -397,7 +394,8 @@ impl State { // issues that may arise from diff application. We pass along the original // Bytes it decoded from just so we can decode in this error path, while // avoiding any extraneous clones in the expected Ok path. - let diff = StateDiff::::decode(&self.applier_version, data); + // FIXME: this passes the state version but the method requires the build version. + let diff = StateDiff::::decode(&self.collections.version, data); panic!( "state diff should apply cleanly: {} diff {:?} state {:?}", err, diff, self @@ -446,7 +444,6 @@ impl State { )); } self.seqno = diff_seqno_to; - self.applier_version = diff_applier_version; self.walltime_ms = diff_walltime_ms; force_apply_diffs_single( &self.shard_id, @@ -460,6 +457,7 @@ impl State { // Deconstruct collections so we get a compile failure if new fields are // added. let StateCollections { + version, last_gc_req, rollups, active_rollup, @@ -471,6 +469,7 @@ impl State { trace, } = &mut self.collections; + *version = diff_applier_version; apply_diffs_map("rollups", diff_rollups, rollups)?; apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?; apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?; diff --git a/src/persist-client/src/internal/state_versions.rs b/src/persist-client/src/internal/state_versions.rs index fb620931dcadf..d236102710987 100644 --- a/src/persist-client/src/internal/state_versions.rs +++ b/src/persist-client/src/internal/state_versions.rs @@ -1074,7 +1074,6 @@ impl StateVersionsIter { pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use { Rollup::from_state_without_diffs( State { - applier_version: self.state.applier_version.clone(), shard_id: self.state.shard_id.clone(), seqno: self.state.seqno.clone(), walltime_ms: self.state.walltime_ms.clone(), diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index a454461a18559..7773fb4828e9c 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -791,6 +791,33 @@ impl PersistClient { Ok(()) } + /// Upgrade the state to the latest version. This should only be called once we will no longer + /// need to interoperate with older versions, like after a successful upgrade. + pub async fn upgrade_version( + &self, + shard_id: ShardId, + diagnostics: Diagnostics, + ) -> Result<(), InvalidUsage> + where + K: Debug + Codec, + V: Debug + Codec, + T: Timestamp + Lattice + Codec64 + Sync, + D: Monoid + Codec64 + Send + Sync, + { + let machine = self + .make_machine::(shard_id, diagnostics) + .await?; + + match machine.upgrade_version().await { + Ok(maintenance) => { + let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime)); + let () = maintenance.perform(&machine, &gc).await; + Ok(()) + } + Err(version) => Err(InvalidUsage::IncompatibleVersion { version }), + } + } + /// Returns the internal state of the shard for debugging and QA. /// /// We'll be thoughtful about making unnecessary changes, but the **output diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index b07b6a17a0590..449f69b772b8f 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -44,7 +44,7 @@ use crate::fetch::{ EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ, }; use crate::internal::compact::{CompactConfig, Compactor}; -use crate::internal::encoding::{Schemas, check_data_version}; +use crate::internal::encoding::{Schemas, assert_code_can_read_data}; use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine}; use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics}; use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart}; @@ -567,7 +567,7 @@ where handle_shard: self.machine.shard_id(), }); } - check_data_version(&self.cfg.build_version, &batch.version); + assert_code_can_read_data(&self.cfg.build_version, &batch.version); if self.cfg.build_version > batch.version { info!( shard_id =? self.machine.shard_id(), diff --git a/src/testdrive/Cargo.toml b/src/testdrive/Cargo.toml index f5603cf624845..09a1d6b2f5c25 100644 --- a/src/testdrive/Cargo.toml +++ b/src/testdrive/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-testdrive" description = "Integration test driver for Materialize." -version = "0.165.0-dev.0" +version = "26.0.0-dev.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/test/legacy-upgrade/mzcompose.py b/test/legacy-upgrade/mzcompose.py index 403c6414bca50..da65187627d2d 100644 --- a/test/legacy-upgrade/mzcompose.py +++ b/test/legacy-upgrade/mzcompose.py @@ -96,9 +96,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: ) current_version = MzVersion.parse_cargo() - min_upgradable_version = MzVersion.create( - current_version.major, current_version.minor - 1, 0 - ) + if current_version.major == 26 and current_version.minor == 0: + min_upgradable_version = MzVersion.create(0, 164, 0) + else: + min_upgradable_version = MzVersion.create( + current_version.major, current_version.minor - 1, 0 + ) for version in tested_versions: # Building the latest release might have failed, don't block PRs on diff --git a/test/terraform/aws-persistent/main.tf b/test/terraform/aws-persistent/main.tf index c8e41632afc28..e63b78555ff72 100644 --- a/test/terraform/aws-persistent/main.tf +++ b/test/terraform/aws-persistent/main.tf @@ -18,7 +18,7 @@ resource "random_password" "db_password" { variable "operator_version" { type = string - default = "v25.3.0-beta.1.tgz" + default = "v26.0.0-beta.1.tgz" } variable "orchestratord_version" { @@ -34,7 +34,7 @@ module "materialize_infrastructure" { environment = "dev" install_materialize_operator = true use_local_chart = true - helm_chart = "materialize-operator-v25.3.0-beta.1.tgz" + helm_chart = "materialize-operator-v26.0.0-beta.1.tgz" operator_version = var.operator_version orchestratord_version = var.orchestratord_version diff --git a/test/terraform/aws-temporary/main.tf b/test/terraform/aws-temporary/main.tf index 63d256f074605..1b914d585a218 100644 --- a/test/terraform/aws-temporary/main.tf +++ b/test/terraform/aws-temporary/main.tf @@ -42,7 +42,7 @@ resource "random_password" "db_password" { variable "operator_version" { type = string - default = "v25.3.0-beta.1.tgz" + default = "v26.0.0-beta.1.tgz" } variable "orchestratord_version" { @@ -67,7 +67,7 @@ module "materialize_infrastructure" { install_materialize_operator = true use_local_chart = true - helm_chart = "materialize-operator-v25.3.0-beta.1.tgz" + helm_chart = "materialize-operator-v26.0.0-beta.1.tgz" operator_version = var.operator_version orchestratord_version = var.orchestratord_version diff --git a/test/terraform/aws-upgrade/main.tf b/test/terraform/aws-upgrade/main.tf index f2edc48b5b086..76e23a7aa5f4b 100644 --- a/test/terraform/aws-upgrade/main.tf +++ b/test/terraform/aws-upgrade/main.tf @@ -42,7 +42,7 @@ resource "random_password" "db_password" { variable "operator_version" { type = string - default = "v25.3.0-beta.1.tgz" + default = "v26.0.0-beta.1.tgz" } variable "orchestratord_version" { @@ -67,7 +67,7 @@ module "materialize_infrastructure" { install_materialize_operator = true use_local_chart = true - helm_chart = "materialize-operator-v25.3.0-beta.1.tgz" + helm_chart = "materialize-operator-v26.0.0-beta.1.tgz" operator_version = var.operator_version orchestratord_version = var.orchestratord_version diff --git a/test/terraform/azure-temporary/main.tf b/test/terraform/azure-temporary/main.tf index 30529d05bc198..deaabf8f4ad10 100644 --- a/test/terraform/azure-temporary/main.tf +++ b/test/terraform/azure-temporary/main.tf @@ -45,7 +45,7 @@ resource "random_password" "pass" { variable "operator_version" { type = string - default = "v25.3.0-beta.1.tgz" + default = "v26.0.0-beta.1.tgz" } variable "orchestratord_version" { @@ -67,7 +67,7 @@ module "materialize" { install_materialize_operator = true use_local_chart = true - helm_chart = "materialize-operator-v25.3.0-beta.1.tgz" + helm_chart = "materialize-operator-v26.0.0-beta.1.tgz" operator_version = var.operator_version orchestratord_version = var.orchestratord_version diff --git a/test/terraform/gcp-temporary/main.tf b/test/terraform/gcp-temporary/main.tf index d27c5817fa24e..a65bf1823ac39 100644 --- a/test/terraform/gcp-temporary/main.tf +++ b/test/terraform/gcp-temporary/main.tf @@ -54,7 +54,7 @@ module "materialize" { install_materialize_operator = true use_local_chart = true - helm_chart = "materialize-operator-v25.3.0-beta.1.tgz" + helm_chart = "materialize-operator-v26.0.0-beta.1.tgz" operator_version = var.operator_version orchestratord_version = var.orchestratord_version @@ -97,7 +97,7 @@ variable "database_password" { variable "operator_version" { type = string - default = "v25.3.0-beta.1.tgz" + default = "v26.0.0-beta.1.tgz" } variable "orchestratord_version" { diff --git a/test/terraform/mzcompose.py b/test/terraform/mzcompose.py index ecff7ff0f2c76..1ac25307c90b6 100644 --- a/test/terraform/mzcompose.py +++ b/test/terraform/mzcompose.py @@ -678,7 +678,7 @@ def setup( vars = [ "-var", - "operator_version=v25.3.0-beta.1", + "operator_version=v26.0.0-beta.1", ] vars += [ "-var", @@ -1149,7 +1149,7 @@ def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> No vars = [ "-var", - "operator_version=v25.3.0-beta.1", + "operator_version=v26.0.0-beta.1", ] vars += [ "-var", @@ -1260,7 +1260,7 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> vars = [ "-var", - "operator_version=v25.3.0-beta.1", + "operator_version=v26.0.0-beta.1", ] vars += [ "-var",