Skip to content

Commit 7c97bad

Browse files
committed
Address comments
1 parent b3b8a5e commit 7c97bad

File tree

20 files changed

+98
-77
lines changed

20 files changed

+98
-77
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java

+3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package org.apache.flink.cdc.common.event;
1919

20+
import org.apache.flink.cdc.common.annotation.Public;
21+
2022
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
23+
@Public
2124
public enum SchemaChangeEventType {
2225
ADD_COLUMN,
2326
ALTER_COLUMN_TYPE,

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818
package org.apache.flink.cdc.common.exceptions;
1919

2020
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
21+
import org.apache.flink.util.FlinkRuntimeException;
2122

2223
import javax.annotation.Nullable;
2324

2425
/** An exception occurred during schema evolution. */
25-
public class SchemaEvolveException extends Exception {
26+
public class SchemaEvolveException extends FlinkRuntimeException {
2627
private final SchemaChangeEvent applyingEvent;
2728
private final String problem;
2829
private final @Nullable Throwable context;
2930

3031
public SchemaEvolveException(
3132
SchemaChangeEvent applyingEvent, String problem, @Nullable Throwable context) {
33+
super(context);
3234
this.applyingEvent = applyingEvent;
3335
this.problem = problem;
3436
this.context = context;

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121

22-
/** Behavior for handling schema changes. */
22+
/**
23+
* Behavior for handling schema changes. Enums are sorted from the most tolerant strategy (IGNORE)
24+
* to the most aggressive one (EXCEPTION).
25+
*/
2326
@PublicEvolving
2427
public enum SchemaChangeBehavior {
2528
IGNORE,

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.flink.cdc.common.sink;
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.event.DataChangeEvent;
22+
import org.apache.flink.cdc.common.function.HashFunctionProvider;
2123

2224
/**
2325
* {@code DataSink} is used to write change data to external system and apply metadata changes to

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java

+19-6
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,34 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2222
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
23+
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2324
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
2425

2526
import java.io.Serializable;
27+
import java.util.Arrays;
2628
import java.util.Set;
29+
import java.util.stream.Collectors;
2730

2831
/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
2932
@PublicEvolving
3033
public interface MetadataApplier extends Serializable {
3134

32-
/** Checks if this metadata applier should handle this event type. */
33-
boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType);
34-
35-
/** Checks what kind of schema change events downstream can handle. */
36-
Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes();
37-
3835
/** Apply the given {@link SchemaChangeEvent} to external systems. */
3936
void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException;
37+
38+
/** Sets the enabled schema evolution event types of the current metadata applier. */
39+
default MetadataApplier setAcceptedSchemaEvolutionTypes(
40+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
41+
return this;
42+
}
43+
44+
/** Checks if this metadata applier should handle this event type. */
45+
default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
46+
return true;
47+
}
48+
49+
/** Checks what kind of schema change events downstream can handle. */
50+
default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
51+
return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
52+
}
4053
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
134134
schemaOperatorTranslator.translate(
135135
stream,
136136
parallelism,
137-
dataSink.getMetadataApplier(
138-
pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
137+
dataSink.getMetadataApplier()
138+
.setAcceptedSchemaEvolutionTypes(
139+
pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
139140
pipelineDef.getRoute());
140141

141142
// Build Partitioner used to shuffle Event

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/factory/DataSinkFactory1.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.composer.utils.factory;
1919

2020
import org.apache.flink.cdc.common.configuration.ConfigOption;
21-
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2221
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2322
import org.apache.flink.cdc.common.sink.DataSink;
2423
import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -70,7 +69,7 @@ public EventSinkProvider getEventSinkProvider() {
7069
}
7170

7271
@Override
73-
public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
72+
public MetadataApplier getMetadataApplier() {
7473
return null;
7574
}
7675
}

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/factory/DataSinkFactory2.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.composer.utils.factory;
1919

2020
import org.apache.flink.cdc.common.configuration.ConfigOption;
21-
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2221
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2322
import org.apache.flink.cdc.common.sink.DataSink;
2423
import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -38,8 +37,7 @@ public EventSinkProvider getEventSinkProvider() {
3837
}
3938

4039
@Override
41-
public MetadataApplier getMetadataApplier(
42-
Set<SchemaChangeEventType> enabledEventTypes) {
40+
public MetadataApplier getMetadataApplier() {
4341
return null;
4442
}
4543
};

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.connectors.doris.sink;
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
21-
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2221
import org.apache.flink.cdc.common.sink.DataSink;
2322
import org.apache.flink.cdc.common.sink.EventSinkProvider;
2423
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
@@ -32,7 +31,6 @@
3231

3332
import java.io.Serializable;
3433
import java.time.ZoneId;
35-
import java.util.Set;
3634

3735
/** A {@link DataSink} for "Doris" connector. */
3836
public class DorisDataSink implements DataSink, Serializable {
@@ -76,7 +74,7 @@ public EventSinkProvider getEventSinkProvider() {
7674
}
7775

7876
@Override
79-
public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
80-
return new DorisMetadataApplier(dorisOptions, configuration, enabledEventTypes);
77+
public MetadataApplier getMetadataApplier() {
78+
return new DorisMetadataApplier(dorisOptions, configuration);
8179
}
8280
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class DorisMetadataApplier implements MetadataApplier {
6969
private DorisOptions dorisOptions;
7070
private SchemaChangeManager schemaChangeManager;
7171
private Configuration config;
72-
private final Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
72+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
7373

7474
public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
7575
this.dorisOptions = dorisOptions;
@@ -78,14 +78,11 @@ public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
7878
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
7979
}
8080

81-
public DorisMetadataApplier(
82-
DorisOptions dorisOptions,
83-
Configuration config,
84-
Set<SchemaChangeEventType> enabledSchemaEvolutionTypes) {
85-
this.dorisOptions = dorisOptions;
86-
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
87-
this.config = config;
88-
this.enabledSchemaEvolutionTypes = enabledSchemaEvolutionTypes;
81+
@Override
82+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
83+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
84+
this.enabledSchemaEvolutionTypes = schemaEvolutionTypes;
85+
return this;
8986
}
9087

9188
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -428,9 +428,11 @@ private void runJobWithEvents(List<Event> events) throws Exception {
428428
schemaOperatorTranslator.translate(
429429
stream,
430430
DEFAULT_PARALLELISM,
431-
dorisSink.getMetadataApplier(
432-
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
433-
.collect(Collectors.toSet())),
431+
dorisSink
432+
.getMetadataApplier()
433+
.setAcceptedSchemaEvolutionTypes(
434+
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
435+
.collect(Collectors.toSet())),
434436
new ArrayList<>());
435437

436438
DataSinkTranslator sinkTranslator = new DataSinkTranslator();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,19 @@ public EventSinkProvider getEventSinkProvider() {
9999
}
100100

101101
@Override
102-
public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
102+
public MetadataApplier getMetadataApplier() {
103103
return new MetadataApplier() {
104+
105+
private Set<SchemaChangeEventType> enabledEventTypes =
106+
Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
107+
108+
@Override
109+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
110+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
111+
enabledEventTypes = schemaEvolutionTypes;
112+
return this;
113+
}
114+
104115
@Override
105116
public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
106117
return enabledEventTypes.contains(schemaChangeEventType);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.connectors.paimon.sink;
1919

2020
import org.apache.flink.cdc.common.event.Event;
21-
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2221
import org.apache.flink.cdc.common.event.TableId;
2322
import org.apache.flink.cdc.common.sink.DataSink;
2423
import org.apache.flink.cdc.common.sink.EventSinkProvider;
@@ -32,7 +31,6 @@
3231
import java.io.Serializable;
3332
import java.util.List;
3433
import java.util.Map;
35-
import java.util.Set;
3634

3735
/** A {@link DataSink} for Paimon connector that supports schema evolution. */
3836
public class PaimonDataSink implements DataSink, Serializable {
@@ -68,7 +66,7 @@ public EventSinkProvider getEventSinkProvider() {
6866
}
6967

7068
@Override
71-
public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
72-
return new PaimonMetadataApplier(options, tableOptions, partitionMaps, enabledEventTypes);
69+
public MetadataApplier getMetadataApplier() {
70+
return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
7371
}
7472
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
7070

7171
private final Map<TableId, List<String>> partitionMaps;
7272

73-
private final Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
73+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
7474

7575
public PaimonMetadataApplier(Options catalogOptions) {
7676
this.catalogOptions = catalogOptions;
@@ -89,15 +89,11 @@ public PaimonMetadataApplier(
8989
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
9090
}
9191

92-
public PaimonMetadataApplier(
93-
Options catalogOptions,
94-
Map<String, String> tableOptions,
95-
Map<TableId, List<String>> partitionMaps,
96-
Set<SchemaChangeEventType> enabledSchemaEvolutionTypes) {
97-
this.catalogOptions = catalogOptions;
98-
this.tableOptions = tableOptions;
99-
this.partitionMaps = partitionMaps;
100-
this.enabledSchemaEvolutionTypes = enabledSchemaEvolutionTypes;
92+
@Override
93+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
94+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
95+
this.enabledSchemaEvolutionTypes = schemaEvolutionTypes;
96+
return this;
10197
}
10298

10399
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.connectors.starrocks.sink;
1919

2020
import org.apache.flink.cdc.common.event.Event;
21-
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2221
import org.apache.flink.cdc.common.sink.DataSink;
2322
import org.apache.flink.cdc.common.sink.EventSinkProvider;
2423
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
@@ -31,7 +30,6 @@
3130

3231
import java.io.Serializable;
3332
import java.time.ZoneId;
34-
import java.util.Set;
3533

3634
/** A {@link DataSink} for StarRocks connector that supports schema evolution. */
3735
public class StarRocksDataSink implements DataSink, Serializable {
@@ -72,13 +70,12 @@ public EventSinkProvider getEventSinkProvider() {
7270
}
7371

7472
@Override
75-
public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
73+
public MetadataApplier getMetadataApplier() {
7674
StarRocksCatalog catalog =
7775
new StarRocksCatalog(
7876
sinkOptions.getJdbcUrl(),
7977
sinkOptions.getUsername(),
8078
sinkOptions.getPassword());
81-
return new StarRocksMetadataApplier(
82-
catalog, tableCreateConfig, schemaChangeConfig, enabledEventTypes);
79+
return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
8380
}
8481
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class StarRocksMetadataApplier implements MetadataApplier {
5454
private final TableCreateConfig tableCreateConfig;
5555
private final SchemaChangeConfig schemaChangeConfig;
5656
private boolean isOpened;
57-
private final Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
57+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
5858

5959
public StarRocksMetadataApplier(
6060
StarRocksCatalog catalog,
@@ -67,16 +67,11 @@ public StarRocksMetadataApplier(
6767
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
6868
}
6969

70-
public StarRocksMetadataApplier(
71-
StarRocksCatalog catalog,
72-
TableCreateConfig tableCreateConfig,
73-
SchemaChangeConfig schemaChangeConfig,
74-
Set<SchemaChangeEventType> enabledSchemaEvolutionTypes) {
75-
this.catalog = catalog;
76-
this.tableCreateConfig = tableCreateConfig;
77-
this.schemaChangeConfig = schemaChangeConfig;
78-
this.isOpened = false;
79-
this.enabledSchemaEvolutionTypes = enabledSchemaEvolutionTypes;
70+
@Override
71+
public MetadataApplier setAcceptedSchemaEvolutionTypes(
72+
Set<SchemaChangeEventType> schemaEvolutionTypes) {
73+
this.enabledSchemaEvolutionTypes = schemaEvolutionTypes;
74+
return this;
8075
}
8176

8277
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -375,9 +375,11 @@ private void runJobWithEvents(List<Event> events) throws Exception {
375375
schemaOperatorTranslator.translate(
376376
stream,
377377
DEFAULT_PARALLELISM,
378-
starRocksSink.getMetadataApplier(
379-
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
380-
.collect(Collectors.toSet())),
378+
starRocksSink
379+
.getMetadataApplier()
380+
.setAcceptedSchemaEvolutionTypes(
381+
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
382+
.collect(Collectors.toSet())),
381383
new ArrayList<>());
382384

383385
DataSinkTranslator sinkTranslator = new DataSinkTranslator();

0 commit comments

Comments
 (0)