
Commit 079d531

Update: regard SchemaEvolveException as the only acceptable exception in tolerant mode
1 parent b22fbad

10 files changed: +148 -44 lines
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.exceptions;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+import javax.annotation.Nullable;
+
+/** An exception occurred during schema evolution. */
+public class SchemaEvolveException extends Exception {
+    private final SchemaChangeEvent applyingEvent;
+    private final String problem;
+    private final @Nullable Throwable context;
+
+    public SchemaEvolveException(
+            SchemaChangeEvent applyingEvent, String problem, @Nullable Throwable context) {
+        this.applyingEvent = applyingEvent;
+        this.problem = problem;
+        this.context = context;
+    }
+
+    public SchemaChangeEvent getApplyingEvent() {
+        return applyingEvent;
+    }
+
+    public String getProblem() {
+        return problem;
+    }
+
+    @Nullable
+    public Throwable getContext() {
+        return context;
+    }
+
+    @Override
+    public String toString() {
+        return "SchemaEvolveException{"
+                + "applyingEvent="
+                + applyingEvent
+                + ", problem='"
+                + problem
+                + '\''
+                + ", context='"
+                + context
+                + '\''
+                + '}';
+    }
+}
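
The commit title says tolerant mode should treat SchemaEvolveException as the only acceptable failure, but the framework-side handling is not among the files shown here. The sketch below is therefore an assumption of what a caller could look like: the class name, the tolerantMode flag, and the logging policy are all hypothetical, not part of this commit.

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.sink.MetadataApplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical caller-side sketch; illustrates the tolerant-mode contract only. */
class TolerantSchemaChangeHandler {
    private static final Logger LOG =
            LoggerFactory.getLogger(TolerantSchemaChangeHandler.class);

    private final MetadataApplier metadataApplier;
    private final boolean tolerantMode;

    TolerantSchemaChangeHandler(MetadataApplier metadataApplier, boolean tolerantMode) {
        this.metadataApplier = metadataApplier;
        this.tolerantMode = tolerantMode;
    }

    void handle(SchemaChangeEvent event) {
        try {
            metadataApplier.applySchemaChange(event);
        } catch (SchemaEvolveException e) {
            if (!tolerantMode) {
                throw new RuntimeException("Schema evolution failed", e);
            }
            // Tolerant mode: SchemaEvolveException is the only failure we swallow.
            LOG.warn("Ignored schema change failure: {}", e.getProblem(), e);
        }
        // Any other exception is unchecked and propagates, failing the job.
    }
}
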
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.exceptions;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+/** A special kind of {@link SchemaEvolveException} indicating that the sink doesn't support such an event type. */
+public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {
+
+    public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
+        super(applyingEvent, "Sink doesn't support such schema change event.", null);
+    }
+}
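
Because the new class is a subtype of SchemaEvolveException, a caller can separate "the sink cannot do this at all" from "the sink tried and failed". A small illustrative helper (hypothetical, not part of this commit):

import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;

/** Illustrative only: classify a schema evolution failure for error reporting. */
final class SchemaEvolveErrors {
    private SchemaEvolveErrors() {}

    static String describe(SchemaEvolveException e) {
        if (e instanceof UnsupportedSchemaChangeEventException) {
            // The sink declared the event type unsupported; retrying won't help.
            return "Unsupported by sink: " + e.getApplyingEvent();
        }
        // The sink supports the event type but failed while applying it.
        return "Failed to apply " + e.getApplyingEvent() + ": " + e.getProblem();
    }
}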

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;

 import java.io.Serializable;
 import java.util.Set;
@@ -35,5 +36,5 @@ public interface MetadataApplier extends Serializable {
     Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes();

     /** Apply the given {@link SchemaChangeEvent} to external systems. */
-    void applySchemaChange(SchemaChangeEvent schemaChangeEvent);
+    void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException;
 }
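
Since applySchemaChange now declares a checked exception, implementations must surface failures as SchemaEvolveException (or a subtype) rather than ad-hoc RuntimeExceptions. A minimal sketch against the new signature, assuming the two methods shown in this diff are the interface's only abstract members:

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.sink.MetadataApplier;

import java.util.Collections;
import java.util.Set;

/** Minimal sketch of a sink-side applier; real appliers enumerate supported types. */
class NoOpMetadataApplier implements MetadataApplier {

    @Override
    public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
        // This toy applier supports no schema change events at all.
        return Collections.emptySet();
    }

    @Override
    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
            throws SchemaEvolveException {
        // With the new signature, "unsupported" is reported as a checked
        // SchemaEvolveException subtype instead of an unchecked RuntimeException.
        throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
    }
}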

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

Lines changed: 5 additions & 4 deletions
@@ -26,6 +26,8 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -97,7 +99,7 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
     }

     @Override
-    public void applySchemaChange(SchemaChangeEvent event) {
+    public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
         try {
             // send schema change op to doris
             if (event instanceof CreateTableEvent) {
@@ -109,11 +111,10 @@ public void applySchemaChange(SchemaChangeEvent event) {
             } else if (event instanceof RenameColumnEvent) {
                 applyRenameColumnEvent((RenameColumnEvent) event);
             } else if (event instanceof AlterColumnTypeEvent) {
-                throw new RuntimeException("Unsupported schema change event, " + event);
+                throw new UnsupportedSchemaChangeEventException(event);
             }
         } catch (Exception ex) {
-            throw new RuntimeException(
-                    "Failed to schema change, " + event + ", reason: " + ex.getMessage());
+            throw new SchemaEvolveException(event, ex.getMessage(), null);
         }
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 6 additions & 4 deletions
@@ -25,6 +25,8 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
@@ -114,7 +116,8 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
     }

     @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
+            throws SchemaEvolveException {
         if (catalog == null) {
             catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         }
@@ -130,11 +133,10 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
             } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
                 applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
             } else {
-                throw new UnsupportedOperationException(
-                        "PaimonDataSink doesn't support schema change event " + schemaChangeEvent);
+                throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
             }
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new SchemaEvolveException(schemaChangeEvent, "schema change applying failure", e);
         }
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 5 additions & 4 deletions
@@ -23,6 +23,7 @@
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.DataType;
@@ -94,7 +95,7 @@ private void initialize(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testApplySchemaChange(String metastore)
             throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
         CreateTableEvent createTableEvent =
@@ -182,7 +183,7 @@ public void testApplySchemaChange(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testCreateTableWithOptions(String metastore)
             throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         Map<String, String> tableOptions = new HashMap<>();
         tableOptions.put("bucket", "-1");
@@ -228,7 +229,7 @@ public void testCreateTableWithOptions(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testCreateTableWithAllDataTypes(String metastore)
             throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
         CreateTableEvent createTableEvent =
@@ -338,7 +339,7 @@ public void testCreateTableWithAllDataTypes(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testAddColumnWithPosition(String metastore)
             throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
-                    Catalog.TableNotExistException {
+                    Catalog.TableNotExistException, SchemaEvolveException {
         initialize(metastore);
         MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 5 additions & 4 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
@@ -129,7 +130,7 @@ private void initialize(String metastore)
                 .dropDatabase(TEST_DATABASE, true, true);
     }

-    private List<Event> createTestEvents() {
+    private List<Event> createTestEvents() throws SchemaEvolveException {
         List<Event> testEvents = new ArrayList<>();
         // create table
         Schema schema =
@@ -171,7 +172,7 @@ private List<Event> createTestEvents() {
     @ValueSource(strings = {"filesystem", "hive"})
     public void testSinkWithDataChange(String metastore)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         PaimonSink<Event> paimonSink =
                 new PaimonSink<>(
@@ -257,7 +258,7 @@ public void testSinkWithDataChange(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testSinkWithSchemaChange(String metastore)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         PaimonSink<Event> paimonSink =
                 new PaimonSink(
@@ -392,7 +393,7 @@ public void testSinkWithSchemaChange(String metastore)
     @ValueSource(strings = {"filesystem", "hive"})
     public void testSinkWithMultiTables(String metastore)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
-                    Catalog.DatabaseNotExistException {
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
         PaimonSink<Event> paimonSink =
                 new PaimonSink<>(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 23 additions & 19 deletions
@@ -25,6 +25,8 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.sink.MetadataApplier;

@@ -91,7 +93,8 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
     }

     @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
+            throws SchemaEvolveException {
         if (!isOpened) {
             isOpened = true;
             catalog.open();
@@ -108,12 +111,11 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
         } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
             applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
         } else {
-            throw new UnsupportedOperationException(
-                    "StarRocksDataSink doesn't support schema change event " + schemaChangeEvent);
+            throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
         }
     }

-    private void applyCreateTable(CreateTableEvent createTableEvent) {
+    private void applyCreateTable(CreateTableEvent createTableEvent) throws SchemaEvolveException {
         StarRocksTable starRocksTable =
                 StarRocksUtils.toStarRocksTable(
                         createTableEvent.tableId(),
@@ -128,11 +130,11 @@ private void applyCreateTable(CreateTableEvent createTableEvent) {
             LOG.info("Successful to create table, event: {}", createTableEvent);
         } catch (StarRocksCatalogException e) {
             LOG.error("Failed to create table, event: {}", createTableEvent.tableId(), e);
-            throw new RuntimeException("Failed to create table, event: " + createTableEvent, e);
+            throw new SchemaEvolveException(createTableEvent, "Failed to create table", e);
         }
     }

-    private void applyAddColumn(AddColumnEvent addColumnEvent) {
+    private void applyAddColumn(AddColumnEvent addColumnEvent) throws SchemaEvolveException {
         List<StarRocksColumn> addColumns = new ArrayList<>();
         for (AddColumnEvent.ColumnWithPosition columnWithPosition :
                 addColumnEvent.getAddedColumns()) {
@@ -202,21 +204,21 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
                     "Failed to apply add column because of alter exception, event: {}",
                     addColumnEvent,
                     alterException);
-            throw new RuntimeException(
-                    "Failed to apply add column because of alter exception, event: "
-                            + addColumnEvent,
+            throw new SchemaEvolveException(
+                    addColumnEvent,
+                    "Failed to apply add column because of alter exception, event: ",
                     alterException);
         } else {
             String errorMsg =
                     String.format(
                             "Failed to apply add column because of validation failure, event: %s, table: %s",
                             addColumnEvent, table);
             LOG.error(errorMsg);
-            throw new RuntimeException(errorMsg);
+            throw new SchemaEvolveException(addColumnEvent, errorMsg, null);
         }
     }

-    private void applyDropColumn(DropColumnEvent dropColumnEvent) {
+    private void applyDropColumn(DropColumnEvent dropColumnEvent) throws SchemaEvolveException {
         List<String> dropColumns = dropColumnEvent.getDroppedColumnNames();
         TableId tableId = dropColumnEvent.tableId();
         StarRocksCatalogException alterException = null;
@@ -268,33 +270,35 @@ private void applyDropColumn(DropColumnEvent dropColumnEvent) {
                     "Failed to apply drop column because of alter exception, event: {}",
                     dropColumnEvent,
                     alterException);
-            throw new RuntimeException(
-                    "Failed to apply drop column because of alter exception, event: "
-                            + dropColumnEvent,
+            throw new SchemaEvolveException(
+                    dropColumnEvent,
+                    "Failed to apply drop column because of alter exception",
                     alterException);
         } else {
             String errorMsg =
                     String.format(
                             "Failed to apply drop column because of validation failure, event: %s, table: %s",
                             dropColumnEvent, table);
             LOG.error(errorMsg);
-            throw new RuntimeException(errorMsg);
+            throw new SchemaEvolveException(dropColumnEvent, errorMsg, null);
         }
     }

-    private void applyRenameColumn(RenameColumnEvent renameColumnEvent) {
+    private void applyRenameColumn(RenameColumnEvent renameColumnEvent)
+            throws SchemaEvolveException {
         // TODO StarRocks plans to support column rename since 3.3 which has not been released.
        // Support it later.
-        throw new UnsupportedOperationException("Rename column is not supported currently");
+        throw new UnsupportedSchemaChangeEventException(renameColumnEvent);
     }

-    private void applyAlterColumn(AlterColumnTypeEvent alterColumnTypeEvent) {
+    private void applyAlterColumn(AlterColumnTypeEvent alterColumnTypeEvent)
+            throws SchemaEvolveException {
         // TODO There are limitations for data type conversions. We should know the data types
         // before and after changing so that we can make a validation. But the event only contains
         // data types after changing. One way is that the framework delivers the old schema. We
         // can support the alter after a discussion.
-        throw new UnsupportedOperationException("Alter column is not supported currently");
+        throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent);
     }
 }
