Skip to content

Commit 8c6aa0b

Browse files
committed
[FLINK-36880][network] Handle the finished vertex in InputConsumableDecider
1 parent 1523f2c commit 8c6aa0b

File tree

6 files changed

+36
-9
lines changed

6 files changed

+36
-9
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.runtime.scheduler.strategy;
2020

21+
import org.apache.flink.runtime.execution.ExecutionState;
22+
2123
import java.util.Map;
2224
import java.util.Set;
2325
import java.util.function.Function;
@@ -60,7 +62,8 @@ private Factory() {}
6062
@Override
6163
public InputConsumableDecider createInstance(
6264
SchedulingTopology schedulingTopology,
63-
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
65+
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
66+
Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
6467
return new AllFinishedInputConsumableDecider();
6568
}
6669
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.scheduler.strategy;
2020

21+
import org.apache.flink.runtime.execution.ExecutionState;
2122
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
2223

2324
import java.util.Map;
@@ -40,12 +41,16 @@ public class DefaultInputConsumableDecider implements InputConsumableDecider {
4041

4142
private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever;
4243

44+
private final Function<ExecutionVertexID, ExecutionState> executionStateRetriever;
45+
4346
DefaultInputConsumableDecider(
4447
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
4548
Function<IntermediateResultPartitionID, SchedulingResultPartition>
46-
resultPartitionRetriever) {
49+
resultPartitionRetriever,
50+
Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
4751
this.scheduledVertexRetriever = scheduledVertexRetriever;
4852
this.resultPartitionRetriever = resultPartitionRetriever;
53+
this.executionStateRetriever = executionStateRetriever;
4954
}
5055

5156
@Override
@@ -86,7 +91,12 @@ private boolean isConsumedPartitionGroupConsumable(
8691
ExecutionVertexID producerVertex =
8792
resultPartitionRetriever.apply(partitionId).getProducer().getId();
8893
if (!verticesToSchedule.contains(producerVertex)
89-
&& !scheduledVertexRetriever.apply(producerVertex)) {
94+
&& !scheduledVertexRetriever.apply(producerVertex)
95+
// For jm failover: the producer can be transitioned to FINISHED state not
96+
// touched by scheduling strategy. This means all producer
97+
// partitions finished, so we can schedule the downstream execution.
98+
&& executionStateRetriever.apply(producerVertex)
99+
!= ExecutionState.FINISHED) {
90100
return false;
91101
}
92102
}
@@ -112,9 +122,12 @@ private Factory() {}
112122
@Override
113123
public InputConsumableDecider createInstance(
114124
SchedulingTopology schedulingTopology,
115-
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
125+
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
126+
Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
116127
return new DefaultInputConsumableDecider(
117-
scheduledVertexRetriever, schedulingTopology::getResultPartition);
128+
scheduledVertexRetriever,
129+
schedulingTopology::getResultPartition,
130+
executionStateRetriever);
118131
}
119132
}
120133
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.runtime.scheduler.strategy;
2020

21+
import org.apache.flink.runtime.execution.ExecutionState;
22+
2123
import java.util.Map;
2224
import java.util.Set;
2325
import java.util.function.Function;
@@ -53,6 +55,7 @@ boolean isConsumableBasedOnFinishedProducers(
5355
interface Factory {
5456
InputConsumableDecider createInstance(
5557
SchedulingTopology schedulingTopology,
56-
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever);
58+
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
59+
Function<ExecutionVertexID, ExecutionState> executionStateRetriever);
5760
}
5861
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.runtime.scheduler.strategy;
2020

21+
import org.apache.flink.runtime.execution.ExecutionState;
22+
2123
import java.util.Map;
2224
import java.util.Set;
2325
import java.util.function.Function;
@@ -75,7 +77,8 @@ private Factory() {}
7577
@Override
7678
public InputConsumableDecider createInstance(
7779
SchedulingTopology schedulingTopology,
78-
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
80+
Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
81+
Function<ExecutionVertexID, ExecutionState> executionStateRetriever) {
7982
return new PartialFinishedInputConsumableDecider();
8083
}
8184
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ public VertexwiseSchedulingStrategy(
6868
this.schedulingTopology = checkNotNull(schedulingTopology);
6969
this.inputConsumableDecider =
7070
inputConsumableDeciderFactory.createInstance(
71-
schedulingTopology, scheduledVertices::contains);
71+
schedulingTopology,
72+
scheduledVertices::contains,
73+
(executionVertexId) ->
74+
schedulingTopology.getVertex(executionVertexId).getState());
7275
LOG.info(
7376
"Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.",
7477
inputConsumableDecider.getClass().getName());

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ void testHybridAndBlockingInputButBlockingInputNotFinished() {
206206
private DefaultInputConsumableDecider createDefaultInputConsumableDecider(
207207
Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) {
208208
return new DefaultInputConsumableDecider(
209-
scheduledVertices::contains, schedulingTopology::getResultPartition);
209+
scheduledVertices::contains,
210+
schedulingTopology::getResultPartition,
211+
(id) -> schedulingTopology.getVertex(id).getState());
210212
}
211213
}

0 commit comments

Comments
 (0)