Skip to content

Commit ac5c947

Browse files
committed
Resolve conflicts
1 parent b2495c8 commit ac5c947

File tree

5 files changed

+6
-13
lines changed

5 files changed

+6
-13
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java

-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
package org.apache.flink.cdc.composer.flink.translator;
1919

20-
import org.apache.flink.api.java.tuple.Tuple2;
2120
import org.apache.flink.cdc.common.annotation.Internal;
2221
import org.apache.flink.cdc.common.event.Event;
2322
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
24-
import org.apache.flink.cdc.common.event.TableId;
2523
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2624
import org.apache.flink.cdc.common.route.RouteRule;
2725
import org.apache.flink.cdc.common.sink.MetadataApplier;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public void processElement(StreamRecord<Event> streamRecord)
248248
}
249249

250250
private void processSchemaChangeEvents(SchemaChangeEvent event)
251-
throws InterruptedException, TimeoutException {
251+
throws InterruptedException, TimeoutException, ExecutionException {
252252
TableId tableId = event.tableId();
253253
LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId);
254254
handleSchemaChangeEvent(tableId, event);
@@ -258,7 +258,8 @@ private void processSchemaChangeEvents(SchemaChangeEvent event)
258258

259259
List<TableId> optionalRoutedTable = getRoutedTables(tableId);
260260
if (!optionalRoutedTable.isEmpty()) {
261-
getRoutedTables(tableId)
261+
tableIdMappingCache
262+
.get(tableId)
262263
.forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
263264
} else {
264265
evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.cdc.common.annotation.Internal;
2121
import org.apache.flink.cdc.common.event.Event;
22-
import org.apache.flink.cdc.common.event.TableId;
2322
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2423
import org.apache.flink.cdc.common.route.RouteRule;
2524
import org.apache.flink.cdc.common.sink.MetadataApplier;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.cdc.common.event.TableId;
2121
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
22-
import org.apache.flink.cdc.common.schema.Selectors;
2322
import org.apache.flink.cdc.common.route.RouteRule;
2423
import org.apache.flink.cdc.common.sink.MetadataApplier;
2524
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
@@ -269,7 +268,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
269268
metadataApplier,
270269
schemaManager,
271270
schemaDerivation,
272-
schemaManager.getBehavior());
271+
schemaChangeBehavior);
273272
break;
274273
}
275274
default:

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,14 @@
1717

1818
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
1919

20-
import org.apache.flink.api.java.tuple.Tuple2;
2120
import org.apache.flink.cdc.common.annotation.Internal;
22-
import org.apache.flink.cdc.common.event.TableId;
2321
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24-
import org.apache.flink.cdc.common.schema.Selectors;
2522
import org.apache.flink.cdc.common.route.RouteRule;
2623
import org.apache.flink.cdc.common.sink.MetadataApplier;
2724
import org.apache.flink.runtime.jobgraph.OperatorID;
2825
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
2926

3027
import java.util.List;
31-
import java.util.stream.Collectors;
3228

3329
/** Provider of {@link SchemaRegistry}. */
3430
@Internal
@@ -61,7 +57,7 @@ public OperatorID getOperatorId() {
6157

6258
@Override
6359
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
64-
return new SchemaRegistry(operatorName, context, metadataApplier, routingRules);
65-
60+
return new SchemaRegistry(
61+
operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
6662
}
6763
}

0 commit comments

Comments
 (0)