1111from materialize .checks .actions import Testdrive
1212from materialize .checks .checks import Check , externally_idempotent
1313from materialize .checks .common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
14+ from materialize .checks .executors import Executor
15+ from materialize .mz_version import MzVersion
1416
1517
1618def schemas () -> str :
@@ -21,6 +23,13 @@ def schemas() -> str:
2123class MultiplePartitions (Check ):
2224 """Test that adds new partitions to a Kafka source"""
2325
26+ def _can_run (self , e : Executor ) -> bool :
27+ # v0.80.0 introduced backward incompatible changes to `TOPIC METADATA
28+ # REFRESH INTERVAL`, which was never available to customers, so rather
29+ # than try to introduce hacks to support it, we simply disable tests
30+ # that used
31+ return self .base_version >= MzVersion .parse_mz ("v0.80.0-dev" )
32+
2433 def initialize (self ) -> Testdrive :
2534 return Testdrive (
2635 schemas ()
@@ -35,10 +44,11 @@ def initialize(self) -> Testdrive:
3544 $ kafka-ingest format=avro key-format=avro topic=multiple-partitions-topic key-schema=${keyschema} schema=${schema} repeat=100
3645 {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
3746
38- > CREATE SOURCE multiple_partitions_source
39- FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-multiple-partitions-topic-${testdrive.seed}', TOPIC METADATA REFRESH INTERVAL '500ms')
40- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
41- ENVELOPE UPSERT
47+ # Note: we use "postgres-execute" here instead of ">" because for commands run with
48+ # the ">" testdrive parses them with the SQL parser from `main`, and the SQL for
49+ # this command is version dependent.
50+ $ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
51+ CREATE SOURCE multiple_partitions_source FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-multiple-partitions-topic-${testdrive.seed}', TOPIC METADATA REFRESH INTERVAL '500ms') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT;
4252
4353 $ kafka-add-partitions topic=multiple-partitions-topic total-partitions=2
4454
0 commit comments