Skip to content

Commit

Permalink
[FLINK-XXXX] Streamline allocation of logical slots
Browse files Browse the repository at this point in the history
This closes #9.
  • Loading branch information
zentol authored and tillrohrmann committed Jan 15, 2021
1 parent 04de3fa commit 9d99ad6
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,8 @@
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerUtils;
import org.apache.flink.runtime.scheduler.SharedSlot;
import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
Expand All @@ -111,7 +109,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -501,16 +498,10 @@ private ParallelismAndResourceAssignments determineParallelismAndAssignResources
final Set<ExecutionVertexID> containedExecutionVertices =
sharedSlotToVertexAssignment.get(i);

final SharedSlot sharedSlot =
reserveSharedSlot(slotInfo, containedExecutionVertices);
final SharedSlot sharedSlot = reserveSharedSlot(slotInfo);

for (ExecutionVertexID executionVertexId : containedExecutionVertices) {
final LogicalSlot logicalSlot;
try {
logicalSlot = sharedSlot.allocateLogicalSlot(executionVertexId).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Should not have failed.");
}
final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
assignedSlots.put(executionVertexId, logicalSlot);
}
}
Expand Down Expand Up @@ -539,8 +530,7 @@ private Map<Integer, Set<ExecutionVertexID>> determineParallelismAndAssignToFutu
return sharedSlotToVertexAssignment;
}

private SharedSlot reserveSharedSlot(
SlotInfo slotInfo, Set<ExecutionVertexID> containedExecutionVertices) {
private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
final PhysicalSlot physicalSlot =
declarativeSlotPool.reserveFreeSlot(
slotInfo.getAllocationId(),
Expand All @@ -549,11 +539,9 @@ private SharedSlot reserveSharedSlot(
final SharedSlot sharedSlot =
new SharedSlot(
new SlotRequestId(),
slotInfo.getResourceProfile(),
ExecutionSlotSharingGroup.forVertexIds(containedExecutionVertices),
CompletableFuture.completedFuture(physicalSlot),
physicalSlot,
slotInfo.willBeOccupiedIndefinitely(),
ignored ->
() ->
declarativeSlotPool.freeReservedSlot(
slotInfo.getAllocationId(),
null,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.declarative;

import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Shared slot implementation for the {@link DeclarativeScheduler}.
 *
 * <p>A shared slot wraps a single {@link PhysicalSlot} and hands out {@link LogicalSlot logical
 * slots} that are all backed by it. Every logical slot handed out is tracked by its {@link
 * SlotRequestId}. Once the last tracked logical slot has been returned, or the shared slot is
 * {@link #release(Throwable) released}, the external release callback is invoked exactly once.
 *
 * <p>NOTE(review): this class uses a plain {@link HashMap} and an unsynchronized state field;
 * presumably it is only accessed from a single thread - confirm against the callers.
 */
public class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

    /** Identifier of this shared slot; only used for logging within this class. */
    private final SlotRequestId physicalSlotRequestId;

    /** The physical slot backing all logical slots handed out by this shared slot. */
    private final PhysicalSlot physicalSlot;

    /** Invoked once when this shared slot transitions to {@link State#RELEASED}. */
    private final Runnable externalReleaseCallback;

    /** Logical slots that were handed out and not yet returned, keyed by their request id. */
    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;

    private final boolean slotWillBeOccupiedIndefinitely;

    private State state;

    /**
     * Creates a shared slot on top of the given physical slot and registers itself as the
     * payload of that physical slot.
     *
     * @param physicalSlotRequestId identifier of this shared slot, used in log messages
     * @param physicalSlot physical slot backing the logical slots
     * @param slotWillBeOccupiedIndefinitely value reported by {@link
     *     #willOccupySlotIndefinitely()} and passed on to every created logical slot
     * @param externalReleaseCallback callback run once when this shared slot is released
     */
    public SharedSlot(
            SlotRequestId physicalSlotRequestId,
            PhysicalSlot physicalSlot,
            boolean slotWillBeOccupiedIndefinitely,
            Runnable externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlot = physicalSlot;
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
        this.state = State.ALLOCATED;
        this.allocatedLogicalSlots = new HashMap<>();
        // NOTE(review): the boolean result of tryAssignPayload is ignored here; a failed
        // assignment would go unnoticed - confirm whether this should be checked.
        physicalSlot.tryAssignPayload(this);
    }

    /**
     * Allocates a new logical slot backed by the physical slot of this shared slot.
     *
     * <p>The created slot is registered with this shared slot so that it can later be handed
     * back via {@link #returnLogicalSlot(LogicalSlot)} or released via {@link
     * #release(Throwable)}.
     *
     * @return the logical slot
     */
    public LogicalSlot allocateLogicalSlot() {
        LOG.debug("Allocating logical slot from shared slot ({})", physicalSlotRequestId);
        final LogicalSlot logicalSlot =
                new SingleLogicalSlot(
                        new SlotRequestId(),
                        physicalSlot,
                        null,
                        Locality.UNKNOWN,
                        this,
                        slotWillBeOccupiedIndefinitely);
        // Track the slot: returnLogicalSlot() insists on finding it in this map and
        // release() only releases the slots contained in it.
        allocatedLogicalSlots.put(logicalSlot.getSlotRequestId(), logicalSlot);
        return logicalSlot;
    }

    /**
     * Hands a logical slot back to this shared slot. If it was the last tracked logical slot,
     * the shared slot is released externally.
     *
     * @param logicalSlot logical slot previously obtained from {@link #allocateLogicalSlot()}
     * @throws IllegalStateException if the slot is not (or no longer) tracked by this shared slot
     */
    @Override
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        LOG.debug("Returning logical slot to shared slot ({})", physicalSlotRequestId);
        Preconditions.checkState(
                allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null,
                "Trying to remove a logical slot request which has been either already removed or never created.");
        tryReleaseExternally();
    }

    /**
     * Releases all logical slots that are still tracked and then releases this shared slot
     * externally (unless that already happened).
     *
     * @param cause reason for the release, forwarded to every logical slot
     */
    @Override
    public void release(Throwable cause) {
        LOG.debug("Release shared slot ({})", physicalSlotRequestId);

        // copy the logical slot collection to avoid ConcurrentModificationException
        // if logical slot releases cause cancellation of other executions
        // which will try to call returnLogicalSlot and modify allocatedLogicalSlots collection
        final List<LogicalSlot> collect = new ArrayList<>(allocatedLogicalSlots.values());
        for (LogicalSlot allocatedLogicalSlot : collect) {
            allocatedLogicalSlot.releaseSlot(cause);
        }
        allocatedLogicalSlots.clear();
        tryReleaseExternally();
    }

    /**
     * Runs the external release callback if no logical slot is tracked anymore and the callback
     * has not been run before. The state is switched first so the callback fires at most once.
     */
    private void tryReleaseExternally() {
        if (state != State.RELEASED && allocatedLogicalSlots.isEmpty()) {
            state = State.RELEASED;
            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
            externalReleaseCallback.run();
        }
    }

    @Override
    public boolean willOccupySlotIndefinitely() {
        return slotWillBeOccupiedIndefinitely;
    }

    /** Lifecycle of the shared slot. */
    private enum State {
        /** Initial state set by the constructor. */
        ALLOCATED,
        /** The external release callback has been triggered. */
        RELEASED
    }
}

0 comments on commit 9d99ad6

Please sign in to comment.