Skip to content

Commit a3c9efe

Browse files
committed
[FLINK-35242][runtime] Support more schema evolution strategies
1 parent d386c7c commit a3c9efe

File tree

65 files changed

+4413
-366
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+4413
-366
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.cli.parser;
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
21+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2122
import org.apache.flink.cdc.common.utils.StringUtils;
2223
import org.apache.flink.cdc.composer.definition.PipelineDef;
2324
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -28,14 +29,17 @@
2829
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
2930
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3031
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
3133
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
3234

3335
import java.nio.file.Path;
3436
import java.util.ArrayList;
3537
import java.util.List;
3638
import java.util.Map;
3739
import java.util.Optional;
40+
import java.util.Set;
3841

42+
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
3943
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
4044

4145
/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
@@ -51,6 +55,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
5155
// Source / sink keys
5256
private static final String TYPE_KEY = "type";
5357
private static final String NAME_KEY = "name";
58+
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
59+
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
5460

5561
// Route keys
5662
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
@@ -135,6 +141,23 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
135141
}
136142

137143
private SinkDef toSinkDef(JsonNode sinkNode) {
144+
List<String> includedSETypes = new ArrayList<>();
145+
List<String> excludedSETypes = new ArrayList<>();
146+
147+
Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
148+
.ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));
149+
150+
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
151+
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));
152+
153+
Set<SchemaChangeEventType> declaredSETypes =
154+
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);
155+
156+
if (sinkNode instanceof ObjectNode) {
157+
((ObjectNode) sinkNode).remove(INCLUDE_SCHEMA_EVOLUTION_TYPES);
158+
((ObjectNode) sinkNode).remove(EXCLUDE_SCHEMA_EVOLUTION_TYPES);
159+
}
160+
138161
Map<String, String> sinkMap =
139162
mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
140163

@@ -148,7 +171,7 @@ private SinkDef toSinkDef(JsonNode sinkNode) {
148171
// "name" field is optional
149172
String name = sinkMap.remove(NAME_KEY);
150173

151-
return new SinkDef(type, name, Configuration.fromMap(sinkMap));
174+
return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
152175
}
153176

