Skip to content

Commit 75a84c5

Browse files
committed
Code review changes
1 parent 155c1a4 commit 75a84c5

File tree

4 files changed

+374
-94
lines changed

4 files changed

+374
-94
lines changed

dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public int read() throws IOException
7575
{
7676
streamPos++;
7777
}
78+
7879
return ret;
7980
}
8081

@@ -737,6 +738,15 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
737738
throw e;
738739
}
739740

741+
if (bytesRead == 0)
742+
{
743+
try
744+
{
745+
Thread.sleep(1);
746+
}
747+
catch (InterruptedException e) {}
748+
}
749+
740750
position += bytesRead;
741751
bytesConsumed += bytesRead;
742752
}
@@ -1294,26 +1304,10 @@ else if ((this.scratchBuffer[strByteLen + bytesScanned] & 0xF8) == 0xF0)
12941304
// Use the second half of the remaining buffer space as a temp place to read in compressed bytes.
12951305
// Beginning of the buffer will be used to construct the string
12961306

1297-
int bytesToRead = compressedLen;
1298-
int availableBytes = 0;
1299-
try
1300-
{
1301-
availableBytes = this.inputStream.available();
1302-
}
1303-
catch(Exception e)
1304-
{
1305-
throw new IOException("Error, unexpected EOS while constructing QString.");
1306-
}
1307-
1308-
if (bytesToRead > availableBytes)
1309-
{
1310-
bytesToRead = availableBytes;
1311-
}
1312-
13131307
// Scratch buffer is divided into two parts. First expandedLen bytes are for the final expanded string
13141308
// Remaining bytes are for reading in the compressed string.
13151309
int readPos = expandedLen + compressedBytesConsumed;
1316-
readIntoScratchBuffer(readPos, bytesToRead);
1310+
readIntoScratchBuffer(readPos, compressedLen);
13171311

13181312
// We want to consume only a whole chunk so round off residual chars
13191313
// Below we will handle any residual bytes. (strLen % 4)
@@ -1334,7 +1328,7 @@ else if ((this.scratchBuffer[strByteLen + bytesScanned] & 0xF8) == 0xF0)
13341328
compressedBytesConsumed += QSTR_COMPRESSED_CHUNK_LEN;
13351329
}
13361330

1337-
compressedBytesRead += bytesToRead;
1331+
compressedBytesRead += compressedLen;
13381332
strByteLen += writePos;
13391333
}
13401334

dfsclient/src/main/java/org/hpccsystems/dfs/client/CircularByteBuffer.java

Lines changed: 109 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
2+
* HPCC SYSTEMS software Copyright (C) 2025 HPCC Systems®.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
55
* the License. You may obtain a copy of the License at
@@ -13,34 +13,58 @@
1313

1414
package org.hpccsystems.dfs.client;
1515

