-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC4J-606 DFSClient Allow ReadBuffer size to be set #788
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
/******************************************************************************* | ||
* HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*******************************************************************************/ | ||
|
||
package org.hpccsystems.dfs.client; | ||
|
||
import java.io.IOException; | ||
|
||
public class CircularByteBuffer | ||
{ | ||
private final byte[] buffer; | ||
private int readPos = 0; | ||
private int writePos = 0; | ||
private int markPos = 0; | ||
private int bytesReadAfterMark = 0; | ||
private int currentNumberOfBytes = 0; | ||
|
||
public CircularByteBuffer(int bufferSize) | ||
{ | ||
buffer = new byte[bufferSize]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are 0 and negative ints ok here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we consider an upper limit as well? |
||
} | ||
|
||
public int getCurrentNumberOfBytes() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the meaning of current bytes? I assume this is the number of occupied bytes ? is there a more self documenting name for the method and the member? |
||
{ | ||
// We only adjust for the mark internally and when providing information about available space | ||
return currentNumberOfBytes; | ||
} | ||
|
||
public boolean hasSpace() | ||
{ | ||
return getSpace() > 0; | ||
} | ||
|
||
public int getSpace() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. getAvailableByteCount? |
||
{ | ||
int adjustedByteCount = currentNumberOfBytes; | ||
if (markPos >= 0) | ||
{ | ||
adjustedByteCount += bytesReadAfterMark; | ||
} | ||
|
||
return buffer.length - adjustedByteCount; | ||
} | ||
|
||
public int getContiguousSpace() | ||
{ | ||
if (!hasSpace()) | ||
{ | ||
return 0; | ||
} | ||
|
||
// If we have a marked position we don't want to allow that space to be written to until after reset has been called | ||
int rPos = readPos; | ||
if (markPos >= 0) | ||
{ | ||
rPos = markPos; | ||
} | ||
|
||
if (writePos >= rPos) | ||
{ | ||
return buffer.length - writePos; | ||
} | ||
else | ||
{ | ||
return rPos - writePos; | ||
} | ||
} | ||
|
||
public int getWriteOffset() | ||
{ | ||
return writePos; | ||
} | ||
|
||
public int incrementWriteOffset(int increment) | ||
{ | ||
int maxIncrement = buffer.length - writePos; | ||
increment = Math.min(increment, maxIncrement); | ||
|
||
writePos += increment; | ||
if (writePos >= buffer.length) | ||
{ | ||
writePos = 0; | ||
} | ||
|
||
currentNumberOfBytes += increment; | ||
return increment; | ||
} | ||
|
||
public void add(final byte[] targetBuffer, final int offset, final int length) throws IOException | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add javadoc comments to these methods (copilot might be able to do this for us) |
||
{ | ||
if (currentNumberOfBytes + length > buffer.length) | ||
{ | ||
throw new IOException("Not enough space available"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Who is the target of the message, and will this be enough information to trivially identify the problem, and formulate a solution? |
||
} | ||
|
||
for (int i = 0; i < length; i++) | ||
{ | ||
buffer[writePos] = targetBuffer[offset + i]; | ||
if (++writePos == buffer.length) | ||
{ | ||
writePos = 0; | ||
} | ||
} | ||
currentNumberOfBytes += length; | ||
} | ||
|
||
public byte read() throws IOException | ||
{ | ||
if (currentNumberOfBytes <= 0) | ||
{ | ||
throw new IOException("No bytes available to read"); | ||
} | ||
|
||
byte b = buffer[readPos]; | ||
currentNumberOfBytes--; | ||
|
||
if (markPos >= 0) | ||
{ | ||
bytesReadAfterMark++; | ||
} | ||
|
||
readPos++; | ||
if (readPos >= buffer.length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for consistency's sake this could change to if (++readPos >= buffer.length) |
||
{ | ||
readPos = 0; | ||
} | ||
|
||
return b; | ||
} | ||
|
||
public void read(final byte[] targetBuffer, final int targetOffset, final int length) throws IOException | ||
{ | ||
if (length > currentNumberOfBytes) | ||
{ | ||
throw new IOException("Not enough bytes available to read"); | ||
} | ||
|
||
if (readPos + length <= buffer.length) | ||
{ | ||
System.arraycopy(buffer, readPos, targetBuffer, targetOffset, length); | ||
} | ||
else | ||
{ | ||
int firstCopyLength = buffer.length - readPos; | ||
System.arraycopy(buffer, readPos, targetBuffer, targetOffset, firstCopyLength); | ||
System.arraycopy(buffer, 0, targetBuffer, targetOffset + firstCopyLength, length - firstCopyLength); | ||
rpastrana marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
readPos += length; | ||
if (readPos >= buffer.length) | ||
{ | ||
readPos -= buffer.length; | ||
} | ||
|
||
currentNumberOfBytes -= length; | ||
if (markPos >= 0) | ||
{ | ||
bytesReadAfterMark += length; | ||
} | ||
} | ||
|
||
public byte[] getInternalBuffer() | ||
{ | ||
return buffer; | ||
} | ||
|
||
public void mark(int readLim) throws IllegalArgumentException | ||
{ | ||
if (readLim > buffer.length) | ||
{ | ||
throw new IllegalArgumentException("Read limit exceeds available bytes"); | ||
} | ||
|
||
markPos = readPos; | ||
bytesReadAfterMark = 0; | ||
} | ||
|
||
public void reset() | ||
{ | ||
if (markPos < 0) | ||
{ | ||
return; | ||
} | ||
|
||
currentNumberOfBytes += bytesReadAfterMark; | ||
|
||
readPos = markPos; | ||
markPos = -1; | ||
bytesReadAfterMark = 0; | ||
} | ||
|
||
public long skip(long n) throws IOException | ||
{ | ||
if (n > currentNumberOfBytes) | ||
{ | ||
throw new IOException("Not enough bytes available to skip"); | ||
} | ||
|
||
readPos += n; | ||
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved
Hide resolved
|
||
if (readPos >= buffer.length) | ||
{ | ||
readPos -= buffer.length; | ||
} | ||
|
||
currentNumberOfBytes -= n; | ||
return n; | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2025?