154177
private RouteDef toRouteDef(JsonNode routeNode) {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java

+5
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,9 @@ public String toString() {
155155
public TableId tableId() {
156156
return tableId;
157157
}
158+
159+
@Override
160+
public SchemaChangeEventType getType() {
161+
return SchemaChangeEventType.ADD_COLUMN;
162+
}
158163
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java

+5
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,9 @@ public String toString() {
7777
public TableId tableId() {
7878
return tableId;
7979
}
80+
81+
@Override
82+
public SchemaChangeEventType getType() {
83+
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
84+
}
8085
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java

+5
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,9 @@ public String toString() {
7272
public TableId tableId() {
7373
return tableId;
7474
}
75+
76+
@Override
77+
public SchemaChangeEventType getType() {
78+
return SchemaChangeEventType.CREATE_TABLE;
79+
}
7580
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropColumnEvent.java

+5
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,9 @@ public String toString() {
7676
public TableId tableId() {
7777
return tableId;
7878
}
79+
80+
@Override
81+
public SchemaChangeEventType getType() {
82+
return SchemaChangeEventType.DROP_COLUMN;
83+
}
7984
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java

+5
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,9 @@ public String toString() {
7070
public TableId tableId() {
7171
return tableId;
7272
}
73+
74+
@Override
75+
public SchemaChangeEventType getType() {
76+
return SchemaChangeEventType.RENAME_COLUMN;
77+
}
7378
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEvent.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@
2626
* system, such as CREATE, DROP, RENAME and so on.
2727
*/
2828
@PublicEvolving
29-
public interface SchemaChangeEvent extends ChangeEvent, Serializable {}
29+
public interface SchemaChangeEvent extends ChangeEvent, Serializable {
30+
/** Returns its {@link SchemaChangeEventType}. */
31+
SchemaChangeEventType getType();
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.event;
19+
20+
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
21+
public enum SchemaChangeEventType {
22+
ADD_COLUMN,
23+
ALTER_COLUMN_TYPE,
24+
CREATE_TABLE,
25+
DROP_COLUMN,
26+
RENAME_COLUMN;
27+
28+
public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
29+
if (event instanceof AddColumnEvent) {
30+
return ADD_COLUMN;
31+
} else if (event instanceof AlterColumnTypeEvent) {
32+
return ALTER_COLUMN_TYPE;
33+
} else if (event instanceof CreateTableEvent) {
34+
return CREATE_TABLE;
35+
} else if (event instanceof DropColumnEvent) {
36+
return DROP_COLUMN;
37+
} else if (event instanceof RenameColumnEvent) {
38+
return RENAME_COLUMN;
39+
} else {
40+
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
41+
}
42+
}
43+
44+
public static SchemaChangeEventType ofTag(String tag) {
45+
switch (tag) {
46+
case "add.column":
47+
return ADD_COLUMN;
48+
case "alter.column.type":
49+
return ALTER_COLUMN_TYPE;
50+
case "create.table":
51+
return CREATE_TABLE;
52+
case "drop.column":
53+
return DROP_COLUMN;
54+
case "rename.column":
55+
return RENAME_COLUMN;
56+
default:
57+
return null;
58+
}
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.event;
19+
20+
/**
21+
* An enumeration of schema change event families for clustering {@link SchemaChangeEvent}s into
22+
* categories.
23+
*/
24+
public class SchemaChangeEventTypeFamily {
25+
26+
public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN};
27+
28+
public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE};
29+
30+
public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE};
31+
32+
public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN};
33+
34+
public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN};
35+
36+
public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE};
37+
38+
public static final SchemaChangeEventType[] COLUMN = {
39+
SchemaChangeEventType.ADD_COLUMN,
40+
SchemaChangeEventType.ALTER_COLUMN_TYPE,
41+
SchemaChangeEventType.DROP_COLUMN,
42+
SchemaChangeEventType.RENAME_COLUMN
43+
};
44+
45+
public static final SchemaChangeEventType[] ALL = {
46+
SchemaChangeEventType.ADD_COLUMN,
47+
SchemaChangeEventType.CREATE_TABLE,
48+
SchemaChangeEventType.ALTER_COLUMN_TYPE,
49+
SchemaChangeEventType.DROP_COLUMN,
50+
SchemaChangeEventType.RENAME_COLUMN
51+
};
52+
53+
public static final SchemaChangeEventType[] NONE = {};
54+
55+
public static SchemaChangeEventType[] ofTag(String tag) {
56+
switch (tag) {
57+
case "add":
58+
return ADD;
59+
case "alter":
60+
return ALTER;
61+
case "create":
62+
return CREATE;
63+
case "drop":
64+
return DROP;
65+
case "rename":
66+
return RENAME;
67+
case "table":
68+
return TABLE;
69+
case "column":
70+
return COLUMN;
71+
case "all":
72+
return ALL;
73+
default:
74+
return NONE;
75+
}
76+
}
77+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
/** Behavior for handling schema changes. */
2323
@PublicEvolving
2424
public enum SchemaChangeBehavior {
25-
EVOLVE,
2625
IGNORE,
26+
TRY_EVOLVE,
27+
EVOLVE,
2728
EXCEPTION
2829
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.flink.cdc.common.sink;
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
22+
23+
import java.util.Set;
2124

2225
/**
2326
* {@code DataSink} is used to write change data to external system and apply metadata changes to
@@ -30,5 +33,5 @@ public interface DataSink {
3033
EventSinkProvider getEventSinkProvider();
3134

3235
/** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
33-
MetadataApplier getMetadataApplier();
36+
MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes);
3437
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java

+8
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
22+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2223

2324
import java.io.Serializable;
25+
import java.util.Set;
2426

2527
/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
2628
@PublicEvolving
2729
public interface MetadataApplier extends Serializable {
2830

31+
/** Checks if this metadata applier should handle this event type. */
32+
boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType);
33+
34+
/** Checks what kind of schema change events downstream can handle. */
35+
Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes();
36+
2937
/** Apply the given {@link SchemaChangeEvent} to external systems. */
3038
void applySchemaChange(SchemaChangeEvent schemaChangeEvent);
3139
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java

+42
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,24 @@
1717

1818
package org.apache.flink.cdc.common.utils;
1919

20+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2021
import org.apache.flink.cdc.common.event.AddColumnEvent;
2122
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
2223
import org.apache.flink.cdc.common.event.CreateTableEvent;
2324
import org.apache.flink.cdc.common.event.DataChangeEvent;
2425
import org.apache.flink.cdc.common.event.DropColumnEvent;
2526
import org.apache.flink.cdc.common.event.RenameColumnEvent;
2627
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
28+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
29+
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2730
import org.apache.flink.cdc.common.event.TableId;
2831

32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.HashSet;
35+
import java.util.List;
36+
import java.util.Set;
37+
2938
/** Utilities for handling {@link org.apache.flink.cdc.common.event.ChangeEvent}s. */
3039
public class ChangeEventUtils {
3140
public static DataChangeEvent recreateDataChangeEvent(
@@ -81,4 +90,37 @@ public static SchemaChangeEvent recreateSchemaChangeEvent(
8190
"Unsupported schema change event with type \"%s\"",
8291
schemaChangeEvent.getClass().getCanonicalName()));
8392
}
93+
94+
public static Set<SchemaChangeEventType> resolveSchemaEvolutionOptions(
95+
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
96+
List<SchemaChangeEventType> resultTypes = new ArrayList<>();
97+
98+
if (includedSchemaEvolutionTypes.isEmpty()) {
99+
resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
100+
} else {
101+
for (String includeTag : includedSchemaEvolutionTypes) {
102+
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
103+
}
104+
}
105+
106+
for (String excludeTag : excludedSchemaEvolutionTypes) {
107+
resultTypes.removeAll(resolveSchemaEvolutionTag(excludeTag));
108+
}
109+
110+
return new HashSet<>(resultTypes);
111+
}
112+
113+
@VisibleForTesting
114+
public static List<SchemaChangeEventType> resolveSchemaEvolutionTag(String tag) {
115+
List<SchemaChangeEventType> types =
116+
new ArrayList<>(Arrays.asList(SchemaChangeEventTypeFamily.ofTag(tag)));
117+
if (types.isEmpty()) {
118+
// It's a specified tag
119+
SchemaChangeEventType type = SchemaChangeEventType.ofTag(tag);
120+
if (type != null) {
121+
types.add(type);
122+
}
123+
}
124+
return types;
125+
}
84126
}

0 commit comments

Comments
 (0)