Skip to content

Commit 4e2bc4d

Browse files
committed
[FLINK-37218][network] Avoid repeatedly reading the broadcast buffer from the sort-merge shuffle data file while accumulating small buffers from multiple sub-partitions.
1 parent efcee81 commit 4e2bc4d

File tree

3 files changed

+114
-42
lines changed

3 files changed

+114
-42
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java

Lines changed: 80 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.runtime.io.network.partition;
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
22-
import org.apache.flink.api.java.tuple.Tuple2;
2322
import org.apache.flink.core.memory.MemorySegment;
2423
import org.apache.flink.runtime.io.network.buffer.Buffer;
2524
import org.apache.flink.runtime.io.network.buffer.BufferHeader;
@@ -32,6 +31,7 @@
3231
import java.nio.channels.FileChannel;
3332
import java.util.ArrayDeque;
3433
import java.util.Queue;
34+
import java.util.function.BiConsumer;
3535
import java.util.function.Consumer;
3636

3737
import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
@@ -74,8 +74,10 @@ class PartitionedFileReader {
7474
/** Number of remaining bytes in the current data region read. */
7575
private long currentRegionRemainingBytes;
7676

77-
/** A queue storing pairs of file offsets and sizes to be read. */
78-
private final Queue<Tuple2<Long, Long>> offsetAndSizesToRead = new ArrayDeque<>();
77+
/** A queue storing {@link BufferPositionDescriptor} to be read. */
78+
private final Queue<BufferPositionDescriptor> readBufferPositions = new ArrayDeque<>();
79+
80+
private BufferPositionDescriptor currentBufferPositionDescriptor;
7981

8082
PartitionedFileReader(
8183
PartitionedFile partitionedFile,
@@ -100,23 +102,23 @@ class PartitionedFileReader {
100102

101103
private void moveToNextReadablePosition(ByteBuffer indexEntryBuf) throws IOException {
102104
while (currentRegionRemainingBytes <= 0 && hasNextPositionToRead()) {
103-
if (!offsetAndSizesToRead.isEmpty()) {
104-
Tuple2<Long, Long> offsetAndSize = offsetAndSizesToRead.poll();
105-
nextOffsetToRead = offsetAndSize.f0;
106-
currentRegionRemainingBytes = offsetAndSize.f1;
105+
if (!readBufferPositions.isEmpty()) {
106+
BufferPositionDescriptor descriptor = readBufferPositions.poll();
107+
nextOffsetToRead = descriptor.offset;
108+
currentRegionRemainingBytes = descriptor.size;
109+
currentBufferPositionDescriptor = descriptor;
107110
} else {
108111
// move to next region which has buffers
109112
if (nextRegionToRead < partitionedFile.getNumRegions()) {
110-
updateReadableOffsetAndSize(indexEntryBuf, offsetAndSizesToRead);
113+
updateReadableOffsetAndSize(indexEntryBuf, readBufferPositions);
111114
++nextRegionToRead;
112115
}
113116
}
114117
}
115118
}
116119

117120
private boolean hasNextPositionToRead() {
118-
return !offsetAndSizesToRead.isEmpty()
119-
|| nextRegionToRead < partitionedFile.getNumRegions();
121+
return !readBufferPositions.isEmpty() || nextRegionToRead < partitionedFile.getNumRegions();
120122
}
121123

122124
/**
@@ -144,31 +146,31 @@ private boolean hasNextPositionToRead() {
144146
*
145147
* @param indexEntryBuf A ByteBuffer containing index entries which provide offset and size
146148
* information.
147-
* @param offsetAndSizesToRead A queue to store the updated offsets and sizes.
149+
* @param readBufferPositions A queue to store the buffer position descriptors.
148150
* @throws IOException If an I/O error occurs when accessing the index file channel.
149151
*/
150152
@VisibleForTesting
151153
void updateReadableOffsetAndSize(
152-
ByteBuffer indexEntryBuf, Queue<Tuple2<Long, Long>> offsetAndSizesToRead)
154+
ByteBuffer indexEntryBuf, Queue<BufferPositionDescriptor> readBufferPositions)
153155
throws IOException {
154156
int startSubpartition = subpartitionIndexSet.getStartIndex();
155157
int endSubpartition = subpartitionIndexSet.getEndIndex();
156158

157159
if (startSubpartition >= subpartitionOrderRotationIndex
158160
|| endSubpartition < subpartitionOrderRotationIndex) {
159161
updateReadableOffsetAndSize(
160-
startSubpartition, endSubpartition, indexEntryBuf, offsetAndSizesToRead);
162+
startSubpartition, endSubpartition, indexEntryBuf, readBufferPositions);
161163
} else {
162164
updateReadableOffsetAndSize(
163165
subpartitionOrderRotationIndex,
164166
endSubpartition,
165167
indexEntryBuf,
166-
offsetAndSizesToRead);
168+
readBufferPositions);
167169
updateReadableOffsetAndSize(
168170
startSubpartition,
169171
subpartitionOrderRotationIndex - 1,
170172
indexEntryBuf,
171-
offsetAndSizesToRead);
173+
readBufferPositions);
172174
}
173175
}
174176

