Skip to content

Commit

Permalink
HPCC4J-606 DFSClient Allow ReadBuffer size to be set (#788)
Browse files Browse the repository at this point in the history
- Created new CircularByteBuffer class
- Created CircularByteBufferTest
- Made changes to allow read buffer size to be smaller than the read request size

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Feb 25, 2025
1 parent 4bffc93 commit 76fb477
Show file tree
Hide file tree
Showing 4 changed files with 770 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public int read() throws IOException
{
streamPos++;
}

return ret;
}

Expand Down Expand Up @@ -155,6 +156,8 @@ public class BinaryRecordReader implements IRecordReader
100000000000L, 1000000000000L, 10000000000000L, 100000000000000L, 1000000000000000L };
private static final int[] signMap = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +1, -1, +1, -1, +1, +1 };

private static final int SLEEP_TIME_WARN_MS = 100;
private static final int SHORT_SLEEP_MS = 1;
private static final int MASK_32_LOWER_HALF = 0xffff;
private static final int BUFFER_GROW_SIZE = 8192;
private static final int OPTIMIZED_STRING_READ_AHEAD = 32;
Expand Down Expand Up @@ -725,6 +728,8 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
int requiredCapacity = offset + dataLen;
ensureScratchBufferCapacity(requiredCapacity);

int totalSleepTimeMS = 0;

int position = offset;
int bytesConsumed = 0;
while (bytesConsumed < dataLen)
Expand All @@ -737,9 +742,24 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
throw e;
}

if (bytesRead == 0)
{
try
{
Thread.sleep(SHORT_SLEEP_MS);
totalSleepTimeMS += SHORT_SLEEP_MS;
}
catch (InterruptedException e) {}
}

position += bytesRead;
bytesConsumed += bytesRead;
}

if (totalSleepTimeMS > SLEEP_TIME_WARN_MS)
{
messages.addMessage("Warning: BinaryRecordReader.readIntoScratchBuffer(): slept for more than " + SLEEP_TIME_WARN_MS + "ms");
}
}

/**
Expand Down Expand Up @@ -1294,26 +1314,10 @@ else if ((this.scratchBuffer[strByteLen + bytesScanned] & 0xF8) == 0xF0)
// Use the second half of the remaining buffer space as a temp place to read in compressed bytes.
// Beginning of the buffer will be used to construct the string

int bytesToRead = compressedLen;
int availableBytes = 0;
try
{
availableBytes = this.inputStream.available();
}
catch(Exception e)
{
throw new IOException("Error, unexpected EOS while constructing QString.");
}

if (bytesToRead > availableBytes)
{
bytesToRead = availableBytes;
}

// Scratch buffer is divided into two parts. First expandedLen bytes are for the final expanded string
// Remaining bytes are for reading in the compressed string.
int readPos = expandedLen + compressedBytesConsumed;
readIntoScratchBuffer(readPos, bytesToRead);
readIntoScratchBuffer(readPos, compressedLen);

// We want to consume only a whole chunk so round off residual chars
// Below we will handle any residual bytes. (strLen % 4)
Expand All @@ -1334,7 +1338,7 @@ else if ((this.scratchBuffer[strByteLen + bytesScanned] & 0xF8) == 0xF0)
compressedBytesConsumed += QSTR_COMPRESSED_CHUNK_LEN;
}

compressedBytesRead += bytesToRead;
compressedBytesRead += compressedLen;
strByteLen += writePos;
}

Expand Down
Loading

0 comments on commit 76fb477

Please sign in to comment.