From a7cc759b46aba04aab29151c830009ea79cc467c Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 9 Nov 2025 14:21:21 +0100 Subject: [PATCH 1/5] tests: Add a retry loop to Composition.cloud_hostname --- .../materialize/mzcompose/composition.py | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/misc/python/materialize/mzcompose/composition.py b/misc/python/materialize/mzcompose/composition.py index 336c914388f50..32c271706e2c2 100644 --- a/misc/python/materialize/mzcompose/composition.py +++ b/misc/python/materialize/mzcompose/composition.py @@ -1667,16 +1667,57 @@ def promote_mz(self, mz_service: str = "materialized") -> None: ) assert result["result"] == "Success", f"Unexpected result {result}" - def cloud_hostname(self, quiet: bool = False) -> str: - """Uses the mz command line tool to get the hostname of the cloud instance""" + def cloud_hostname( + self, quiet: bool = False, timeout_secs: int = 180, poll_interval: float = 2.0 + ) -> str: + """Uses the mz command line tool to get the hostname of the cloud instance, waiting until the region is ready.""" if not quiet: print("Obtaining hostname of cloud instance ...") - region_status = self.run("mz", "region", "show", capture=True, rm=True) - sql_line = region_status.stdout.split("\n")[2] - cloud_url = sql_line.split("\t")[1].strip() - # It is necessary to append the 'https://' protocol; otherwise, urllib can't parse it correctly. - cloud_hostname = urllib.parse.urlparse("https://" + cloud_url).hostname - return str(cloud_hostname) + + deadline = time.time() + timeout_secs + last_msg = "" + + while time.time() < deadline: + proc = self.run( + "mz", + "region", + "show", + capture=True, + capture_stderr=True, + rm=True, + check=False, + silent=True, + ) + out = proc.stdout or "" + err = proc.stderr or "" + + if proc.returncode == 0: + lines = out.splitlines() + if len(lines) >= 3: + line = lines[2] + parts = line.split("\t") + if len(parts) >= 2: + cloud_url = parts[1].strip() + # It is necessary to append the 'https://' protocol; otherwise, urllib can't parse it correctly. + hostname = urllib.parse.urlparse( + "https://" + cloud_url + ).hostname + if hostname: + return str(hostname) + else: + last_msg = f"failed to parse hostname from URL: {cloud_url}" + else: + last_msg = f"unexpected region show output (no tab in line 3): {line!r}" + else: + last_msg = f"unexpected region show output (too few lines): {out!r}" + else: + last_msg = (out + "\n" + err).strip() + + time.sleep(poll_interval) + + raise UIError( + f"failed to obtain cloud hostname within {timeout_secs}s: {last_msg}" + ) T = TypeVar("T") From d493ba0dd2b5a256c978ca96e3fe8f06f1856201 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Thu, 30 Oct 2025 15:48:42 +0100 Subject: [PATCH 2/5] tests: Add QPS to Cluster Spec Sheet --- test/cluster-spec-sheet/README.md | 8 +- test/cluster-spec-sheet/mzcompose.py | 686 +++++++++++++++++++++++++-- test/dbbench/Dockerfile | 49 ++ test/dbbench/mzbuild.yml | 10 + 4 files changed, 699 insertions(+), 54 deletions(-) create mode 100644 test/dbbench/Dockerfile create mode 100644 test/dbbench/mzbuild.yml diff --git a/test/cluster-spec-sheet/README.md b/test/cluster-spec-sheet/README.md index 5cf4dea3f7cf3..e25f51c77df7e 100644 --- a/test/cluster-spec-sheet/README.md +++ b/test/cluster-spec-sheet/README.md @@ -8,8 +8,6 @@ Reproduce data for the cluster spec sheet effort. This will run all scenarios currently defined for the cluster spec sheet. -The test expects a default cluster. - Pass `--cleanup` to disable the region after the test. # Running @@ -48,3 +46,9 @@ In this case, the environment variables are not required. ``` bin/mzcompose --find cluster-spec-sheet run default --target=docker ``` + +## Scenarios + +There are two kinds of scenarios: +- cluster scaling: These measure run times and arrangement sizes. +- envd scaling: These measure QPS. diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py index 2d0f12606000b..cdac47b9a8add 100644 --- a/test/cluster-spec-sheet/mzcompose.py +++ b/test/cluster-spec-sheet/mzcompose.py @@ -8,7 +8,7 @@ # by the Apache License, Version 2.0. """ -Reproduces cluster spec sheet results on Materialize Cloud. +Reproduces cluster spec sheet results on Materialize Cloud (or local Docker). """ import argparse @@ -17,6 +17,7 @@ import itertools import os import re +import shlex import time from abc import ABC, abstractmethod from collections.abc import Callable, Hashable @@ -34,6 +35,7 @@ Composition, WorkflowArgumentParser, ) +from materialize.mzcompose.service import Service as MzComposeService from materialize.mzcompose.services.materialized import Materialized from materialize.mzcompose.services.mz import Mz from materialize.test_analytics.config.test_analytics_db_config import ( @@ -43,7 +45,7 @@ cluster_spec_sheet_result_storage, ) from materialize.test_analytics.test_analytics_db import TestAnalyticsDb -from materialize.ui import UIError +from materialize.ui import CommandFailureCausedUIError, UIError CLUSTER_SPEC_SHEET_VERSION = "1.0.0" # Used for uploading test analytics results @@ -52,6 +54,11 @@ USERNAME = os.getenv("NIGHTLY_MZ_USERNAME", "infra+bot@materialize.com") APP_PASSWORD = os.getenv("MZ_CLI_APP_PASSWORD") +MATERIALIZED_ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS = { + "memory_limiter_interval": "0s", + "max_credit_consumption_rate": "1024", +} + SERVICES = [ Mz( region=REGION, @@ -60,9 +67,13 @@ ), Materialized( propagate_crashes=True, - additional_system_parameter_defaults={ - "memory_limiter_interval": "0s", - "max_credit_consumption_rate": "1024", + additional_system_parameter_defaults=MATERIALIZED_ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, + ), + # dbbench service built from our mzbuild Dockerfile (test/dbbench) + MzComposeService( + "dbbench", + { + "mzbuild": "dbbench", }, ), ] @@ -73,6 +84,7 @@ SCENARIO_TPCH_QUERIES_STRONG = "tpch_queries_strong" SCENARIO_TPCH_QUERIES_WEAK = "tpch_queries_weak" SCENARIO_TPCH_STRONG = "tpch_strong" +SCENARIO_QPS_ENVD_STRONG_SCALING = "qps_envd_strong_scaling" ALL_SCENARIOS = [ SCENARIO_AUCTION_STRONG, SCENARIO_AUCTION_WEAK, @@ -80,6 +92,7 @@ SCENARIO_TPCH_QUERIES_STRONG, SCENARIO_TPCH_QUERIES_WEAK, SCENARIO_TPCH_STRONG, + SCENARIO_QPS_ENVD_STRONG_SCALING, ] @@ -129,6 +142,7 @@ def __init__( connection: ConnectionHandler, results_writer: csv.DictWriter, replica_size: Any, + target: "BenchTarget", ) -> None: self.scenario = scenario self.scenario_version = scenario_version @@ -137,6 +151,8 @@ def __init__( self.connection = connection self.results_writer = results_writer self.replica_size = replica_size + self.target = target + self.envd_cpus: int | None = None # Used only in QPS scenarios def add_result( self, @@ -144,7 +160,8 @@ def add_result( name: str, repetition: int, size_bytes: int | None, - time: float, + time: float | None = None, + qps: float | None = None, ) -> None: self.results_writer.writerow( { @@ -155,9 +172,11 @@ def add_result( "category": category, "test_name": name, "cluster_size": self.replica_size, + "envd_cpus": self.envd_cpus, "repetition": repetition, "size_bytes": size_bytes, - "time_ms": int(time * 1000), + "time_ms": int(time * 1000) if time is not None else None, + "qps": qps, } ) @@ -210,6 +229,145 @@ def inner() -> None: self.connection.retryable(inner) + def measure_dbbench( + self, + category: str, + name: str, + *, + setup: list[str], + query: list[str], + after: list[str], + duration: str, + concurrency: int | None = None, + ) -> None: + """ + Run dbbench via the 'dbbench' mzcompose service and record its final QPS. + + Parameters somewhat mirror ScenarioRunner.measure for consistency, but are applied to a + generated dbbench.ini: + - setup: SQL statements run once before the workload (dbbench [setup]) + - query: SQL statements that form the workload (dbbench [loadtest] query=...) + - after: SQL statements run once after the workload (dbbench [teardown]) + - duration: length of the workload (top-level duration=...) + - concurrency: concurrent sessions for the workload. + + We capture and parse dbbench's output (stderr or stdout) to find the last + occurrence of a "QPS" value that dbbench reports in its summary, and store that + as the 'qps' column in the results CSV. If no QPS is found, the test fails. + (We omit time_ms for these rows since wall-clock time is not meaningful here.) + """ + + # Build a dbbench command that targets the current Materialize instance and + # the active cluster 'c' using target-provided flags. + flags = self.target.dbbench_connection_flags() + + # Render a minimal INI file for dbbench. + def ini_escape(s: str) -> str: + return s.replace("\n", " ") + + lines: list[str] = [] + lines.append(f"duration={duration}") + # [setup] + if setup: + lines.append("") + lines.append("[setup]") + for q in setup: + q = dedent(q).strip() + if q: + lines.append(f"query={ini_escape(q)}") + # [teardown] + if after: + lines.append("") + lines.append("[teardown]") + for q in after: + q = dedent(q).strip() + if q: + lines.append(f"query={ini_escape(q)}") + # [loadtest] + lines.append("") + lines.append("[loadtest]") + for q in query: + q = dedent(q).strip() + if q: + lines.append(f"query={ini_escape(q)}") + if concurrency is not None: + lines.append(f"concurrency={int(concurrency)}") + + ini_text = "\n".join(lines) + "\n" + + # Construct a shell snippet to write the INI and execute dbbench + quoted_flags = " ".join(shlex.quote(x) for x in flags) + script = ( + "set -euo pipefail; " + "cat > /tmp/run.ini; " + f"exec dbbench {quoted_flags} -intermediate-stats=false /tmp/run.ini" + ) + + print(f"--- Running dbbench step '{name}' for {self.envd_cpus} ...") + # Execute the command in the 'dbbench' service container and capture output + result = self.target.c.run( + "dbbench", + "-lc", # sh arg to make it run `script` + script, + entrypoint="sh", + rm=True, + capture_and_print=True, + stdin=ini_text, + ) + + # dbbench writes its final summary to stderr; prefer stderr but also + # consider stdout just in case. + stderr_out = result.stderr or "" + stdout_out = result.stdout or "" + combined_out = f"{stderr_out}\n{stdout_out}".strip() + + # Persist full dbbench output to a log file for later inspection + logs_dir = os.path.join("test", "cluster-spec-sheet", "dbbench-logs") + os.makedirs(logs_dir, exist_ok=True) + + # Build a descriptive, filesystem-safe filename + def _slug(x: Any) -> str: + return re.sub(r"[^A-Za-z0-9._=-]+", "-", str(x)) + + fname = f"{_slug(self.scenario)}_{_slug(self.mode)}_{_slug(self.envd_cpus)}_{_slug(name)}.log" + log_path = os.path.join(logs_dir, fname) + with open(log_path, "w", encoding="utf-8") as f: + f.write(combined_out + ("\n" if not combined_out.endswith("\n") else "")) + print(f"dbbench logs saved to {log_path}") + + # Parse QPS from output. + # (Alternatively, we could make our fork of dbbench print a more machine-readable output.) + # TODO: Later, we'll want to also look at latency (avg and tail-latency), but seems too unstable for now. + qps_val: float | None = None + try: + qps_matches = re.findall(r"([0-9]+(?:\.[0-9]+)?)\s*QPS", combined_out) + if len(qps_matches) > 1: + raise UIError( + f"dbbench: found multiple QPS values in output: {qps_matches}" + ) + if qps_matches: + qps_val = float(qps_matches[0]) + except Exception as e: + raise UIError(f"Failed to parse dbbench QPS from output: {e}") + + if qps_val is None: + tail = "\n".join(combined_out.splitlines()[-25:]) + raise UIError("dbbench: failed to find QPS in output. Last lines:\n" + tail) + else: + print(f"dbbench parsed QPS: {qps_val}") + + # Record a result row: put QPS into the 'qps' column, and omit time_ms (not applicable) + self.add_result( + category, + name, + # we have `duration` instead of `repetitions` in QPS benchmarks currently (but maybe it's good to keep + # `repetition` in the schema in case we ever want also multiple repetitions for QPS) + 0, + None, + None, + qps=qps_val, + ) + def size_of_dataflow(self, object: str) -> int | None: retries = 10 while retries > 0: @@ -1672,29 +1830,218 @@ def run(self, runner: ScenarioRunner) -> None: ) -def disable_region(c: Composition) -> None: +class QpsEnvdStrongScalingScenario(Scenario): + + def name(self) -> str: + return SCENARIO_QPS_ENVD_STRONG_SCALING + + def materialize_views(self) -> list[str]: + return [] + + def setup(self) -> list[str]: + return [] + + def drop(self) -> list[str]: + return [] + + def run(self, runner: ScenarioRunner) -> None: + runner.measure_dbbench( + category="peek_qps", + name="dbbench_256_conns", + setup=[ + "create view if not exists gen_view as select generate_series as x from generate_series(1, 10)", + "create default index on gen_view", + "select * from gen_view", # Wait for hydration + ], + query=[ + "select * from gen_view", + ], + after=[ + "drop view gen_view cascade", + ], + duration="40s", + concurrency=256, + ) + + runner.measure_dbbench( + category="peek_qps", + name="dbbench_512_conns", + setup=[ + "create view if not exists gen_view as select generate_series as x from generate_series(1, 10)", + "create default index on gen_view", + "select * from gen_view", # Wait for hydration + ], + query=[ + "select * from gen_view", + ], + after=[ + "drop view gen_view cascade", + ], + duration="40s", + concurrency=512, + ) + + # TODO: Add more scenarios as the QPS/CPS work progresses: + # - different connection counts + # - distribute queries across more clusters; + # see manual test results with multiple clusters here: + # https://docs.google.com/presentation/d/1bIyTWaRiyEqBXFxoxpHwWSywztSW1jRw_JP3M-Zj_6A/edit?slide=id.g39de8b7440c_0_86#slide=id.g39de8b7440c_0_86 + # - explicit transactions (which are currently super slow) + # - (slow-path queries are kinda expected to be slow, so it's not so important to measure them) + # - I think dbbench uses the "Simple Query Protocol" by default. We might want to also measure the + # "Extended Query Protocol" / prepared statements. + # - Lookups in a large index, especially on a larger replica. + # - Larger result sets. + # + # We'll also want to measure latency, including tail latency. + + +def disable_region(c: Composition, hard: bool) -> None: print(f"Shutting down region {REGION} ...") try: - c.run("mz", "region", "disable", "--hard", rm=True) + if hard: + c.run("mz", "region", "disable", "--hard", rm=True) + else: + c.run("mz", "region", "disable", rm=True) except UIError: # Can return: status 404 Not Found pass -def wait_for_cloud(c: Composition) -> None: - print(f"Waiting for cloud cluster to come up with username {USERNAME} ...") - _wait_for_pg( - host=c.cloud_hostname(), - user=USERNAME, - password=APP_PASSWORD, - port=6875, - query="SELECT 1", - expected=[(1,)], - timeout_secs=900, - dbname="materialize", - sslmode="require", - ) +def cloud_disable_enable_and_wait( + target: "BenchTarget", + environmentd_cpu_allocation: int | None = None, +) -> None: + """ + Soft-disable and then enable the Cloud region, then wait for environmentd readiness. + + The disabling is needed because `mz region enable` does a 0dt rollout. This means that, + without a `disable`, we'd get an intrusive envd restart some time later after `wait_for_envd` + already succeeded (with the old envd). Therefore, we do an `mz region disable` first, so + that `wait_for_envd` can't succeed before the read-only env promotes. + + When `environmentd_cpu_allocation` is provided, it is passed to `mz region enable` via + `--environmentd-cpu-allocation` to reconfigure environmentd's CPU allocation. + """ + disable_region(target.c, hard=False) + + if environmentd_cpu_allocation is None: + target.c.run("mz", "region", "enable", rm=True) + else: + target.c.run( + "mz", + "region", + "enable", + "--environmentd-cpu-allocation", + str(environmentd_cpu_allocation), + rm=True, + ) + + time.sleep(10) + + assert "materialize.cloud" in target.c.cloud_hostname() + wait_for_envd(target) + + +def reconfigure_envd_cpus( + target: "BenchTarget", envd_cpus: int, runner: ScenarioRunner +) -> None: + """ + Reconfigure the number of CPU cores allocated to environmentd for the given target. + + - Docker target: recreate the local `materialized` container with a CPU limit equal to envd_cpus, + wait for SQL readiness, and force the benchmark connection to reconnect. + - Cloud target: soft-disable/enable the region with the desired envd CPU allocation, wait for + SQL readiness, and force the benchmark connection to reconnect. + """ + if isinstance(target, DockerTarget): + # For Docker target: restart `materialized` with a CPU limit equal to envd_cpus. + try: + # Create a temporary override of the materialized service with updated CPU limits. + # Keep other defaults consistent with SERVICES. + overridden = Materialized( + propagate_crashes=True, + additional_system_parameter_defaults=MATERIALIZED_ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, + # This is just an upper limit; it won't make a noise if your local machine doesn't have enough cores. + # If you'd like to avoid going over your machine's core count, you can use `--max-scale`. + cpu=str(envd_cpus), + ) + print(f"--- Reconfiguring local environmentd CPUs to {envd_cpus}") + with target.c.override(overridden): + # Recreate the container to apply new limits, but preserve volumes. + try: + target.c.rm("materialized", stop=True, destroy_volumes=False) + except CommandFailureCausedUIError as e: + # Ignore only the benign case where the container does not yet exist. + if not (e.stderr and "No such container" in e.stderr): + raise + target.c.up("materialized") + wait_for_envd(target, timeout_secs=60) + except Exception as e: + raise UIError(f"failed to apply Docker CPU override for environmentd: {e}") + else: + # Cloud target: reconfigure environmentd CPUs via `mz region`. + try: + print(f"--- Reconfiguring Cloud environmentd CPUs to {envd_cpus}") + cloud_disable_enable_and_wait(target, environmentd_cpu_allocation=envd_cpus) + except Exception as e: + raise UIError( + f"failed to apply Cloud CPU override for environmentd via 'mz region': {e}" + ) + + # Force reconnection to the (potentially restarted) service. + try: + runner.connection.connection.close() + except Exception: + pass + + +def wait_for_envd(target: "BenchTarget", timeout_secs: int = 300) -> None: + """ + Wait until the environmentd SQL endpoint is ready. + + - Cloud: uses cloud hostname:6875, sslmode=require, and prefers the per-run + app password if available; falls back to MZ_CLI_APP_PASSWORD. + - Docker: probes SQL readiness via Composition.sql_query + """ + if isinstance(target, CloudTarget): + host = target.c.cloud_hostname() + user = USERNAME + # Prefer the newly created app password when present; fall back to the CLI password. + password = target.new_app_password or APP_PASSWORD or "" + sslmode = "require" + print( + f"Waiting for cloud environmentd at {host}:6875 to come up with username {user} ..." + ) + _wait_for_pg( + host=host, + user=user, + password=password, + port=6875, + query="SELECT 1", + expected=[(1,)], + timeout_secs=timeout_secs, + dbname="materialize", + sslmode=sslmode, + ) + else: + # Docker target: use the composition helper to query the service via the + # host-mapped port on 127.0.0.1; the container hostname "materialized" + # is not resolvable from the host network when using psycopg directly. + print("Waiting for local environmentd (docker) at materialized:6875 ...") + deadline = time.time() + timeout_secs + last_err: Exception | None = None + while time.time() < deadline: + try: + target.c.sql_query("SELECT 1", service="materialized") + return + except Exception as e: + last_err = e + time.sleep(1) + raise UIError( + f"materialized did not accept SQL connections within {timeout_secs}s after restart: {last_err}" + ) def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: @@ -1716,7 +2063,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "--analyze", default=True, action=argparse.BooleanOptionalAction, - help="Analyze results after completing test.", + help="Analyze results after completing test. Dispatches to cluster-scale or envd-scale focused analyses based on the file suffix: `.cluster.csv` or `.envd.csv`.", ) parser.add_argument( "--target", @@ -1725,7 +2072,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: help="Target to deploy to (default: cloud).", ) parser.add_argument( - "--max-scale", type=int, default=32, help="Maximum scale to test." + "--max-scale", + type=int, + default=32, + help="Maximum scale to test. For QPS scenarios, this directly corresponds to the number of CPU cores given to envd.", ) parser.add_argument( "--scale-tpch", type=float, default=8, help="TPCH scale factor." @@ -1734,7 +2084,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "--scale-tpch-queries", type=float, default=4, help="TPCH queries scale factor." ) parser.add_argument( - "--scale-auction", type=int, default=3, help="TPCH scale factor." + "--scale-auction", type=int, default=3, help="Auction scale factor." ) parser.add_argument( "scenarios", @@ -1765,9 +2115,18 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: target.cleanup() target.initialize() - results_file = open(args.record, "w", newline="") - results_writer = csv.DictWriter( - results_file, + + # Derive two result files (cluster and envd-focused) from the provided --record path + base_name = os.path.splitext(args.record)[0] + cluster_path = f"{base_name}.cluster.csv" + envd_path = f"{base_name}.envd.csv" + + cluster_file = open(cluster_path, "w", newline="") + envd_file = open(envd_path, "w", newline="") + + # Traditional scenarios: cluster-focused schema + cluster_writer = csv.DictWriter( + cluster_file, fieldnames=[ "scenario", "scenario_version", @@ -1780,20 +2139,45 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: "size_bytes", "time_ms", ], + extrasaction="ignore", + ) + cluster_writer.writeheader() + + # Envd-focused scenarios: QPS schema + envd_writer = csv.DictWriter( + envd_file, + fieldnames=[ + "scenario", + "scenario_version", + "scale", + "mode", + "category", + "test_name", + "envd_cpus", + "repetition", + "qps", + ], + extrasaction="ignore", ) - results_writer.writeheader() + envd_writer.writeheader() def process(scenario: str) -> None: with c.test_case(scenario): conn = ConnectionHandler(target.new_connection) + # This cluster is just for misc setup queries. + size = "50cc" if isinstance(target, CloudTarget) else "scale=1,workers=1" + with conn as cur: + cur.execute("DROP CLUSTER IF EXISTS quickstart;") + cur.execute(f"CREATE CLUSTER quickstart SIZE '{size}';".encode()) + if scenario == SCENARIO_TPCH_STRONG: print("--- SCENARIO: Running TPC-H Index strong scaling") run_scenario_strong( scenario=TpchScenario( args.scale_tpch, target.replica_size_for_scale(1) ), - results_writer=results_writer, + results_writer=cluster_writer, connection=conn, target=target, max_scale=max_scale, @@ -1804,7 +2188,7 @@ def process(scenario: str) -> None: scenario=TpchScenarioMV( args.scale_tpch, target.replica_size_for_scale(1) ), - results_writer=results_writer, + results_writer=cluster_writer, connection=conn, target=target, max_scale=max_scale, @@ -1815,7 +2199,7 @@ def process(scenario: str) -> None: scenario=TpchScenarioQueriesIndexedInputs( args.scale_tpch_queries, target.replica_size_for_scale(1) ), - results_writer=results_writer, + results_writer=cluster_writer, connection=conn, target=target, max_scale=max_scale, @@ -1826,7 +2210,7 @@ def process(scenario: str) -> None: scenario=TpchScenarioQueriesIndexedInputs( args.scale_tpch_queries, None ), - results_writer=results_writer, + results_writer=cluster_writer, connection=conn, target=target, max_scale=max_scale, @@ -1837,7 +2221,7 @@ def process(scenario: str) -> None: scenario=AuctionScenario( args.scale_auction, target.replica_size_for_scale(1) ), - results_writer=results_writer, + results_writer=cluster_writer, connection=conn, target=target, max_scale=max_scale, @@ -1846,7 +2230,18 @@ def process(scenario: str) -> None: print("--- SCENARIO: Running Auction weak scaling") run_scenario_weak( scenario=AuctionScenario(args.scale_auction, None), - results_writer=results_writer, + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING: + print("--- SCENARIO: Running QPS envd strong scaling") + run_scenario_envd_strong_scaling( + scenario=QpsEnvdStrongScalingScenario( + 1, target.replica_size_for_scale(1) + ), + results_writer=envd_writer, connection=conn, target=target, max_scale=max_scale, @@ -1858,23 +2253,33 @@ def process(scenario: str) -> None: c.test_parts(scenarios, process) test_failed = False finally: - results_file.close() + cluster_file.close() + envd_file.close() # Clean up if args.cleanup: target.cleanup() - upload_results_to_test_analytics(c, args.record, not test_failed) + # Upload only cluster scaling results to Test Analytics for now, until the Test Analytics schema is extended. + # TODO: See slack discussion: + # https://materializeinc.slack.com/archives/C01LKF361MZ/p1762351652336819?thread_ts=1762348361.164759&cid=C01LKF361MZ + upload_results_to_test_analytics(c, cluster_path, not test_failed) assert not test_failed if buildkite.is_in_buildkite(): - buildkite.upload_artifact(args.record, cwd=MZ_ROOT, quiet=True) + # Upload both CSVs as artifacts + buildkite.upload_artifact(cluster_path, cwd=MZ_ROOT, quiet=True) + buildkite.upload_artifact(envd_path, cwd=MZ_ROOT, quiet=True) if args.analyze: - analyze_file(args.record) + # Analyze both files separately (each has its own schema) + analyze_results_file(cluster_path) + analyze_results_file(envd_path) class BenchTarget: + c: Composition + @abstractmethod def initialize(self) -> None: ... @abstractmethod @@ -1894,20 +2299,43 @@ def max_scale(self) -> int | None: """ return None + @abstractmethod + def dbbench_connection_flags(self) -> list[str]: + """ + Return dbbench connection flags appropriate for this target, excluding the + workload file path. The result should include driver, host, port, username, + optional password, database, and params (including sslmode and cluster=c). + """ + ... + class CloudTarget(BenchTarget): def __init__(self, c: Composition) -> None: self.c = c self.new_app_password: str | None = None - def initialize(self) -> None: - print("Enabling region using Mz ...") - self.c.run("mz", "region", "enable", rm=True) - - time.sleep(10) + def dbbench_connection_flags(self) -> list[str]: + assert self.new_app_password is not None + return [ + "-driver", + "postgres", + "-host", + self.c.cloud_hostname(), + "-port", + "6875", + "-username", + USERNAME, + "-password", + self.new_app_password, + "-database", + "materialize", + "-params", + "sslmode=require&cluster=c", + ] - assert "materialize.cloud" in self.c.cloud_hostname() - wait_for_cloud(self.c) + def initialize(self) -> None: + print("Soft-disabling and then enabling region using Mz ...") + cloud_disable_enable_and_wait(self) # Create new app password. new_app_password_name = "Materialize CLI (mz) - Cluster Spec Sheet" @@ -1936,7 +2364,7 @@ def new_connection(self) -> psycopg.Connection: return conn def cleanup(self) -> None: - disable_region(self.c) + disable_region(self.c, hard=True) def replica_size_for_scale(self, scale: int) -> str: """ @@ -1949,6 +2377,22 @@ class DockerTarget(BenchTarget): def __init__(self, c: Composition) -> None: self.c = c + def dbbench_connection_flags(self) -> list[str]: + return [ + "-driver", + "postgres", + "-host", + "materialized", + "-port", + "6875", + "-username", + "materialize", + "-database", + "materialize", + "-params", + "sslmode=disable&cluster=c", + ] + def initialize(self) -> None: print("Starting local Materialize instance ...") self.c.up("materialized") @@ -1988,6 +2432,7 @@ def run_scenario_strong( connection, results_writer, replica_size=None, + target=target, ) for query in scenario.drop(): @@ -2021,6 +2466,92 @@ def run_scenario_strong( scenario.run(runner) +def run_scenario_envd_strong_scaling( + scenario: Scenario, + results_writer: csv.DictWriter, + connection: ConnectionHandler, + target: BenchTarget, + max_scale: int, +) -> None: + """ + Run envd-focused scaling scenarios, where we keep the compute cluster size + fixed and scale the CPU resources available to environmentd instead. + + For the Docker target, we change the CPU limit of the local `materialized` + container (which runs environmentd) before each scale point. + For the Cloud target, we reconfigure environmentd using `mz region enable` + with the `--environmentd-cpu-allocation` flag. + """ + + runner = ScenarioRunner( + scenario.name(), + scenario.VERSION, + scenario.scale, + "strong", + connection, + results_writer, + replica_size=None, + target=target, + ) + + # Prepare a tiny table for cluster availability checks. + for query in [ + "DROP TABLE IF EXISTS t CASCADE;", + "CREATE TABLE t (a int);", + "INSERT INTO t VALUES (1);", + ]: + runner.run_query(query) + + # Scenario-specific setup. + for query in scenario.setup(): + runner.run_query(query) + + for name in scenario.materialize_views(): + runner.run_query(f"SELECT COUNT(*) > 0 FROM {name};") + + fixed_replica_size = target.replica_size_for_scale(1) + + try: + # (So far, I haven't seen a difference between 16 and 32 in manual testing in cloud. When we start seeing a + # difference, consider extending to 64.) + for envd_cpus in [1, 2, 4, 8, 16, 32]: + if envd_cpus > max_scale: + break + + print( + f"--- Running envd-scaling scenario {scenario.name()} with envd_cpus={envd_cpus}; compute size fixed at {fixed_replica_size}" + ) + + reconfigure_envd_cpus(target, envd_cpus, runner) + + # (Re)create a fixed-size compute cluster. + def recreate_cluster() -> None: + runner.run_query("DROP CLUSTER IF EXISTS c CASCADE") + runner.run_query(f"CREATE CLUSTER c SIZE '{fixed_replica_size}'") + runner.run_query("SET cluster = 'c';") + runner.run_query("SELECT * FROM t;") + + runner.connection.retryable(recreate_cluster) + + # Record envd CPU cores for this step. (We intentionally do not set replica_size, because that would be fixed + # in this scenario, so there is no meaningful analysis to be done on that.) + runner.envd_cpus = envd_cpus + + scenario.run(runner) + finally: + if isinstance(target, CloudTarget): + # We reset the cloud envd's core count in any case, to avoid accidentally burning a lot of money. + print("--- Resetting Cloud environmentd CPUs to the default") + target.c.run( + "mz", + "region", + "enable", + "--environmentd-cpu-allocation", + "2", + rm=True, + ) + + def run_scenario_weak( scenario: Scenario, results_writer: csv.DictWriter, @@ -2057,6 +2588,7 @@ def run_scenario_weak( connection, results_writer, replica_size, + target=target, ) for query in scenario.drop(): runner.run_query(query) @@ -2091,12 +2623,26 @@ def workflow_plot(c: Composition, parser: WorkflowArgumentParser) -> None: args = parser.parse_args() for file in itertools.chain(*map(glob.iglob, args.files)): - analyze_file(str(file)) + analyze_results_file(str(file)) -def analyze_file(file: str): +def analyze_results_file(file: str): print(f"--- Analyzing file {file} ...") + base_name = os.path.basename(file) + if base_name.endswith(".cluster.csv"): + analyze_cluster_results_file(file) + elif base_name.endswith(".envd.csv"): + analyze_envd_results_file(file) + else: + # Backward compatibility: fall back to cluster analyzer for legacy filenames + print( + f"Warning: Legacy analyzer fallback: treating {file} as cluster results (no .cluster/.envd suffix)" + ) + analyze_cluster_results_file(file) + + +def analyze_cluster_results_file(file: str) -> None: def extract_cluster_size(s: str) -> float: match = re.search(r"(\d+)(?:(cc)|(C))", s) if match: @@ -2154,7 +2700,7 @@ def extract_cluster_size(s: str) -> float: & (df_all["mode"] == mode) ) df = df_all[indexes] - title = f"{str(benchmark).replace("_", " ")} - {str(category).replace('_', ' ')} ({mode})" + title = f"{str(benchmark).replace('_', ' ')} - {str(category).replace('_', ' ')} ({mode})" slug = f"{benchmark}_{category}_{mode}".replace(" ", "_") plot( @@ -2165,6 +2711,7 @@ def extract_cluster_size(s: str) -> float: f"{slug}_time_ms", "Time [ms]", "Normalized time", + x="credits_per_h", ) plot( plot_dir, @@ -2174,6 +2721,37 @@ def extract_cluster_size(s: str) -> float: f"{slug}_credits", "Cost [centi-credits]", "Normalized cost", + x="credits_per_h", + ) + + +def analyze_envd_results_file(file: str) -> None: + df = pd.read_csv(file) + if df.empty: + print(f"^^^ +++ File {file} is empty, skipping") + return + + base_name = os.path.basename(file).split(".")[0] + plot_dir = os.path.join("test", "cluster-spec-sheet", "plots", base_name) + os.makedirs(plot_dir, exist_ok=True) + + for (benchmark, category, mode), sub in df.groupby( + ["scenario", "category", "mode"] + ): + title = f"{str(benchmark).replace('_',' ')} - {str(category).replace('_',' ')} ({mode})" + slug = f"{benchmark}_{category}_{mode}".replace(" ", "_") + sub_q = sub[sub["qps"].notna() & (sub["qps"] > 0)] + if sub_q.empty: + raise UIError(f"No QPS data found for {title} in {file}") + plot( + plot_dir, + sub_q, + "qps", + f"{title} (QPS)", + f"{slug}_qps", + "QPS", + "Normalized QPS", + x="envd_cpus", ) @@ -2220,18 +2798,20 @@ def plot( slug: str, data_label: str, normalized_label: str, + x: str, ): df2 = data.pivot_table( - index=["credits_per_h"], + index=[x], columns=["test_name"], values=[value], aggfunc="min", ).sort_index(axis=1) - (level, dropped) = labels_to_drop(df2) + (level, _dropped) = labels_to_drop(df2) filtered = df2.droplevel(level, axis=1).dropna(axis=1, how="all") if filtered.empty: print(f"Warning: No data to plot for {title}") return + filtered.index.name = x plot = filtered.plot( kind="bar", figsize=(12, 6), @@ -2240,6 +2820,7 @@ def plot( title=f"{title}", grid=True, ) + plot.set_xlabel(x) plot.legend( loc="upper left", bbox_to_anchor=(1.0, 1.0), @@ -2255,6 +2836,7 @@ def plot( title=f"{title}", grid=True, ) + plot.set_xlabel(x) plot.legend( loc="upper left", bbox_to_anchor=(1.0, 1.0), diff --git a/test/dbbench/Dockerfile b/test/dbbench/Dockerfile new file mode 100644 index 0000000000000..83b4b20f55723 --- /dev/null +++ b/test/dbbench/Dockerfile @@ -0,0 +1,49 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Build the dbbench Go binary from https://github.com/sjwiesman/dbbench +# We use the standard Materialize mzbuild base image. + +MZFROM ubuntu-base + +WORKDIR /workdir + +# Install build dependencies for Go and Git +RUN apt-get update && TZ=UTC DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ + ca-certificates \ + git \ + golang-go \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* \ + && rm -rf /usr/share/doc/* /usr/share/man/* /usr/share/info/* /usr/share/locale/* /var/cache/* /var/log/* + +ENV CGO_ENABLED=0 + +# Pin to specific commit for reproducibility +# TODO: Fork it under the MaterializeInc organization (like e.g. sqlsmith). +ADD https://api.github.com/repos/sjwiesman/dbbench/git/commits/10b2a0b5159f06945646fa1179bda4be51fe02b4 version.json + +# Clone and build dbbench. Some forks place main under ./cmd/dbbench; handle both. +RUN git clone https://github.com/sjwiesman/dbbench /workdir/dbbench \ + && cd /workdir/dbbench \ + && git checkout 10b2a0b5159f06945646fa1179bda4be51fe02b4 \ + && go mod download \ + && if [ -d cmd/dbbench ]; then \ + go build -trimpath -ldflags="-s -w" -o /usr/local/bin/dbbench ./cmd/dbbench; \ + else \ + go build -trimpath -ldflags="-s -w" -o /usr/local/bin/dbbench .; \ + fi \ + && cd /workdir \ + && rm -rf /workdir/dbbench + +# (No default dbbench.ini; scenarios generate an INI at runtime.) + +# Default command shows dbbench help if executed without arguments +ENTRYPOINT ["/usr/local/bin/dbbench"] +CMD ["--help"] diff --git a/test/dbbench/mzbuild.yml b/test/dbbench/mzbuild.yml new file mode 100644 index 0000000000000..6b8d0a6224f2e --- /dev/null +++ b/test/dbbench/mzbuild.yml @@ -0,0 +1,10 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +name: dbbench From 1eef74b3e7ab0899068c412fa83dfb41d72259f0 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Mon, 10 Nov 2025 20:47:56 +0100 Subject: [PATCH 3/5] Use `mktemp` --- test/cluster-spec-sheet/mzcompose.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py index cdac47b9a8add..02cdf04e1dc52 100644 --- a/test/cluster-spec-sheet/mzcompose.py +++ b/test/cluster-spec-sheet/mzcompose.py @@ -299,8 +299,9 @@ def ini_escape(s: str) -> str: quoted_flags = " ".join(shlex.quote(x) for x in flags) script = ( "set -euo pipefail; " - "cat > /tmp/run.ini; " - f"exec dbbench {quoted_flags} -intermediate-stats=false /tmp/run.ini" + 'tmp="$(mktemp -t dbbench.XXXXXX)"; ' + 'cat > "$tmp"; ' + f'exec dbbench {quoted_flags} -intermediate-stats=false "$tmp"' ) print(f"--- Running dbbench step '{name}' for {self.envd_cpus} ...") From a600dbee6a09fb19ea4d4bb3a660df9aeadc89f8 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 11 Nov 2025 13:58:52 +0000 Subject: [PATCH 4/5] cluster spec sheet: Introduce separate staging env and record into separate table --- ci/plugins/mzcompose/hooks/command | 2 +- .../pipeline.template.yml | 43 +- .../cluster_spec_sheet_result_storage.py | 61 +++ .../setup/cleanup/remove-build.sql | 1 + .../16-cluster-spec-sheet-environmentd.sql | 27 + .../setup/views/100-data-integrity.sql | 4 + .../test_analytics/test_analytics_db.py | 4 + test/cluster-spec-sheet/mzcompose.py | 462 +++++++++++------- 8 files changed, 404 insertions(+), 200 deletions(-) create mode 100644 misc/python/materialize/test_analytics/setup/tables/16-cluster-spec-sheet-environmentd.sql diff --git a/ci/plugins/mzcompose/hooks/command b/ci/plugins/mzcompose/hooks/command index 36fbb2fe1b317..8521e4dd261e3 100644 --- a/ci/plugins/mzcompose/hooks/command +++ b/ci/plugins/mzcompose/hooks/command @@ -345,7 +345,7 @@ cleanup() { && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Benchmarking Staging Environment" ] \ && [[ ! "$BUILDKITE_LABEL" =~ Terraform\ .* ]] \ && [[ ! "$BUILDKITE_LABEL" =~ Orchestratord\ test\ .* ]] \ - && [ "$BUILDKITE_LABEL" != "Cluster spec sheet" ]; then + && [[ ! "$BUILDKITE_LABEL" =~ Cluster\ spec\ sheet.* ]]; then echo "+++ services.log is empty, failing" exit 1 fi diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml index 14018e23efbc4..de9ec9a33c6b2 100644 --- a/ci/release-qualification/pipeline.template.yml +++ b/ci/release-qualification/pipeline.template.yml @@ -530,16 +530,33 @@ steps: agents: queue: hetzner-x86-64-dedi-48cpu-192gb # 1 TB disk - - id: cluster-spec-sheet - label: Cluster spec sheet - depends_on: build-aarch64 - timeout_in_minutes: 3600 - concurrency: 1 - concurrency_group: 'cluster-spec-sheet' - plugins: - - ./ci/plugins/mzcompose: - composition: cluster-spec-sheet - run: default - args: [--cleanup] - agents: - queue: linux-aarch64-small + - group: Cluster spec sheet + key: cluster-spec-sheet + steps: + - id: cluster-spec-sheet-cluster + label: "Cluster spec sheet: Cluster (against Production)" + depends_on: build-aarch64 + timeout_in_minutes: 3600 + concurrency: 1 + concurrency_group: 'cluster-spec-sheet' + plugins: + - ./ci/plugins/mzcompose: + composition: cluster-spec-sheet + run: default + args: [--cleanup, --target=cloud-production, cluster] + agents: + queue: linux-aarch64-small + + - id: cluster-spec-sheet-environmentd + label: "Cluster spec sheet: Environmentd (against Staging)" + depends_on: build-aarch64 + timeout_in_minutes: 3600 + concurrency: 1 + concurrency_group: 'cluster-spec-sheet-cluster' + plugins: + - ./ci/plugins/mzcompose: + composition: cluster-spec-sheet + run: default + args: [--cleanup, --target=cloud-staging, environmentd] + agents: + queue: linux-aarch64-small diff --git a/misc/python/materialize/test_analytics/data/cluster_spec_sheet/cluster_spec_sheet_result_storage.py b/misc/python/materialize/test_analytics/data/cluster_spec_sheet/cluster_spec_sheet_result_storage.py index ec55f4a4583b9..67f83a202ce96 100644 --- a/misc/python/materialize/test_analytics/data/cluster_spec_sheet/cluster_spec_sheet_result_storage.py +++ b/misc/python/materialize/test_analytics/data/cluster_spec_sheet/cluster_spec_sheet_result_storage.py @@ -28,6 +28,19 @@ class ClusterSpecSheetResultEntry: time_ms: int | None +@dataclass +class ClusterSpecSheetEnvironmentdResultEntry: + scenario: str + scenario_version: str + scale: int + mode: str + category: str + test_name: str + envd_cpus: int + repetition: int + qps: float | None + + class ClusterSpecSheetResultStorage(BaseDataStorage): def add_result( @@ -76,3 +89,51 @@ def add_result( ) self.database_connector.add_update_statements(sql_statements) + + +class ClusterSpecSheetEnvironmentdResultStorage(BaseDataStorage): + + def add_result( + self, + framework_version: str, + results: list[ClusterSpecSheetEnvironmentdResultEntry], + ) -> None: + job_id = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_JOB_ID) + + sql_statements = [] + + for result_entry in results: + # TODO: remove NULL castings when database-issues#8100 is resolved + sql_statements.append( + f""" + INSERT INTO cluster_spec_sheet_environmentd_result + ( + build_job_id, + framework_version, + scenario, + scenario_version, + scale, + mode, + category, + test_name, + envd_cpus, + repetition, + qps + ) + SELECT + {as_sanitized_literal(job_id)}, + {as_sanitized_literal(framework_version)}, + {as_sanitized_literal(result_entry.scenario)}, + {as_sanitized_literal(result_entry.scenario_version)}, + {result_entry.scale}, + {as_sanitized_literal(result_entry.mode)}, + {as_sanitized_literal(result_entry.category)}, + {as_sanitized_literal(result_entry.test_name)}, + {result_entry.envd_cpus}, + {result_entry.repetition}, + {result_entry.qps or 'NULL::FLOAT'} + ; + """ + ) + + self.database_connector.add_update_statements(sql_statements) diff --git a/misc/python/materialize/test_analytics/setup/cleanup/remove-build.sql b/misc/python/materialize/test_analytics/setup/cleanup/remove-build.sql index 0b2773aa9f5e5..5e65d8c5c4266 100644 --- a/misc/python/materialize/test_analytics/setup/cleanup/remove-build.sql +++ b/misc/python/materialize/test_analytics/setup/cleanup/remove-build.sql @@ -14,6 +14,7 @@ DELETE FROM scalability_framework_result WHERE build_job_id IN (SELECT build_id DELETE FROM parallel_benchmark_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%)); DELETE FROM product_limits_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%)); DELETE FROM cluster_spec_sheet_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%)); +DELETE FROM cluster_spec_sheet_environmentd_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%)); DELETE FROM build_annotation_error WHERE build_job_id IN (SELECT build_job_id FROM build_annotation WHERE build_id IN (%build-ids%)); DELETE FROM build_annotation WHERE build_id IN (%build-ids%); DELETE FROM build_job WHERE build_id IN (%build-ids%); diff --git a/misc/python/materialize/test_analytics/setup/tables/16-cluster-spec-sheet-environmentd.sql b/misc/python/materialize/test_analytics/setup/tables/16-cluster-spec-sheet-environmentd.sql new file mode 100644 index 0000000000000..9b4399fb64263 --- /dev/null +++ b/misc/python/materialize/test_analytics/setup/tables/16-cluster-spec-sheet-environmentd.sql @@ -0,0 +1,27 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + + +-- result of individual product limits scenarios +CREATE TABLE cluster_spec_sheet_environmentd_result ( + build_job_id TEXT NOT NULL, + framework_version TEXT NOT NULL, + scenario TEXT NOT NULL, + scenario_version TEXT NOT NULL, + scale INT NOT NULL, + mode TEXT NOT NULL, + category TEXT NOT NULL, + test_name TEXT NOT NULL, + envd_cpus INT NOT NULL, + repetition INT NOT NULL, + qps FLOAT +); + +ALTER TABLE cluster_spec_sheet_environmentd_result OWNER TO qa; +GRANT SELECT, INSERT, UPDATE ON TABLE cluster_spec_sheet_environmentd_result TO "hetzner-ci"; diff --git a/misc/python/materialize/test_analytics/setup/views/100-data-integrity.sql b/misc/python/materialize/test_analytics/setup/views/100-data-integrity.sql index 61c1a45ffbda7..56b93309c5c94 100644 --- a/misc/python/materialize/test_analytics/setup/views/100-data-integrity.sql +++ b/misc/python/materialize/test_analytics/setup/views/100-data-integrity.sql @@ -33,6 +33,10 @@ CREATE OR REPLACE VIEW v_data_integrity (table_name, own_item_key, referenced_it FROM cluster_spec_sheet_result WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job) UNION + SELECT 'cluster_spec_sheet_environmentd_result', build_job_id, build_job_id, 'cluster spec sheet environmentd result references missing build job' + FROM cluster_spec_sheet_environmentd_result + WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job) + UNION SELECT 'build_annotation', build_job_id, build_job_id, 'build annotation references missing build job' FROM build_annotation WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job) diff --git a/misc/python/materialize/test_analytics/test_analytics_db.py b/misc/python/materialize/test_analytics/test_analytics_db.py index d14aee010a8f2..ccbdc076a4d97 100644 --- a/misc/python/materialize/test_analytics/test_analytics_db.py +++ b/misc/python/materialize/test_analytics/test_analytics_db.py @@ -26,6 +26,7 @@ BuildAnnotationStorage, ) from materialize.test_analytics.data.cluster_spec_sheet.cluster_spec_sheet_result_storage import ( + ClusterSpecSheetEnvironmentdResultStorage, ClusterSpecSheetResultStorage, ) from materialize.test_analytics.data.feature_benchmark.feature_benchmark_result_storage import ( @@ -79,6 +80,9 @@ def __init__(self, config: MzDbConfig): self.cluster_spec_sheet_results = ClusterSpecSheetResultStorage( self.database_connector ) + self.cluster_spec_sheet_environmentd_results = ( + ClusterSpecSheetEnvironmentdResultStorage(self.database_connector) + ) def _create_database_connector(self, config: MzDbConfig) -> DatabaseConnector: if config.enabled: diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py index 02cdf04e1dc52..0f1e80ee46a03 100644 --- a/test/cluster-spec-sheet/mzcompose.py +++ b/test/cluster-spec-sheet/mzcompose.py @@ -49,10 +49,17 @@ CLUSTER_SPEC_SHEET_VERSION = "1.0.0" # Used for uploading test analytics results -REGION = os.getenv("REGION", "aws/us-east-1") -ENVIRONMENT = os.getenv("ENVIRONMENT", "production") -USERNAME = os.getenv("NIGHTLY_MZ_USERNAME", "infra+bot@materialize.com") -APP_PASSWORD = os.getenv("MZ_CLI_APP_PASSWORD") +PRODUCTION_REGION = "aws/us-east-1" +PRODUCTION_ENVIRONMENT = "production" +PRODUCTION_USERNAME = os.getenv("NIGHTLY_MZ_USERNAME", "infra+bot@materialize.com") +PRODUCTION_APP_PASSWORD = os.getenv("MZ_CLI_APP_PASSWORD") + +STAGING_REGION = "aws/eu-west-1" +STAGING_ENVIRONMENT = "staging" +STAGING_USERNAME = os.getenv( + "NIGHTLY_CANARY_USERNAME", "infra+nightly-canary@materialize.com" +) +STAGING_APP_PASSWORD = os.getenv("NIGHTLY_CANARY_APP_PASSWORD") MATERIALIZED_ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS = { "memory_limiter_interval": "0s", @@ -60,11 +67,8 @@ } SERVICES = [ - Mz( - region=REGION, - environment=ENVIRONMENT, - app_password=APP_PASSWORD or "", - ), + # Overridden below + Mz(app_password=""), Materialized( propagate_crashes=True, additional_system_parameter_defaults=MATERIALIZED_ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, @@ -85,15 +89,25 @@ SCENARIO_TPCH_QUERIES_WEAK = "tpch_queries_weak" SCENARIO_TPCH_STRONG = "tpch_strong" SCENARIO_QPS_ENVD_STRONG_SCALING = "qps_envd_strong_scaling" -ALL_SCENARIOS = [ + +SCENARIOS_CLUSTERD = [ SCENARIO_AUCTION_STRONG, SCENARIO_AUCTION_WEAK, SCENARIO_TPCH_MV_STRONG, SCENARIO_TPCH_QUERIES_STRONG, SCENARIO_TPCH_QUERIES_WEAK, SCENARIO_TPCH_STRONG, +] +SCENARIOS_ENVIRONMENTD = [ SCENARIO_QPS_ENVD_STRONG_SCALING, ] +ALL_SCENARIOS = SCENARIOS_CLUSTERD + SCENARIOS_ENVIRONMENTD + +SCENARIO_GROUPS = { + "cluster": SCENARIOS_CLUSTERD, + "environmentd": SCENARIOS_ENVIRONMENTD, + "all": ALL_SCENARIOS, +} class ConnectionHandler: @@ -1898,7 +1912,7 @@ def run(self, runner: ScenarioRunner) -> None: def disable_region(c: Composition, hard: bool) -> None: - print(f"Shutting down region {REGION} ...") + print("Shutting down region ...") try: if hard: @@ -2008,9 +2022,9 @@ def wait_for_envd(target: "BenchTarget", timeout_secs: int = 300) -> None: """ if isinstance(target, CloudTarget): host = target.c.cloud_hostname() - user = USERNAME + user = target.username # Prefer the newly created app password when present; fall back to the CLI password. - password = target.new_app_password or APP_PASSWORD or "" + password = target.new_app_password or target.app_password or "" sslmode = "require" print( f"Waiting for cloud environmentd at {host}:6875 to come up with username {user} ..." @@ -2068,9 +2082,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: ) parser.add_argument( "--target", - default="cloud", - choices=["cloud", "docker"], - help="Target to deploy to (default: cloud).", + default="cloud-production", + choices=["cloud-production", "cloud-staging", "docker"], + help="Target to deploy to (default: cloud-production).", ) parser.add_argument( "--max-scale", @@ -2090,192 +2104,223 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( "scenarios", nargs="*", - default=ALL_SCENARIOS, - help="Scenarios to run.", + default=["all"], + choices=ALL_SCENARIOS + list(SCENARIO_GROUPS.keys()), + help="Scenarios to run, supports individual scenario names as well as 'all', 'cluster', 'environmentd'.", ) args = parser.parse_args() - scenarios = set(args.scenarios) + + scenarios: set[str] = set() + for s in args.scenarios: + if s in SCENARIO_GROUPS: + scenarios.update(SCENARIO_GROUPS[s]) + else: + scenarios.add(s) + unknown = scenarios - set(ALL_SCENARIOS) if unknown: raise ValueError(f"Unknown scenarios: {unknown}") print(f"--- Running scenarios: {', '.join(scenarios)}") - if args.target == "cloud": - target: BenchTarget = CloudTarget(c) + if args.target == "cloud-production": + target: BenchTarget = CloudTarget( + c, PRODUCTION_USERNAME, PRODUCTION_APP_PASSWORD or "" + ) + mz = Mz( + region=PRODUCTION_REGION, + environment=PRODUCTION_ENVIRONMENT, + app_password=PRODUCTION_APP_PASSWORD or "", + ) + elif args.target == "cloud-staging": + target: BenchTarget = CloudTarget( + c, STAGING_USERNAME, STAGING_APP_PASSWORD or "" + ) + mz = Mz( + region=STAGING_REGION, + environment=STAGING_ENVIRONMENT, + app_password=STAGING_APP_PASSWORD or "", + ) elif args.target == "docker": target = DockerTarget(c) + mz = Mz(app_password="") else: raise ValueError(f"Unknown target: {args.target}") - max_scale = args.max_scale - if target.max_scale() is not None: - max_scale = min(max_scale, target.max_scale()) - - if args.cleanup: - target.cleanup() - - target.initialize() - - # Derive two result files (cluster and envd-focused) from the provided --record path - base_name = os.path.splitext(args.record)[0] - cluster_path = f"{base_name}.cluster.csv" - envd_path = f"{base_name}.envd.csv" - - cluster_file = open(cluster_path, "w", newline="") - envd_file = open(envd_path, "w", newline="") - - # Traditional scenarios: cluster-focused schema - cluster_writer = csv.DictWriter( - cluster_file, - fieldnames=[ - "scenario", - "scenario_version", - "scale", - "mode", - "category", - "test_name", - "cluster_size", - "repetition", - "size_bytes", - "time_ms", - ], - extrasaction="ignore", - ) - cluster_writer.writeheader() - - # Envd-focused scenarios: QPS schema - envd_writer = csv.DictWriter( - envd_file, - fieldnames=[ - "scenario", - "scenario_version", - "scale", - "mode", - "category", - "test_name", - "envd_cpus", - "repetition", - "qps", - ], - extrasaction="ignore", - ) - envd_writer.writeheader() - - def process(scenario: str) -> None: - with c.test_case(scenario): - conn = ConnectionHandler(target.new_connection) - - # This cluster is just for misc setup queries. - size = "50cc" if isinstance(target, CloudTarget) else "scale=1,workers=1" - with conn as cur: - cur.execute("DROP CLUSTER IF EXISTS quickstart;") - cur.execute(f"CREATE CLUSTER quickstart SIZE '{size}';".encode()) - - if scenario == SCENARIO_TPCH_STRONG: - print("--- SCENARIO: Running TPC-H Index strong scaling") - run_scenario_strong( - scenario=TpchScenario( - args.scale_tpch, target.replica_size_for_scale(1) - ), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_TPCH_MV_STRONG: - print("--- SCENARIO: Running TPC-H Materialized view strong scaling") - run_scenario_strong( - scenario=TpchScenarioMV( - args.scale_tpch, target.replica_size_for_scale(1) - ), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_TPCH_QUERIES_STRONG: - print("--- SCENARIO: Running TPC-H Queries strong scaling") - run_scenario_strong( - scenario=TpchScenarioQueriesIndexedInputs( - args.scale_tpch_queries, target.replica_size_for_scale(1) - ), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_TPCH_QUERIES_WEAK: - print("--- SCENARIO: Running TPC-H Queries weak scaling") - run_scenario_weak( - scenario=TpchScenarioQueriesIndexedInputs( - args.scale_tpch_queries, None - ), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_AUCTION_STRONG: - print("--- SCENARIO: Running Auction strong scaling") - run_scenario_strong( - scenario=AuctionScenario( - args.scale_auction, target.replica_size_for_scale(1) - ), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_AUCTION_WEAK: - print("--- SCENARIO: Running Auction weak scaling") - run_scenario_weak( - scenario=AuctionScenario(args.scale_auction, None), - results_writer=cluster_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) - if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING: - print("--- SCENARIO: Running QPS envd strong scaling") - run_scenario_envd_strong_scaling( - scenario=QpsEnvdStrongScalingScenario( - 1, target.replica_size_for_scale(1) - ), - results_writer=envd_writer, - connection=conn, - target=target, - max_scale=max_scale, - ) + with c.override(mz): + max_scale = args.max_scale + if target.max_scale() is not None: + max_scale = min(max_scale, target.max_scale()) - test_failed = True - try: - scenarios = buildkite.shard_list(sorted(args.scenarios), lambda s: s) - c.test_parts(scenarios, process) - test_failed = False - finally: - cluster_file.close() - envd_file.close() - # Clean up if args.cleanup: target.cleanup() - # Upload only cluster scaling results to Test Analytics for now, until the Test Analytics schema is extended. - # TODO: See slack discussion: - # https://materializeinc.slack.com/archives/C01LKF361MZ/p1762351652336819?thread_ts=1762348361.164759&cid=C01LKF361MZ - upload_results_to_test_analytics(c, cluster_path, not test_failed) + target.initialize() + + # Derive two result files (cluster and envd-focused) from the provided --record path + base_name = os.path.splitext(args.record)[0] + cluster_path = f"{base_name}.cluster.csv" + envd_path = f"{base_name}.envd.csv" + + cluster_file = open(cluster_path, "w", newline="") + envd_file = open(envd_path, "w", newline="") + + # Traditional scenarios: cluster-focused schema + cluster_writer = csv.DictWriter( + cluster_file, + fieldnames=[ + "scenario", + "scenario_version", + "scale", + "mode", + "category", + "test_name", + "cluster_size", + "repetition", + "size_bytes", + "time_ms", + ], + extrasaction="ignore", + ) + cluster_writer.writeheader() + + # Envd-focused scenarios: QPS schema + envd_writer = csv.DictWriter( + envd_file, + fieldnames=[ + "scenario", + "scenario_version", + "scale", + "mode", + "category", + "test_name", + "envd_cpus", + "repetition", + "qps", + ], + extrasaction="ignore", + ) + envd_writer.writeheader() - assert not test_failed + def process(scenario: str) -> None: + with c.test_case(scenario): + conn = ConnectionHandler(target.new_connection) - if buildkite.is_in_buildkite(): - # Upload both CSVs as artifacts - buildkite.upload_artifact(cluster_path, cwd=MZ_ROOT, quiet=True) - buildkite.upload_artifact(envd_path, cwd=MZ_ROOT, quiet=True) + # This cluster is just for misc setup queries. + size = ( + "50cc" if isinstance(target, CloudTarget) else "scale=1,workers=1" + ) + with conn as cur: + cur.execute("DROP CLUSTER IF EXISTS quickstart;") + cur.execute(f"CREATE CLUSTER quickstart SIZE '{size}';".encode()) + + if scenario == SCENARIO_TPCH_STRONG: + print("--- SCENARIO: Running TPC-H Index strong scaling") + run_scenario_strong( + scenario=TpchScenario( + args.scale_tpch, target.replica_size_for_scale(1) + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_TPCH_MV_STRONG: + print( + "--- SCENARIO: Running TPC-H Materialized view strong scaling" + ) + run_scenario_strong( + scenario=TpchScenarioMV( + args.scale_tpch, target.replica_size_for_scale(1) + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_TPCH_QUERIES_STRONG: + print("--- SCENARIO: Running TPC-H Queries strong scaling") + run_scenario_strong( + scenario=TpchScenarioQueriesIndexedInputs( + args.scale_tpch_queries, target.replica_size_for_scale(1) + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_TPCH_QUERIES_WEAK: + print("--- SCENARIO: Running TPC-H Queries weak scaling") + run_scenario_weak( + scenario=TpchScenarioQueriesIndexedInputs( + args.scale_tpch_queries, None + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_AUCTION_STRONG: + print("--- SCENARIO: Running Auction strong scaling") + run_scenario_strong( + scenario=AuctionScenario( + args.scale_auction, target.replica_size_for_scale(1) + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_AUCTION_WEAK: + print("--- SCENARIO: Running Auction weak scaling") + run_scenario_weak( + scenario=AuctionScenario(args.scale_auction, None), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) + if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING: + print("--- SCENARIO: Running QPS envd strong scaling") + run_scenario_envd_strong_scaling( + scenario=QpsEnvdStrongScalingScenario( + 1, target.replica_size_for_scale(1) + ), + results_writer=envd_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) - if args.analyze: - # Analyze both files separately (each has its own schema) - analyze_results_file(cluster_path) - analyze_results_file(envd_path) + test_failed = True + try: + scenarios_list = buildkite.shard_list(sorted(list(scenarios)), lambda s: s) + c.test_parts(scenarios_list, process) + test_failed = False + finally: + cluster_file.close() + envd_file.close() + # Clean up + if args.cleanup: + target.cleanup() + + # Upload only cluster scaling results to Test Analytics for now, until the Test Analytics schema is extended. + # TODO: See slack discussion: + # https://materializeinc.slack.com/archives/C01LKF361MZ/p1762351652336819?thread_ts=1762348361.164759&cid=C01LKF361MZ + upload_cluster_results_to_test_analytics(c, cluster_path, not test_failed) + upload_environmentd_results_to_test_analytics(c, envd_path, not test_failed) + + assert not test_failed + + if buildkite.is_in_buildkite(): + # Upload both CSVs as artifacts + buildkite.upload_artifact(cluster_path, cwd=MZ_ROOT, quiet=True) + buildkite.upload_artifact(envd_path, cwd=MZ_ROOT, quiet=True) + + if args.analyze: + # Analyze both files separately (each has its own schema) + analyze_results_file(cluster_path) + analyze_results_file(envd_path) class BenchTarget: @@ -2311,8 +2356,10 @@ def dbbench_connection_flags(self) -> list[str]: class CloudTarget(BenchTarget): - def __init__(self, c: Composition) -> None: + def __init__(self, c: Composition, username: str, app_password: str) -> None: self.c = c + self.username = username + self.app_password = app_password self.new_app_password: str | None = None def dbbench_connection_flags(self) -> list[str]: @@ -2325,7 +2372,7 @@ def dbbench_connection_flags(self) -> list[str]: "-port", "6875", "-username", - USERNAME, + self.username, "-password", self.new_app_password, "-database", @@ -2356,7 +2403,7 @@ def new_connection(self) -> psycopg.Connection: conn = psycopg.connect( host=self.c.cloud_hostname(), port=6875, - user=USERNAME, + user=self.username, password=self.new_app_password, dbname="materialize", sslmode="require", @@ -2858,7 +2905,7 @@ def labels_to_drop( return unique, dropped -def upload_results_to_test_analytics( +def upload_cluster_results_to_test_analytics( c: Composition, file: str, was_successful: bool, @@ -2900,3 +2947,46 @@ def upload_results_to_test_analytics( except Exception as e: # An error during an upload must never cause the build to fail test_analytics.on_upload_failed(e) + + +def upload_environmentd_results_to_test_analytics( + c: Composition, + file: str, + was_successful: bool, +) -> None: + if not buildkite.is_in_buildkite(): + return + + test_analytics = TestAnalyticsDb(create_test_analytics_config(c)) + test_analytics.builds.add_build_job(was_successful=was_successful) + + result_entries = [] + + with open(file) as f: + reader = csv.DictReader(f) + for row in reader: + result_entries.append( + cluster_spec_sheet_result_storage.ClusterSpecSheetEnvironmentdResultEntry( + scenario=row["scenario"], + scenario_version=row["scenario_version"], + scale=int(row["scale"]), + mode=row["mode"], + category=row["category"], + test_name=row["test_name"], + envd_cpus=int(row["envd_cpus"]), + repetition=int(row["repetition"]), + qps=float(row["qps"]) if row["qps"] else None, + ) + ) + + test_analytics.cluster_spec_sheet_environmentd_results.add_result( + framework_version=CLUSTER_SPEC_SHEET_VERSION, + results=result_entries, + ) + + try: + test_analytics.submit_updates() + print("Uploaded results.") + except Exception as e: + # An error during an upload must never cause the build to fail + test_analytics.on_upload_failed(e) From c54bc80dd5b4d3c5c7e68c1023ce7e62505f33a6 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 14 Nov 2025 12:56:11 +0000 Subject: [PATCH 5/5] Fix repo for dbbench --- test/dbbench/Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/dbbench/Dockerfile b/test/dbbench/Dockerfile index 83b4b20f55723..8fe3f82498aed 100644 --- a/test/dbbench/Dockerfile +++ b/test/dbbench/Dockerfile @@ -7,7 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. -# Build the dbbench Go binary from https://github.com/sjwiesman/dbbench +# Build the dbbench Go binary from https://github.com/MaterializeInc/dbbench # We use the standard Materialize mzbuild base image. MZFROM ubuntu-base @@ -27,10 +27,10 @@ ENV CGO_ENABLED=0 # Pin to specific commit for reproducibility # TODO: Fork it under the MaterializeInc organization (like e.g. sqlsmith). -ADD https://api.github.com/repos/sjwiesman/dbbench/git/commits/10b2a0b5159f06945646fa1179bda4be51fe02b4 version.json +ADD https://api.github.com/repos/MaterializeInc/dbbench/git/commits/10b2a0b5159f06945646fa1179bda4be51fe02b4 version.json # Clone and build dbbench. Some forks place main under ./cmd/dbbench; handle both. -RUN git clone https://github.com/sjwiesman/dbbench /workdir/dbbench \ +RUN git clone https://github.com/MaterializeInc/dbbench /workdir/dbbench \ && cd /workdir/dbbench \ && git checkout 10b2a0b5159f06945646fa1179bda4be51fe02b4 \ && go mod download \