@@ -181,15 +183,15 @@ void updateReadableOffsetAndSize(
181183
* @param startSubpartition The starting index of the subpartition range to be processed.
182184
* @param endSubpartition The ending index of the subpartition range to be processed.
183185
* @param indexEntryBuf A ByteBuffer containing the index entries to read offsets and sizes.
184-
* @param offsetAndSizesToRead A queue to store the updated offsets and sizes.
186+
* @param readBufferPositions A queue to store the buffer position descriptors.
185187
* @throws IOException If an I/O error occurs during reading of index entries.
186188
* @throws IllegalStateException If offsets are not contiguous and not from a single buffer.
187189
*/
188190
private void updateReadableOffsetAndSize(
189191
int startSubpartition,
190192
int endSubpartition,
191193
ByteBuffer indexEntryBuf,
192-
Queue<Tuple2<Long, Long>> offsetAndSizesToRead)
194+
Queue<BufferPositionDescriptor> readBufferPositions)
193195
throws IOException {
194196
partitionedFile.getIndexEntry(
195197
indexFileChannel, indexEntryBuf, nextRegionToRead, startSubpartition);
@@ -202,18 +204,28 @@ private void updateReadableOffsetAndSize(
202204
long endPartitionSize = indexEntryBuf.getLong();
203205

204206
if (startPartitionOffset != endPartitionOffset || startPartitionSize != endPartitionSize) {
205-
offsetAndSizesToRead.add(
206-
Tuple2.of(
207+
readBufferPositions.add(
208+
new BufferPositionDescriptor(
207209
startPartitionOffset,
208-
endPartitionOffset + endPartitionSize - startPartitionOffset));
210+
endPartitionOffset + endPartitionSize - startPartitionOffset,
211+
1));
209212
} else if (startPartitionSize != 0) {
210213
// this branch is for broadcast subpartitions
211-
for (int i = startSubpartition; i <= endSubpartition; i++) {
212-
offsetAndSizesToRead.add(Tuple2.of(startPartitionOffset, startPartitionSize));
213-
}
214+
readBufferPositions.add(
215+
new BufferPositionDescriptor(
216+
startPartitionOffset,
217+
startPartitionSize,
218+
endSubpartition - startSubpartition + 1));
214219
}
215220
}
216221

222+
@VisibleForTesting
223+
void readCurrentRegion(
224+
Queue<MemorySegment> freeSegments, BufferRecycler recycler, Consumer<Buffer> consumer)
225+
throws IOException {
226+
readCurrentRegion(freeSegments, recycler, (buffer, repeatCount) -> consumer.accept(buffer));
227+
}
228+
217229
/**
218230
* Reads a buffer from the current region of the target {@link PartitionedFile} and moves the
219231
* read position forward.
@@ -226,7 +238,9 @@ private void updateReadableOffsetAndSize(
226238
* @return Whether the file reader has remaining data to read.
227239
*/
228240
boolean readCurrentRegion(
229-
Queue<MemorySegment> freeSegments, BufferRecycler recycler, Consumer<Buffer> consumer)
241+
Queue<MemorySegment> freeSegments,
242+
BufferRecycler recycler,
243+
BiConsumer<Buffer, Integer> consumer)
230244
throws IOException {
231245
if (currentRegionRemainingBytes == 0) {
232246
return false;
@@ -300,7 +314,7 @@ private BufferAndHeader processBuffer(
300314
ByteBuffer byteBuffer,
301315
Buffer buffer,
302316
BufferAndHeader partialBuffer,
303-
Consumer<Buffer> consumer) {
317+
BiConsumer<Buffer, Integer> consumer) {
304318
BufferHeader header = partialBuffer.header;
305319
CompositeBuffer targetBuffer = partialBuffer.buffer;
306320
while (byteBuffer.hasRemaining()) {
@@ -331,7 +345,7 @@ private BufferAndHeader processBuffer(
331345
}
332346

333347
header = null;
334-
consumer.accept(targetBuffer);
348+
consumer.accept(targetBuffer, currentBufferPositionDescriptor.repeatCount);
335349
targetBuffer = null;
336350
}
337351
return new BufferAndHeader(targetBuffer, header);
@@ -366,4 +380,44 @@ private static class BufferAndHeader {
366380
this.header = header;
367381
}
368382
}
383+
384+
/**
385+
* Represents the position and size of a buffer along with the repeat count. For a regular
386+
* buffer, the repeat count is typically one. For a broadcast buffer, the repeat count
387+
* corresponds to the number of subpartitions.
388+
*/
389+
@VisibleForTesting
390+
static class BufferPositionDescriptor {
391+
private final long offset;
392+
private final long size;
393+
private final int repeatCount;
394+
395+
/**
396+
* Constructs a BufferPositionDescriptor with specified offset, size, and repeat count.
397+
*
398+
* @param offset the offset of the buffer
399+
* @param size the size of the buffer
400+
* @param repeatCount the repeat count for the buffer
401+
*/
402+
BufferPositionDescriptor(long offset, long size, int repeatCount) {
403+
this.offset = offset;
404+
this.size = size;
405+
this.repeatCount = repeatCount;
406+
}
407+
408+
@VisibleForTesting
409+
long getOffset() {
410+
return offset;
411+
}
412+
413+
@VisibleForTesting
414+
long getSize() {
415+
return size;
416+
}
417+
418+
@VisibleForTesting
419+
int getRepeatCount() {
420+
return repeatCount;
421+
}
422+
}
369423
}

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,14 @@ public BufferAndBacklog getNextBuffer() {
111111
}
112112
}
113113

