Skip to content

Commit 1523f2c

Browse files
committed
[FLINK-36934][network] Enrich numSubpartitions to TierMasterAgent#addPartitionAndGetShuffleDescriptor
1 parent a0d2d87 commit 1523f2c

File tree

7 files changed

+11
-9
lines changed

7 files changed

+11
-9
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ public void unregisterJob(JobID jobID) {
8181
}
8282

8383
public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
84-
JobID jobID, ResultPartitionID resultPartitionID) {
84+
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
8585
return tieredStorageMasterClient.addPartitionAndGetShuffleDescriptor(
86-
jobID, resultPartitionID);
86+
jobID, numSubpartitions, resultPartitionID);
8787
}
8888

8989
public void releasePartition(ShuffleDescriptor shuffleDescriptor) {

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ public void unregisterJob(JobID jobID) {
4949
}
5050

5151
public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
52-
JobID jobID, ResultPartitionID resultPartitionID) {
52+
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
5353
return tiers.stream()
5454
.map(
5555
tierMasterAgent ->
5656
tierMasterAgent.addPartitionAndGetShuffleDescriptor(
57-
jobID, resultPartitionID))
57+
jobID, numSubpartitions, resultPartitionID))
5858
.collect(Collectors.toList());
5959
}
6060

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void unregisterJob(JobID jobID) {
3838

3939
@Override
4040
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
41-
JobID jobID, ResultPartitionID resultPartitionID) {
41+
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
4242
// noop
4343
return NoOpTierShuffleDescriptor.INSTANCE;
4444
}

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public interface TierMasterAgent {
3232

3333
/** Add a new tiered storage partition and get the {@link TierShuffleDescriptor}. */
3434
TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
35-
JobID jobID, ResultPartitionID resultPartitionID);
35+
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID);
3636

3737
/**
3838
* Release a tiered storage partition.

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void unregisterJob(JobID jobID) {
5656

5757
@Override
5858
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
59-
JobID jobID, ResultPartitionID resultPartitionID) {
59+
JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
6060
TieredStoragePartitionId partitionId = convertId(resultPartitionID);
6161
resourceRegistry.registerResource(
6262
partitionId,

flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
108108
if (tieredInternalShuffleMaster != null) {
109109
tierShuffleDescriptors =
110110
tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(
111-
jobID, resultPartitionID);
111+
jobID,
112+
partitionDescriptor.getNumberOfSubpartitions(),
113+
resultPartitionID);
112114
}
113115

114116
NettyShuffleDescriptor shuffleDeploymentDescriptor =

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void testAddAndReleasePartition() throws IOException {
5252
RemoteTierMasterAgent masterAgent =
5353
new RemoteTierMasterAgent(tempFolder.getAbsolutePath(), resourceRegistry);
5454
TierShuffleDescriptor tierShuffleDescriptor =
55-
masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), resultPartitionID);
55+
masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), 1, resultPartitionID);
5656
assertThat(partitionFile.exists()).isTrue();
5757
masterAgent.releasePartition(tierShuffleDescriptor);
5858

0 commit comments

Comments
 (0)