Skip to content

Commit b3b8a5e

Browse files
committed
Fix test cases
1 parent 672d61a commit b3b8a5e

File tree

1 file changed

+23
-3
lines changed

1 file changed

+23
-3
lines changed

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,9 @@ void testOneToOneMapping() {
147147

148148
@Test
149149
void testMergingTablesWithExactSameSchema() {
150+
SchemaManager schemaManager = new SchemaManager();
150151
SchemaDerivation schemaDerivation =
151-
new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>());
152+
new SchemaDerivation(schemaManager, ROUTES, new HashMap<>());
152153

153154
// Create table 1
154155
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
@@ -158,6 +159,8 @@ void testMergingTablesWithExactSameSchema() {
158159
.asCreateTableEvent()
159160
.hasTableId(MERGED_TABLE)
160161
.hasSchema(SCHEMA);
162+
derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange);
163+
161164
// Create table 2
162165
assertThat(schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_2, SCHEMA)))
163166
.isEmpty();
@@ -177,6 +180,8 @@ void testMergingTablesWithExactSameSchema() {
177180
.asAddColumnEvent()
178181
.hasTableId(MERGED_TABLE)
179182
.containsAddedColumns(newCol1, newCol2);
183+
derivedChangesAfterAddColumn.forEach(schemaManager::applyEvolvedSchemaChange);
184+
180185
// Add column for table 2
181186
assertThat(schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_2, newColumns)))
182187
.isEmpty();
@@ -190,6 +195,8 @@ void testMergingTablesWithExactSameSchema() {
190195
.asAlterColumnTypeEvent()
191196
.hasTableId(MERGED_TABLE)
192197
.containsTypeMapping(typeMapping);
198+
derivedChangesAfterAlterColumnType.forEach(schemaManager::applyEvolvedSchemaChange);
199+
193200
// Alter column type for table 2
194201
assertThat(
195202
schemaDerivation.applySchemaChange(
@@ -215,6 +222,8 @@ void testMergingTablesWithExactSameSchema() {
215222
.containsAddedColumns(
216223
new AddColumnEvent.ColumnWithPosition(
217224
new PhysicalColumn("last_name", DataTypes.STRING(), null)));
225+
derivedChangesAfterRenameColumn.forEach(schemaManager::applyEvolvedSchemaChange);
226+
218227
// Rename column for table 2
219228
assertThat(
220229
schemaDerivation.applySchemaChange(
@@ -235,6 +244,8 @@ void testMergingTableWithDifferentSchemas() {
235244
.asCreateTableEvent()
236245
.hasTableId(MERGED_TABLE)
237246
.hasSchema(SCHEMA);
247+
derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange);
248+
238249
// Create table 2
239250
List<SchemaChangeEvent> derivedChangesAfterCreateTable2 =
240251
schemaDerivation.applySchemaChange(
@@ -250,6 +261,7 @@ void testMergingTableWithDifferentSchemas() {
250261
"gender", DataTypes.STRING(), null)))),
251262
new AlterColumnTypeEvent(
252263
MERGED_TABLE, ImmutableMap.of("age", DataTypes.BIGINT())));
264+
derivedChangesAfterCreateTable2.forEach(schemaManager::applyEvolvedSchemaChange);
253265

254266
// Add column for table 1
255267
AddColumnEvent.ColumnWithPosition newCol1 =
@@ -266,6 +278,8 @@ void testMergingTableWithDifferentSchemas() {
266278
.asAddColumnEvent()
267279
.hasTableId(MERGED_TABLE)
268280
.containsAddedColumns(newCol1, newCol2);
281+
derivedChangesAfterAddColumn.forEach(schemaManager::applyEvolvedSchemaChange);
282+
269283
// Add column for table 2
270284
List<SchemaChangeEvent> derivedChangesAfterAddColumnForTable2 =
271285
schemaDerivation.applySchemaChange(
@@ -284,6 +298,7 @@ void testMergingTableWithDifferentSchemas() {
284298
.containsTypeMapping(
285299
ImmutableMap.of(
286300
"new_col1", DataTypes.STRING(), "new_col2", DataTypes.STRING()));
301+
derivedChangesAfterAddColumnForTable2.forEach(schemaManager::applyEvolvedSchemaChange);
287302

288303
// Alter column type for table 1
289304
ImmutableMap<String, DataType> typeMapping = ImmutableMap.of("age", DataTypes.BIGINT());
@@ -316,6 +331,8 @@ void testMergingTableWithDifferentSchemas() {
316331
.containsAddedColumns(
317332
new AddColumnEvent.ColumnWithPosition(
318333
new PhysicalColumn("last_name", DataTypes.STRING(), null)));
334+
derivedChangesAfterRenameColumn.forEach(schemaManager::applyEvolvedSchemaChange);
335+
319336
// Rename column for table 2
320337
List<SchemaChangeEvent> derivedChangesAfterRenameColumnForTable2 =
321338
schemaDerivation.applySchemaChange(
@@ -327,8 +344,9 @@ void testMergingTableWithDifferentSchemas() {
327344
.containsAddedColumns(
328345
new AddColumnEvent.ColumnWithPosition(
329346
new PhysicalColumn("first_name", DataTypes.STRING(), null)));
347+
derivedChangesAfterRenameColumnForTable2.forEach(schemaManager::applyEvolvedSchemaChange);
330348

331-
assertThat(schemaManager.getLatestUpstreamSchema(MERGED_TABLE))
349+
assertThat(schemaManager.getLatestEvolvedSchema(MERGED_TABLE))
332350
.contains(
333351
Schema.newBuilder()
334352
.column(Column.physicalColumn("id", DataTypes.BIGINT()))
@@ -344,8 +362,9 @@ void testMergingTableWithDifferentSchemas() {
344362

345363
@Test
346364
void testIncompatibleTypes() {
365+
SchemaManager schemaManager = new SchemaManager();
347366
SchemaDerivation schemaDerivation =
348-
new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>());
367+
new SchemaDerivation(schemaManager, ROUTES, new HashMap<>());
349368
// Create table 1
350369
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
351370
schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA));
@@ -354,6 +373,7 @@ void testIncompatibleTypes() {
354373
.asCreateTableEvent()
355374
.hasTableId(MERGED_TABLE)
356375
.hasSchema(SCHEMA);
376+
derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange);
357377

358378
// Create table 2
359379
assertThatThrownBy(

0 commit comments

Comments
 (0)