Skip to content

Commit 155c1a4

Browse files
committed
HPCC4J-606 DFSClient Allow ReadBuffer size to be set
- Created new CircularByteBuffer class - Created CircularByteBufferTest - Made changes to allow read buffer size to be smaller than the read request size Signed-off-by: James McMullan [email protected]
1 parent a41b476 commit 155c1a4

File tree

3 files changed

+444
-174
lines changed

3 files changed

+444
-174
lines changed
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*******************************************************************************
2+
* HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*******************************************************************************/
13+
14+
package org.hpccsystems.dfs.client;
15+
16+
import java.io.IOException;
17+
18+
public class CircularByteBuffer
19+
{
20+
private final byte[] buffer;
21+
private int readPos = 0;
22+
private int writePos = 0;
23+
private int markPos = 0;
24+
private int bytesReadAfterMark = 0;
25+
private int currentNumberOfBytes = 0;
26+
27+
public CircularByteBuffer(int bufferSize)
28+
{
29+
buffer = new byte[bufferSize];
30+
}
31+
32+
public int getCurrentNumberOfBytes()
33+
{
34+
// We only adjust for the mark internally and when providing information about available space
35+
return currentNumberOfBytes;
36+
}
37+
38+
public boolean hasSpace()
39+
{
40+
return getSpace() > 0;
41+
}
42+
43+
public int getSpace()
44+
{
45+
int adjustedByteCount = currentNumberOfBytes;
46+
if (markPos >= 0)
47+
{
48+
adjustedByteCount += bytesReadAfterMark;
49+
}
50+
51+
return buffer.length - adjustedByteCount;
52+
}
53+
54+
public int getContiguousSpace()
55+
{
56+
if (!hasSpace())
57+
{
58+
return 0;
59+
}
60+
61+
// If we have a marked position we don't want to allow that space to be written to until after reset has been called
62+
int rPos = readPos;
63+
if (markPos >= 0)
64+
{
65+
rPos = markPos;
66+
}
67+
68+
if (writePos >= rPos)
69+
{
70+
return buffer.length - writePos;
71+
}
72+
else
73+
{
74+
return rPos - writePos;
75+
}
76+
}
77+
78+
public int getWriteOffset()
79+
{
80+
return writePos;
81+
}
82+
83+
public int incrementWriteOffset(int increment)
84+
{
85+
int maxIncrement = buffer.length - writePos;
86+
increment = Math.min(increment, maxIncrement);
87+
88+
writePos += increment;
89+
if (writePos >= buffer.length)
90+
{
91+
writePos = 0;
92+
}
93+
94+
currentNumberOfBytes += increment;
95+
return increment;
96+
}
97+
98+
public void add(final byte[] targetBuffer, final int offset, final int length) throws IOException
99+
{
100+
if (currentNumberOfBytes + length > buffer.length)
101+
{
102+
throw new IOException("Not enough space available");
103+
}
104+
105+
for (int i = 0; i < length; i++)
106+
{
107+
buffer[writePos] = targetBuffer[offset + i];
108+
if (++writePos == buffer.length)
109+
{
110+
writePos = 0;
111+
}
112+
}
113+
currentNumberOfBytes += length;
114+
}
115+
116+
public byte read() throws IOException
117+
{
118+
if (currentNumberOfBytes <= 0)
119+
{
120+
throw new IOException("No bytes available to read");
121+
}
122+
123+
byte b = buffer[readPos];
124+
currentNumberOfBytes--;
125+
126+
if (markPos >= 0)
127+
{
128+
bytesReadAfterMark++;
129+
}
130+
131+
readPos++;
132+
if (readPos >= buffer.length)
133+
{
134+
readPos = 0;
135+
}
136+
137+
return b;
138+
}
139+
140+
public void read(final byte[] targetBuffer, final int targetOffset, final int length) throws IOException
141+
{
142+
if (length > currentNumberOfBytes)
143+
{
144+
throw new IOException("Not enough bytes available to read");
145+
}
146+
147+
if (readPos + length <= buffer.length)
148+
{
149+
System.arraycopy(buffer, readPos, targetBuffer, targetOffset, length);
150+
}
151+
else
152+
{
153+
int firstCopyLength = buffer.length - readPos;
154+
System.arraycopy(buffer, readPos, targetBuffer, targetOffset, firstCopyLength);
155+
System.arraycopy(buffer, 0, targetBuffer, targetOffset + firstCopyLength, length - firstCopyLength);
156+
}
157+
158+
readPos += length;
159+
if (readPos >= buffer.length)
160+
{
161+
readPos -= buffer.length;
162+
}
163+
164+
currentNumberOfBytes -= length;
165+
if (markPos >= 0)
166+
{
167+
bytesReadAfterMark += length;
168+
}
169+
}
170+
171+
public byte[] getInternalBuffer()
172+
{
173+
return buffer;
174+
}
175+
176+
public void mark(int readLim) throws IllegalArgumentException
177+
{
178+
if (readLim > buffer.length)
179+
{
180+
throw new IllegalArgumentException("Read limit exceeds available bytes");
181+
}
182+
183+
markPos = readPos;
184+
bytesReadAfterMark = 0;
185+
}
186+
187+
public void reset()
188+
{
189+
if (markPos < 0)
190+
{
191+
return;
192+
}
193+
194+
currentNumberOfBytes += bytesReadAfterMark;
195+
196+
readPos = markPos;
197+
markPos = -1;
198+
bytesReadAfterMark = 0;
199+
}
200+
201+
public long skip(long n) throws IOException
202+
{
203+
if (n > currentNumberOfBytes)
204+
{
205+
throw new IOException("Not enough bytes available to skip");
206+
}
207+
208+
readPos += n;
209+
if (readPos >= buffer.length)
210+
{
211+
readPos -= buffer.length;
212+
}
213+
214+
currentNumberOfBytes -= n;
215+
return n;
216+
}
217+
};

0 commit comments

Comments
 (0)