Skip to content

Commit fc17f6f

Browse files
zentoltillrohrmann
authored andcommitted
[FLINK-XXXX] Streamline allocation of logical slots
This closes #9.
1 parent 705b54a commit fc17f6f

File tree

2 files changed

+132
-17
lines changed

2 files changed

+132
-17
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,8 @@
8282
import org.apache.flink.runtime.query.KvStateLocation;
8383
import org.apache.flink.runtime.query.UnknownKvStateLocation;
8484
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
85-
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
8685
import org.apache.flink.runtime.scheduler.SchedulerNG;
8786
import org.apache.flink.runtime.scheduler.SchedulerUtils;
88-
import org.apache.flink.runtime.scheduler.SharedSlot;
8987
import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
9088
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
9189
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -111,7 +109,6 @@
111109
import java.util.Optional;
112110
import java.util.Set;
113111
import java.util.concurrent.CompletableFuture;
114-
import java.util.concurrent.ExecutionException;
115112
import java.util.concurrent.Executor;
116113
import java.util.concurrent.ScheduledExecutorService;
117114
import java.util.concurrent.TimeUnit;
@@ -501,16 +498,10 @@ private ParallelismAndResourceAssignments determineParallelismAndAssignResources
501498
final Set<ExecutionVertexID> containedExecutionVertices =
502499
sharedSlotToVertexAssignment.get(i);
503500

504-
final SharedSlot sharedSlot =
505-
reserveSharedSlot(slotInfo, containedExecutionVertices);
501+
final SharedSlot sharedSlot = reserveSharedSlot(slotInfo);
506502

507503
for (ExecutionVertexID executionVertexId : containedExecutionVertices) {
508-
final LogicalSlot logicalSlot;
509-
try {
510-
logicalSlot = sharedSlot.allocateLogicalSlot(executionVertexId).get();
511-
} catch (InterruptedException | ExecutionException e) {
512-
throw new RuntimeException("Should not have failed.");
513-
}
504+
final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
514505
assignedSlots.put(executionVertexId, logicalSlot);
515506
}
516507
}
@@ -539,8 +530,7 @@ private Map<Integer, Set<ExecutionVertexID>> determineParallelismAndAssignToFutu
539530
return sharedSlotToVertexAssignment;
540531
}
541532

542-
private SharedSlot reserveSharedSlot(
543-
SlotInfo slotInfo, Set<ExecutionVertexID> containedExecutionVertices) {
533+
private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
544534
final PhysicalSlot physicalSlot =
545535
declarativeSlotPool.reserveFreeSlot(
546536
slotInfo.getAllocationId(),
@@ -549,11 +539,9 @@ private SharedSlot reserveSharedSlot(
549539
final SharedSlot sharedSlot =
550540
new SharedSlot(
551541
new SlotRequestId(),
552-
slotInfo.getResourceProfile(),
553-
ExecutionSlotSharingGroup.forVertexIds(containedExecutionVertices),
554-
CompletableFuture.completedFuture(physicalSlot),
542+
physicalSlot,
555543
slotInfo.willBeOccupiedIndefinitely(),
556-
ignored ->
544+
() ->
557545
declarativeSlotPool.freeReservedSlot(
558546
slotInfo.getAllocationId(),
559547
null,
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler.declarative;
20+
21+
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
22+
import org.apache.flink.runtime.jobmaster.LogicalSlot;
23+
import org.apache.flink.runtime.jobmaster.SlotOwner;
24+
import org.apache.flink.runtime.jobmaster.SlotRequestId;
25+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
26+
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
27+
import org.apache.flink.util.Preconditions;
28+
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
/** Shared slot implementation for the {@link DeclarativeScheduler}. */
38+
public class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
39+
private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);
40+
41+
private final SlotRequestId physicalSlotRequestId;
42+
43+
private final PhysicalSlot physicalSlot;
44+
45+
private final Runnable externalReleaseCallback;
46+
47+
private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
48+
49+
private final boolean slotWillBeOccupiedIndefinitely;
50+
51+
private State state;
52+
53+
public SharedSlot(
54+
SlotRequestId physicalSlotRequestId,
55+
PhysicalSlot physicalSlot,
56+
boolean slotWillBeOccupiedIndefinitely,
57+
Runnable externalReleaseCallback) {
58+
this.physicalSlotRequestId = physicalSlotRequestId;
59+
this.physicalSlot = physicalSlot;
60+
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
61+
this.externalReleaseCallback = externalReleaseCallback;
62+
this.state = State.ALLOCATED;
63+
this.allocatedLogicalSlots = new HashMap<>();
64+
physicalSlot.tryAssignPayload(this);
65+
}
66+
67+
/**
68+
* Registers an allocation request for a logical slot.
69+
*
70+
* <p>The logical slot request is complete once the underlying physical slot request is
71+
* complete.
72+
*
73+
* @return the logical slot
74+
*/
75+
public LogicalSlot allocateLogicalSlot() {
76+
LOG.debug("Allocating logical slot from shared slot ({})", physicalSlotRequestId);
77+
return new SingleLogicalSlot(
78+
new SlotRequestId(),
79+
physicalSlot,
80+
null,
81+
Locality.UNKNOWN,
82+
this,
83+
slotWillBeOccupiedIndefinitely);
84+
}
85+
86+
@Override
87+
public void returnLogicalSlot(LogicalSlot logicalSlot) {
88+
LOG.debug("Returning logical slot to shared slot ({})", physicalSlotRequestId);
89+
Preconditions.checkState(
90+
allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null,
91+
"Trying to remove a logical slot request which has been either already removed or never created.");
92+
tryReleaseExternally();
93+
}
94+
95+
@Override
96+
public void release(Throwable cause) {
97+
LOG.debug("Release shared slot ({})", physicalSlotRequestId);
98+
99+
// copy the logical slot collection to avoid ConcurrentModificationException
100+
// if logical slot releases cause cancellation of other executions
101+
// which will try to call returnLogicalSlot and modify requestedLogicalSlots collection
102+
final List<LogicalSlot> collect = new ArrayList<>(allocatedLogicalSlots.values());
103+
for (LogicalSlot allocatedLogicalSlot : collect) {
104+
allocatedLogicalSlot.releaseSlot(cause);
105+
}
106+
allocatedLogicalSlots.clear();
107+
tryReleaseExternally();
108+
}
109+
110+
private void tryReleaseExternally() {
111+
if (state != State.RELEASED && allocatedLogicalSlots.isEmpty()) {
112+
state = State.RELEASED;
113+
LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
114+
externalReleaseCallback.run();
115+
}
116+
}
117+
118+
@Override
119+
public boolean willOccupySlotIndefinitely() {
120+
return slotWillBeOccupiedIndefinitely;
121+
}
122+
123+
private enum State {
124+
ALLOCATED,
125+
RELEASED
126+
}
127+
}

0 commit comments

Comments
 (0)