From 032e560bdc59dcfe774df851d38af6492f4402a4 Mon Sep 17 00:00:00 2001 From: linjc13 Date: Tue, 14 Jan 2025 18:14:04 +0800 Subject: [PATCH] [FLINK-37110][cdc-runtime] Fixed the problem of using the same executor to process schema change requests and flush event requests, resulting in blocking timeout. --- .../operators/schema/common/SchemaRegistry.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java index 566dffa93e6..c649021db05 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java @@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.function.BooleanSupplier; @@ -96,6 +97,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio protected transient SchemaManager schemaManager; protected transient TableIdRouter router; + /** Executor service to execute handle event from operator. */ + private final ExecutorService runInEventFromOperatorExecutor; + protected SchemaRegistry( OperatorCoordinator.Context context, String operatorName, @@ -111,6 +115,7 @@ protected SchemaRegistry( this.routingRules = routingRules; this.rpcTimeout = rpcTimeout; this.behavior = schemaChangeBehavior; + this.runInEventFromOperatorExecutor = Executors.newSingleThreadExecutor(); } // --------------- @@ -130,6 +135,7 @@ public void start() throws Exception { public void close() throws Exception { LOG.info("Closing SchemaRegistry - {}.", operatorName); coordinatorExecutor.shutdown(); + runInEventFromOperatorExecutor.shutdown(); } // ------------------------------ @@ -235,6 +241,7 @@ public final CompletableFuture handleCoordinationRequest( CoordinationRequest request) { CompletableFuture future = new CompletableFuture<>(); runInEventLoop( + coordinatorExecutor, () -> { if (request instanceof GetEvolvedSchemaRequest) { handleGetEvolvedSchemaRequest((GetEvolvedSchemaRequest) request, future); @@ -253,6 +260,7 @@ public final CompletableFuture handleCoordinationRequest( public final void handleEventFromOperator( int subTaskId, int attemptNumber, OperatorEvent event) { runInEventLoop( + runInEventFromOperatorExecutor, () -> { if (event instanceof FlushSuccessEvent) { handleFlushSuccessEvent((FlushSuccessEvent) event); @@ -297,7 +305,11 @@ public final void executionAttemptReady( public final void checkpointCoordinator( long checkpointId, CompletableFuture completableFuture) throws Exception { LOG.info("Going to start checkpoint No.{}", checkpointId); - runInEventLoop(() -> snapshot(completableFuture), "Taking checkpoint - %d", checkpointId); + runInEventLoop( + coordinatorExecutor, + () -> snapshot(completableFuture), + "Taking checkpoint - %d", + checkpointId); } @Override @@ -325,6 +337,7 @@ public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpoi * directly, make sure you're running heavy logics inside, or the entire job might hang! */ protected void runInEventLoop( + final ExecutorService coordinatorExecutor, final ThrowingRunnable action, final String actionName, final Object... actionNameFormatParameters) {