5252import java .util .concurrent .CompletableFuture ;
5353import java .util .concurrent .ConcurrentHashMap ;
5454import java .util .concurrent .ExecutorService ;
55+ import java .util .concurrent .Executors ;
5556import java .util .concurrent .TimeoutException ;
5657import java .util .function .BooleanSupplier ;
5758
@@ -96,6 +97,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
9697 protected transient SchemaManager schemaManager ;
9798 protected transient TableIdRouter router ;
9899
100+ /** Executor service to execute handle event from operator. */
101+ private final ExecutorService runInEventFromOperatorExecutor ;
102+
99103 protected SchemaRegistry (
100104 OperatorCoordinator .Context context ,
101105 String operatorName ,
@@ -111,6 +115,7 @@ protected SchemaRegistry(
111115 this .routingRules = routingRules ;
112116 this .rpcTimeout = rpcTimeout ;
113117 this .behavior = schemaChangeBehavior ;
118+ this .runInEventFromOperatorExecutor = Executors .newSingleThreadExecutor ();
114119 }
115120
116121 // ---------------
@@ -130,6 +135,7 @@ public void start() throws Exception {
130135 public void close () throws Exception {
131136 LOG .info ("Closing SchemaRegistry - {}." , operatorName );
132137 coordinatorExecutor .shutdown ();
138+ runInEventFromOperatorExecutor .shutdown ();
133139 }
134140
135141 // ------------------------------
@@ -235,6 +241,7 @@ public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(
235241 CoordinationRequest request ) {
236242 CompletableFuture <CoordinationResponse > future = new CompletableFuture <>();
237243 runInEventLoop (
244+ coordinatorExecutor ,
238245 () -> {
239246 if (request instanceof GetEvolvedSchemaRequest ) {
240247 handleGetEvolvedSchemaRequest ((GetEvolvedSchemaRequest ) request , future );
@@ -253,6 +260,7 @@ public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(
253260 public final void handleEventFromOperator (
254261 int subTaskId , int attemptNumber , OperatorEvent event ) {
255262 runInEventLoop (
263+ runInEventFromOperatorExecutor ,
256264 () -> {
257265 if (event instanceof FlushSuccessEvent ) {
258266 handleFlushSuccessEvent ((FlushSuccessEvent ) event );
@@ -297,7 +305,11 @@ public final void executionAttemptReady(
297305 public final void checkpointCoordinator (
298306 long checkpointId , CompletableFuture <byte []> completableFuture ) throws Exception {
299307 LOG .info ("Going to start checkpoint No.{}" , checkpointId );
300- runInEventLoop (() -> snapshot (completableFuture ), "Taking checkpoint - %d" , checkpointId );
308+ runInEventLoop (
309+ coordinatorExecutor ,
310+ () -> snapshot (completableFuture ),
311+ "Taking checkpoint - %d" ,
312+ checkpointId );
301313 }
302314
303315 @ Override
@@ -325,6 +337,7 @@ public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpoi
325337 * directly, make sure you're running heavy logics inside, or the entire job might hang!
326338 */
327339 protected void runInEventLoop (
340+ final ExecutorService coordinatorExecutor ,
328341 final ThrowingRunnable <Throwable > action ,
329342 final String actionName ,
330343 final Object ... actionNameFormatParameters ) {
0 commit comments