diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java index f8cbb2604881ae..1c4020afd32f18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -60,7 +62,8 @@ private Factory() {} @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function scheduledVertexRetriever) { + Function scheduledVertexRetriever, + Function executionStateRetriever) { return new AllFinishedInputConsumableDecider(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java index ccd354b0d0d8a3..6e3e2fec4b36db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import java.util.Map; @@ -40,12 +41,16 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider { private final Function scheduledVertexRetriever; + private final Function executionStateRetriever; + DefaultInputConsumableDecider( Function scheduledVertexRetriever, Function - resultPartitionRetriever) { + resultPartitionRetriever, + Function executionStateRetriever) { this.scheduledVertexRetriever = scheduledVertexRetriever; this.resultPartitionRetriever = resultPartitionRetriever; + this.executionStateRetriever = executionStateRetriever; } @Override @@ -86,7 +91,12 @@ private boolean isConsumedPartitionGroupConsumable( ExecutionVertexID producerVertex = resultPartitionRetriever.apply(partitionId).getProducer().getId(); if (!verticesToSchedule.contains(producerVertex) - && !scheduledVertexRetriever.apply(producerVertex)) { + && !scheduledVertexRetriever.apply(producerVertex) + // For jm failover: the producer can be transitioned to FINISHED state not + // touched by scheduling strategy. This means all producer + // partitions finished, so we can schedule the downstream execution. + && executionStateRetriever.apply(producerVertex) + != ExecutionState.FINISHED) { return false; } } @@ -112,9 +122,12 @@ private Factory() {} @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function scheduledVertexRetriever) { + Function scheduledVertexRetriever, + Function executionStateRetriever) { return new DefaultInputConsumableDecider( - scheduledVertexRetriever, schedulingTopology::getResultPartition); + scheduledVertexRetriever, + schedulingTopology::getResultPartition, + executionStateRetriever); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java index 1d19dd2cf62e65..58bc635a6cbf3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -53,6 +55,7 @@ boolean isConsumableBasedOnFinishedProducers( interface Factory { InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function scheduledVertexRetriever); + Function scheduledVertexRetriever, + Function executionStateRetriever); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java index df7c353e94e3bf..4043608ffc5b36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.execution.ExecutionState; + import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -75,7 +77,8 @@ private Factory() {} @Override public InputConsumableDecider createInstance( SchedulingTopology schedulingTopology, - Function scheduledVertexRetriever) { + Function scheduledVertexRetriever, + Function executionStateRetriever) { return new PartialFinishedInputConsumableDecider(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java index 7be6922b9359d0..9ddef15e560af8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java @@ -68,7 +68,10 @@ public VertexwiseSchedulingStrategy( this.schedulingTopology = checkNotNull(schedulingTopology); this.inputConsumableDecider = inputConsumableDeciderFactory.createInstance( - schedulingTopology, scheduledVertices::contains); + schedulingTopology, + scheduledVertices::contains, + (executionVertexId) -> + schedulingTopology.getVertex(executionVertexId).getState()); LOG.info( "Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.", inputConsumableDecider.getClass().getName()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java index af61b1e367646a..ac6bd65d33c00a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java @@ -206,6 +206,8 @@ void testHybridAndBlockingInputButBlockingInputNotFinished() { private DefaultInputConsumableDecider createDefaultInputConsumableDecider( Set scheduledVertices, SchedulingTopology schedulingTopology) { return new DefaultInputConsumableDecider( - scheduledVertices::contains, schedulingTopology::getResultPartition); + scheduledVertices::contains, + schedulingTopology::getResultPartition, + (id) -> schedulingTopology.getVertex(id).getState()); } }