Skip to content

Commit 0eaf9ef

Browse files
author
linjc13
committed
[FLINK-37110][cdc-runtime] Fixed the problem of using the same executor to process schema change requests and flush event requests, resulting in blocking timeout.
1 parent a130718 commit 0eaf9ef

File tree

1 file changed

+27
-1
lines changed

1 file changed

+27
-1
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.concurrent.CompletableFuture;
6262
import java.util.concurrent.ConcurrentHashMap;
6363
import java.util.concurrent.ExecutorService;
64+
import java.util.concurrent.Executors;
6465
import java.util.concurrent.TimeoutException;
6566
import java.util.concurrent.atomic.AtomicReference;
6667
import java.util.stream.Collectors;
@@ -98,6 +99,9 @@ public class SchemaCoordinator extends SchemaRegistry {
9899
private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer>
99100
alreadyHandledSchemaChangeEvents;
100101

102+
/** Executor service to execute schema change. */
103+
private final ExecutorService schemaChangeThreadPool;
104+
101105
public SchemaCoordinator(
102106
String operatorName,
103107
OperatorCoordinator.Context context,
@@ -114,6 +118,7 @@ public SchemaCoordinator(
114118
routingRules,
115119
schemaChangeBehavior,
116120
rpcTimeout);
121+
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
117122
}
118123

119124
// -----------------
@@ -131,6 +136,14 @@ public void start() throws Exception {
131136
"Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
132137
}
133138

139+
@Override
140+
public void close() throws Exception {
141+
super.close();
142+
if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
143+
schemaChangeThreadPool.shutdownNow();
144+
}
145+
}
146+
134147
// --------------------------
135148
// Checkpoint related methods
136149
// --------------------------
@@ -268,7 +281,20 @@ private void handleSchemaEvolveRequest(
268281
LOG.info(
269282
"Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.",
270283
request);
271-
startSchemaChange();
284+
285+
schemaChangeThreadPool.submit(
286+
() -> {
287+
try {
288+
startSchemaChange();
289+
} catch (Throwable t) {
290+
failJob(
291+
"Schema change applying task",
292+
new FlinkRuntimeException(
293+
"Failed to apply schema change event.", t));
294+
throw new FlinkRuntimeException(
295+
"Failed to apply schema change event.", t);
296+
}
297+
});
272298
}
273299
}
274300

0 commit comments

Comments
 (0)