114-
private void addBuffer(Buffer buffer) {
114+
private void addBuffer(Buffer buffer, int repeatCount) {
115115
boolean needRecycleBuffer = false;
116116

117117
synchronized (lock) {
118118
if (isReleased) {
119119
needRecycleBuffer = true;
120120
} else {
121-
addBufferToFullyFilledBuffer(buffer);
121+
addBufferToFullyFilledBuffer(buffer, repeatCount);
122122
}
123123
}
124124

@@ -128,6 +128,14 @@ private void addBuffer(Buffer buffer) {
128128
}
129129
}
130130

131+
private void addBufferToFullyFilledBuffer(Buffer buffer, int repeatCount) {
132+
for (int i = 0; i < repeatCount; i++) {
133+
addBufferToFullyFilledBuffer(buffer);
134+
buffer.retainBuffer();
135+
}
136+
buffer.recycleBuffer();
137+
}
138+
131139
private void addBufferToFullyFilledBuffer(Buffer buffer) {
132140
if (toFilledBuffer == null) {
133141
toFilledBuffer =

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -212,50 +212,60 @@ private void verifyReadablePosition(
212212
createAndConfigIndexEntryBuffer(),
213213
subpartitionOrderRotationIndex);
214214

215-
Queue<Tuple2<Long, Long>> offsetAndSizesToRead = new ArrayDeque<>();
215+
Queue<PartitionedFileReader.BufferPositionDescriptor> offsetAndSizesToRead =
216+
new ArrayDeque<>();
216217
fileReader.updateReadableOffsetAndSize(
217218
createAndConfigIndexEntryBuffer(), offsetAndSizesToRead);
218219

219220
if (isBroadcastRegion) {
220-
assertThat(offsetAndSizesToRead).hasSize(end - start + 1);
221-
for (Tuple2<Long, Long> tuple2 : offsetAndSizesToRead) {
222-
assertThat(tuple2.f0).isEqualTo(regionStat[start].get(0).f0);
223-
assertThat(tuple2.f1).isEqualTo(regionStat[start].get(0).f1);
221+
assertThat(
222+
offsetAndSizesToRead.stream()
223+
.map(
224+
PartitionedFileReader.BufferPositionDescriptor
225+
::getRepeatCount)
226+
.reduce(Integer::sum)
227+
.get())
228+
.isEqualTo(end - start + 1);
229+
for (PartitionedFileReader.BufferPositionDescriptor descriptor : offsetAndSizesToRead) {
230+
assertThat(descriptor.getOffset()).isEqualTo(regionStat[start].get(0).f0);
231+
assertThat(descriptor.getSize()).isEqualTo(regionStat[start].get(0).f1);
224232
}
225233
return;
226234
}
227235

228236
if (start >= subpartitionOrderRotationIndex || end <= subpartitionOrderRotationIndex - 1) {
229237
assertThat(offsetAndSizesToRead).hasSize(1);
230238

231-
Tuple2<Long, Long> offsetAndSize = offsetAndSizesToRead.poll();
232-
assertThat(offsetAndSize.f0).isEqualTo(regionStat[start].get(0).f0);
239+
PartitionedFileReader.BufferPositionDescriptor descriptor = offsetAndSizesToRead.poll();
240+
assertThat(descriptor.getOffset()).isEqualTo(regionStat[start].get(0).f0);
233241

234242
long expectedSize = 0L;
235243
for (int i = start; i <= end; i++) {
236244
expectedSize += regionStat[i].get(0).f1;
237245
}
238-
assertThat(offsetAndSize.f1).isEqualTo(expectedSize);
246+
assertThat(descriptor.getSize()).isEqualTo(expectedSize);
239247
} else {
240248
assertThat(offsetAndSizesToRead).hasSize(2);
241249

242-
Tuple2<Long, Long> offsetAndSize1 = offsetAndSizesToRead.poll();
243-
Tuple2<Long, Long> offsetAndSize2 = offsetAndSizesToRead.poll();
244-
assertThat(offsetAndSize1.f0)
250+
PartitionedFileReader.BufferPositionDescriptor descriptor1 =
251+
offsetAndSizesToRead.poll();
252+
PartitionedFileReader.BufferPositionDescriptor descriptor2 =
253+
offsetAndSizesToRead.poll();
254+
assertThat(descriptor1.getOffset())
245255
.isEqualTo(regionStat[subpartitionOrderRotationIndex].get(0).f0);
246-
assertThat(offsetAndSize2.f0).isEqualTo(regionStat[start].get(0).f0);
256+
assertThat(descriptor2.getOffset()).isEqualTo(regionStat[start].get(0).f0);
247257

248258
long expectedSize = 0L;
249259
for (int i = subpartitionOrderRotationIndex; i <= end; i++) {
250260
expectedSize += regionStat[i].get(0).f1;
251261
}
252-
assertThat(offsetAndSize1.f1).isEqualTo(expectedSize);
262+
assertThat(descriptor1.getSize()).isEqualTo(expectedSize);
253263

254264
expectedSize = 0L;
255265
for (int i = start; i < subpartitionOrderRotationIndex; i++) {
256266
expectedSize += regionStat[i].get(0).f1;
257267
}
258-
assertThat(offsetAndSize2.f1).isEqualTo(expectedSize);
268+
assertThat(descriptor2.getSize()).isEqualTo(expectedSize);
259269
}
260270
}
261271

0 commit comments

Comments
 (0)