Skip to content

Commit 38068d9

Browse files
committed
Code review changes
1 parent 6a0bcc5 commit 38068d9

File tree

3 files changed

+22
-2
lines changed

3 files changed

+22
-2
lines changed

dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ public class BinaryRecordReader implements IRecordReader
156156
100000000000L, 1000000000000L, 10000000000000L, 100000000000000L, 1000000000000000L };
157157
private static final int[] signMap = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +1, -1, +1, -1, +1, +1 };
158158

159+
private static final int SLEEP_TIME_WARN_MS = 100;
160+
private static final int SHORT_SLEEP_MS = 1;
159161
private static final int MASK_32_LOWER_HALF = 0xffff;
160162
private static final int BUFFER_GROW_SIZE = 8192;
161163
private static final int OPTIMIZED_STRING_READ_AHEAD = 32;
@@ -726,6 +728,8 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
726728
int requiredCapacity = offset + dataLen;
727729
ensureScratchBufferCapacity(requiredCapacity);
728730

731+
int totalSleepTimeMS = 0;
732+
729733
int position = offset;
730734
int bytesConsumed = 0;
731735
while (bytesConsumed < dataLen)
@@ -742,14 +746,20 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
742746
{
743747
try
744748
{
745-
Thread.sleep(1);
749+
Thread.sleep(SHORT_SLEEP_MS);
750+
totalSleepTimeMS += SHORT_SLEEP_MS;
746751
}
747752
catch (InterruptedException e) {}
748753
}
749754

750755
position += bytesRead;
751756
bytesConsumed += bytesRead;
752757
}
758+
759+
if (totalSleepTimeMS > SLEEP_TIME_WARN_MS)
760+
{
761+
messages.addMessage("Warning: BinaryRecordReader.readIntoScratchBuffer(): slept for more than " + SLEEP_TIME_WARN_MS + "ms");
762+
}
753763
}
754764

755765
/**

dfsclient/src/main/java/org/hpccsystems/dfs/client/CircularByteBuffer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515

1616
public class CircularByteBuffer
1717
{
18+
// Previous testing found that increasing read size beyond 16MB did not improve performance.
19+
// New implementation separates the two read size from the buffer size, but it wouldn't make
20+
// sense to go beyond 16MB for the buffer size
21+
public static final int MAX_BUFFER_SIZE = 16 * 1024 * 1024;
22+
1823
private final byte[] buffer;
1924
private int readPos = 0;
2025
private int writePos = 0;
@@ -35,6 +40,11 @@ public CircularByteBuffer(int bufferSize) throws IllegalArgumentException
3540
throw new IllegalArgumentException("Buffer size must be greater than 0");
3641
}
3742

43+
if (bufferSize > MAX_BUFFER_SIZE)
44+
{
45+
throw new IllegalArgumentException("Buffer size must be less than " + MAX_BUFFER_SIZE);
46+
}
47+
3848
buffer = new byte[bufferSize];
3949
}
4050

dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1447,7 +1447,7 @@ public int available() throws IOException
14471447
{
14481448
String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
14491449

1450-
IOException wrappedException = new IOException(prefix + "End of input stream");
1450+
IOException wrappedException = new IOException(prefix + "End of input stream, streamPos: " + streamPos);
14511451
throw wrappedException;
14521452
}
14531453
}

0 commit comments

Comments
 (0)