@@ -108,20 +108,56 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
108
108
else :
109
109
if parallelism_count == 1 or parallelism_index == 0 :
110
110
test_upgrade_from_version (
111
- c , f"{ version } " , priors , filter = args .filter , zero_downtime = True
111
+ c ,
112
+ f"{ version } " ,
113
+ priors ,
114
+ filter = args .filter ,
115
+ zero_downtime = True ,
116
+ force_source_table_syntax = False ,
112
117
)
113
118
if parallelism_count == 1 or parallelism_index == 1 :
114
119
test_upgrade_from_version (
115
- c , f"{ version } " , priors , filter = args .filter , zero_downtime = False
120
+ c ,
121
+ f"{ version } " ,
122
+ priors ,
123
+ filter = args .filter ,
124
+ zero_downtime = False ,
125
+ force_source_table_syntax = False ,
126
+ )
127
+ test_upgrade_from_version (
128
+ c ,
129
+ f"{ version } " ,
130
+ priors ,
131
+ filter = args .filter ,
132
+ zero_downtime = False ,
133
+ force_source_table_syntax = True ,
116
134
)
117
135
118
136
if parallelism_count == 1 or parallelism_index == 0 :
119
137
test_upgrade_from_version (
120
- c , "current_source" , priors = [], filter = args .filter , zero_downtime = True
138
+ c ,
139
+ "current_source" ,
140
+ priors = [],
141
+ filter = args .filter ,
142
+ zero_downtime = True ,
143
+ force_source_table_syntax = False ,
121
144
)
122
145
if parallelism_count == 1 or parallelism_index == 1 :
123
146
test_upgrade_from_version (
124
- c , "current_source" , priors = [], filter = args .filter , zero_downtime = False
147
+ c ,
148
+ "current_source" ,
149
+ priors = [],
150
+ filter = args .filter ,
151
+ zero_downtime = False ,
152
+ force_source_table_syntax = False ,
153
+ )
154
+ test_upgrade_from_version (
155
+ c ,
156
+ "current_source" ,
157
+ priors = [],
158
+ filter = args .filter ,
159
+ zero_downtime = False ,
160
+ force_source_table_syntax = True ,
125
161
)
126
162
127
163
@@ -144,13 +180,14 @@ def test_upgrade_from_version(
144
180
priors : list [MzVersion ],
145
181
filter : str ,
146
182
zero_downtime : bool ,
183
+ force_source_table_syntax : bool ,
147
184
) -> None :
148
185
print (
149
186
f"+++ Testing { '0dt upgrade' if zero_downtime else 'regular upgrade' } from Materialize { from_version } to current_source."
150
187
)
151
188
152
189
system_parameter_defaults = get_default_system_parameters (
153
- zero_downtime = zero_downtime
190
+ zero_downtime = zero_downtime ,
154
191
)
155
192
deploy_generation = 0
156
193
@@ -289,6 +326,13 @@ def test_upgrade_from_version(
289
326
c .rm (mz_service )
290
327
291
328
print (f"{ '0dt-' if zero_downtime else '' } Upgrading to final version" )
329
+ system_parameter_defaults = get_default_system_parameters (
330
+ zero_downtime = zero_downtime ,
331
+ # We can only force the syntax on the final version so that the migration to convert
332
+ # sources to the new model can be applied without preventing sources from being
333
+ # created in the old syntax on the older version.
334
+ force_source_table_syntax = force_source_table_syntax ,
335
+ )
292
336
mz_to = Materialized (
293
337
name = mz_service ,
294
338
options = list (mz_options .values ()),
0 commit comments