Skip to content

Commit 7735f35

Browse files
committed
[FLINK-36880][network] Hybrid shuffle supports job master failover if only external tier used.
1 parent 306f2d9 commit 7735f35

File tree

18 files changed

+837
-37
lines changed

18 files changed

+837
-37
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ResultPartitionBytes.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.flink.runtime.executiongraph;
2020

2121
import java.io.Serializable;
22+
import java.util.List;
2223

24+
import static org.apache.flink.util.Preconditions.checkArgument;
2325
import static org.apache.flink.util.Preconditions.checkNotNull;
2426

2527
/** This class represents a snapshot of the result partition bytes metrics. */
@@ -34,4 +36,23 @@ public ResultPartitionBytes(long[] subpartitionBytes) {
3436
public long[] getSubpartitionBytes() {
3537
return subpartitionBytes;
3638
}
39+
40+
/** Merge all {@link ResultPartitionBytes} by sum up them per-subpartition. */
41+
public static ResultPartitionBytes mergeAll(List<ResultPartitionBytes> partitions) {
42+
checkArgument(!partitions.isEmpty());
43+
int expectedLength = partitions.get(0).getSubpartitionBytes().length;
44+
for (ResultPartitionBytes resultPartitionByte : partitions) {
45+
if (resultPartitionByte.getSubpartitionBytes().length != expectedLength) {
46+
throw new IllegalArgumentException(
47+
"only all ResultPartitionBytes with the same length can be merged");
48+
}
49+
}
50+
long[] mergedSubpartitionBytes = new long[expectedLength];
51+
for (int i = 0; i < expectedLength; i++) {
52+
for (ResultPartitionBytes resultPartitionByte : partitions) {
53+
mergedSubpartitionBytes[i] += resultPartitionByte.getSubpartitionBytes()[i];
54+
}
55+
}
56+
return new ResultPartitionBytes(mergedSubpartitionBytes);
57+
}
3758
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
20+
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
23+
import java.io.Serializable;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
27+
/**
28+
* This is a collection of all {@link TieredShuffleMasterSnapshot}s from every tier in one snapshot
29+
* round.
30+
*/
31+
public class AllTieredShuffleMasterSnapshots implements Serializable {
32+
/**
33+
* Snapshots of all tires. For each tier, it is a tuple of
34+
* (identifier,TieredShuffleMasterSnapshot)
35+
*/
36+
private final Collection<Tuple2<String, TieredShuffleMasterSnapshot>> snapshots;
37+
38+
public AllTieredShuffleMasterSnapshots(
39+
Collection<Tuple2<String, TieredShuffleMasterSnapshot>> snapshots) {
40+
this.snapshots = snapshots;
41+
}
42+
43+
public Collection<Tuple2<String, TieredShuffleMasterSnapshot>> getSnapshots() {
44+
return snapshots;
45+
}
46+
47+
public static AllTieredShuffleMasterSnapshots empty() {
48+
return new AllTieredShuffleMasterSnapshots(Collections.emptyList());
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
20+
21+
/**
22+
* A singleton implementation of {@link TieredShuffleMasterSnapshot} that represents an empty
23+
* snapshot of tiered shuffle master.
24+
*/
25+
public class EmptyTieredShuffleMasterSnapshot implements TieredShuffleMasterSnapshot {
26+
private static final EmptyTieredShuffleMasterSnapshot INSTANCE =
27+
new EmptyTieredShuffleMasterSnapshot();
28+
29+
public static EmptyTieredShuffleMasterSnapshot getInstance() {
30+
return INSTANCE;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
20+
21+
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
23+
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
24+
25+
import java.util.Optional;
26+
27+
/** The retriever for shuffle descriptor. */
28+
public interface ShuffleDescriptorRetriever {
29+
/**
30+
* Get shuffle descriptor by JobID and ResultPartitionId.
31+
*
32+
* @return shuffle descriptor or empty if not exist.
33+
*/
34+
Optional<ShuffleDescriptor> getShuffleDescriptor(
35+
JobID jobID, ResultPartitionID resultPartitionID);
36+
}

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
2020

2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.api.java.tuple.Tuple2;
2223
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
2325
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
2426
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
2527
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
@@ -30,11 +32,16 @@
3032
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
3133
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
3234
import org.apache.flink.runtime.shuffle.JobShuffleContext;
35+
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
3336
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
3437
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
38+
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshotContext;
3539

40+
import java.time.Duration;
3641
import java.util.Collection;
42+
import java.util.Collections;
3743
import java.util.List;
44+
import java.util.Set;
3845
import java.util.concurrent.CompletableFuture;
3946
import java.util.stream.Collectors;
4047

@@ -48,17 +55,75 @@ public class TieredInternalShuffleMaster {
4855

4956
private final ShuffleMasterContext shuffleMasterContext;
5057

51-
public TieredInternalShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
58+
private final boolean useOnlyExternalTier;
59+
60+
public TieredInternalShuffleMaster(
61+
ShuffleMasterContext shuffleMasterContext,
62+
ShuffleDescriptorRetriever shuffleDescriptorRetriever) {
5263
this.shuffleMasterContext = shuffleMasterContext;
5364
Configuration conf = shuffleMasterContext.getConfiguration();
65+
String externalTierFactoryClass =
66+
conf.get(
67+
NettyShuffleEnvironmentOptions
68+
.NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME);
69+
this.useOnlyExternalTier = externalTierFactoryClass != null;
5470
TieredStorageConfiguration tieredStorageConfiguration =
5571
TieredStorageConfiguration.fromConfiguration(conf);
5672
TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry();
57-
List<TierMasterAgent> tierFactories =
73+
List<Tuple2<String, TierMasterAgent>> tierFactories =
5874
tieredStorageConfiguration.getTierFactories().stream()
59-
.map(tierFactory -> tierFactory.createMasterAgent(resourceRegistry))
75+
.map(
76+
tierFactory ->
77+
Tuple2.of(
78+
tierFactory.identifier(),
79+
tierFactory.createMasterAgent(resourceRegistry)))
6080
.collect(Collectors.toList());
61-
this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories);
81+
this.tieredStorageMasterClient =
82+
new TieredStorageMasterClient(tierFactories, shuffleDescriptorRetriever);
83+
}
84+
85+
public boolean supportsBatchSnapshot() {
86+
return useOnlyExternalTier;
87+
}
88+
89+
public void snapshotState(
90+
CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture,
91+
ShuffleMasterSnapshotContext context,
92+
JobID jobId) {
93+
// only external tier supports snapshot for now.
94+
if (useOnlyExternalTier) {
95+
tieredStorageMasterClient.snapshotState(snapshotFuture, context, jobId);
96+
}
97+
}
98+
99+
public void snapshotState(CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture) {
100+
if (useOnlyExternalTier) {
101+
tieredStorageMasterClient.snapshotState(snapshotFuture);
102+
}
103+
}
104+
105+
public void restoreState(List<TieredInternalShuffleMasterSnapshot> snapshots, JobID jobId) {
106+
if (useOnlyExternalTier) {
107+
tieredStorageMasterClient.restoreState(snapshots, jobId);
108+
}
109+
}
110+
111+
public void restoreState(TieredInternalShuffleMasterSnapshot clusterSnapshot) {
112+
if (useOnlyExternalTier) {
113+
tieredStorageMasterClient.restoreState(clusterSnapshot);
114+
}
115+
}
116+
117+
public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
118+
JobShuffleContext jobShuffleContext,
119+
Duration timeout,
120+
Set<ResultPartitionID> expectedPartitions) {
121+
if (useOnlyExternalTier) {
122+
return tieredStorageMasterClient.getPartitionWithMetrics(
123+
jobShuffleContext, timeout, expectedPartitions);
124+
} else {
125+
return CompletableFuture.completedFuture(Collections.emptyList());
126+
}
62127
}
63128

64129
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
20+
21+
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
22+
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
23+
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
24+
25+
import java.util.Map;
26+
27+
/**
28+
* The internal {@link ShuffleMasterSnapshot} for hybrid shuffle. This bump shuffle descriptors and
29+
* all tiers snapshot.
30+
*/
31+
public class TieredInternalShuffleMasterSnapshot implements ShuffleMasterSnapshot {
32+
private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors;
33+
34+
private final AllTieredShuffleMasterSnapshots allTierSnapshots;
35+
36+
public TieredInternalShuffleMasterSnapshot(
37+
Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors,
38+
AllTieredShuffleMasterSnapshots allTierSnapshots) {
39+
this.shuffleDescriptors = shuffleDescriptors;
40+
this.allTierSnapshots = allTierSnapshots;
41+
}
42+
43+
public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
44+
return shuffleDescriptors;
45+
}
46+
47+
public AllTieredShuffleMasterSnapshots getAllTierSnapshots() {
48+
return allTierSnapshots;
49+
}
50+
51+
@Override
52+
public boolean isIncremental() {
53+
return true;
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
20+
21+
import java.io.Serializable;
22+
23+
/**
24+
* This class represents a snapshot of tiered shuffle master, which can be used to restore the
25+
* internal state of the shuffle master.
26+
*
27+
* <p>IMPORTANT: It is incremental.
28+
*/
29+
public interface TieredShuffleMasterSnapshot extends Serializable {}

0 commit comments

Comments
 (0)