16-
import java.io.IOException;
17-
1816
public class CircularByteBuffer
1917
{
2018
private final byte[] buffer;
2119
private int readPos = 0;
2220
private int writePos = 0;
23-
private int markPos = 0;
21+
private int markPos = -1;
2422
private int bytesReadAfterMark = 0;
2523
private int currentNumberOfBytes = 0;
2624

27-
public CircularByteBuffer(int bufferSize)
25+
/**
26+
* Instantiates a new circular byte buffer.
27+
*
28+
* @param bufferSize the buffer size
29+
* @throws IllegalArgumentException if buffer size is less than or equal to 0
30+
*/
31+
public CircularByteBuffer(int bufferSize) throws IllegalArgumentException
2832
{
33+
if (bufferSize <= 0)
34+
{
35+
throw new IllegalArgumentException("Buffer size must be greater than 0");
36+
}
37+
2938
buffer = new byte[bufferSize];
3039
}
3140

32-
public int getCurrentNumberOfBytes()
41+
/**
42+
* Gets the number of bytes available in the buffer.
43+
*
44+
* @return aviailable bytes
45+
*/
46+
public int getBytesAvailable()
3347
{
3448
// We only adjust for the mark internally and when providing information about available space
3549
return currentNumberOfBytes;
3650
}
3751

52+
/**
53+
* Checks if the buffer has space.
54+
*
55+
* @return true, if successful
56+
*/
3857
public boolean hasSpace()
3958
{
40-
return getSpace() > 0;
59+
return getFreeSpace() > 0;
4160
}
4261

43-
public int getSpace()
62+
/**
63+
* Gets the free space in the buffer.
64+
*
65+
* @return the free space
66+
*/
67+
public int getFreeSpace()
4468
{
4569
int adjustedByteCount = currentNumberOfBytes;
4670
if (markPos >= 0)
@@ -51,7 +75,12 @@ public int getSpace()
5175
return buffer.length - adjustedByteCount;
5276
}
5377

54-
public int getContiguousSpace()
78+
/**
79+
* Gets the contiguous free space in the buffer.
80+
*
81+
* @return the contiguous free space
82+
*/
83+
public int getContiguousFreeSpace()
5584
{
5685
if (!hasSpace())
5786
{
@@ -75,11 +104,22 @@ public int getContiguousSpace()
75104
}
76105
}
77106

107+
/**
108+
* Gets the location of the next write.
109+
*
110+
* @return the write offset
111+
*/
78112
public int getWriteOffset()
79113
{
80114
return writePos;
81115
}
82116

117+
/**
118+
* Increments write offset.
119+
*
120+
* @param increment number of bytes to increment
121+
* @return the number of bytes incremented
122+
*/
83123
public int incrementWriteOffset(int increment)
84124
{
85125
int maxIncrement = buffer.length - writePos;
@@ -95,29 +135,43 @@ public int incrementWriteOffset(int increment)
95135
return increment;
96136
}
97137

98-
public void add(final byte[] targetBuffer, final int offset, final int length) throws IOException
138+
/**
139+
* Adds the bytes to the buffer.
140+
*
141+
* @param srcBuffer the source buffer
142+
* @param offset the offset within the source buffer
143+
* @param length the length of bytes to add
144+
* @return the number of bytes added
145+
*/
146+
public int add(final byte[] srcBuffer, int offset, int length)
99147
{
100148
if (currentNumberOfBytes + length > buffer.length)
101149
{
102-
throw new IOException("Not enough space available");
150+
length = buffer.length - currentNumberOfBytes;
103151
}
104152

105153
for (int i = 0; i < length; i++)
106154
{
107-
buffer[writePos] = targetBuffer[offset + i];
155+
buffer[writePos] = srcBuffer[offset + i];
108156
if (++writePos == buffer.length)
109157
{
110158
writePos = 0;
111159
}
112160
}
113161
currentNumberOfBytes += length;
162+
return length;
114163
}
115164

116-
public byte read() throws IOException
165+
/**
166+
* Reads a byte from the buffer.
167+
*
168+
* @return the byte read as an int [0-255] or -1 if no bytes are available
169+
*/
170+
public int read()
117171
{
118172
if (currentNumberOfBytes <= 0)
119173
{
120-
throw new IOException("No bytes available to read");
174+
return -1;
121175
}
122176

123177
byte b = buffer[readPos];
@@ -128,20 +182,28 @@ public byte read() throws IOException
128182
bytesReadAfterMark++;
129183
}
130184

131-
readPos++;
132-
if (readPos >= buffer.length)
185+
if (++readPos >= buffer.length)
133186
{
134187
readPos = 0;
135188
}
136189

137-
return b;
190+
int ret = b;
191+
return ret + 128;
138192
}
139193

140-
public void read(final byte[] targetBuffer, final int targetOffset, final int length) throws IOException
194+
/**
195+
* Reads bytes from the buffer.
196+
*
197+
* @param targetBuffer the target buffer to write to
198+
* @param targetOffset the target offset within the target buffer
199+
* @param length the number of bytes to read
200+
* @return the number of bytes read
201+
*/
202+
public int read(final byte[] targetBuffer, int targetOffset, int length)
141203
{
142204
if (length > currentNumberOfBytes)
143205
{
144-
throw new IOException("Not enough bytes available to read");
206+
length = currentNumberOfBytes;
145207
}
146208

147209
if (readPos + length <= buffer.length)
@@ -166,13 +228,26 @@ public void read(final byte[] targetBuffer, final int targetOffset, final int le
166228
{
167229
bytesReadAfterMark += length;
168230
}
231+
232+
return length;
169233
}
170234

235+
/**
236+
* Gets the internal buffer.
237+
*
238+
* @return the internal buffer
239+
*/
171240
public byte[] getInternalBuffer()
172241
{
173242
return buffer;
174243
}
175244

245+
/**
246+
* Marks the current read position, allowing a reset to return to this position.
247+
*
248+
* @param readLim the read limit before a reset is no longer allowed
249+
* @throws IllegalArgumentException if read limit exceeds available bytes
250+
*/
176251
public void mark(int readLim) throws IllegalArgumentException
177252
{
178253
if (readLim > buffer.length)
@@ -184,6 +259,9 @@ public void mark(int readLim) throws IllegalArgumentException
184259
bytesReadAfterMark = 0;
185260
}
186261

262+
/**
263+
* Resets the read position to the last marked position.
264+
*/
187265
public void reset()
188266
{
189267
if (markPos < 0)
@@ -198,11 +276,17 @@ public void reset()
198276
bytesReadAfterMark = 0;
199277
}
200278

201-
public long skip(long n) throws IOException
279+
/**
280+
* Skips the specified number of bytes.
281+
*
282+
* @param n the number of bytes to skip
283+
* @return the number of bytes skipped
284+
*/
285+
public int skip(int n)
202286
{
203287
if (n > currentNumberOfBytes)
204288
{
205-
throw new IOException("Not enough bytes available to skip");
289+
n = currentNumberOfBytes;
206290
}
207291

208292
readPos += n;
@@ -212,6 +296,10 @@ public long skip(long n) throws IOException
212296
}
213297

214298
currentNumberOfBytes -= n;
299+
if (markPos >= 0)
300+
{
301+
bytesReadAfterMark += n;
302+
}
215303
return n;
216304
}
217305
};

0 commit comments

Comments
 (0)