Skip to content

Commit 1da71ba

Browse files
authored
apacheGH-43060: [C++] Limit buffer size in BufferedInputStream::SetBufferSize with raw_read_bound (apache#43064)
### Rationale for this change See apache#43060. This patch optimizes memory usage for buffering. ### What changes are included in this PR? Limit the `buffer_size_` used by `SetBufferSize`. ### Are these changes tested? Yes ### Are there any user-facing changes? No * GitHub Issue: apache#43060 Authored-by: mwish <[email protected]> Signed-off-by: mwish <[email protected]>
1 parent 6680dcf commit 1da71ba

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

cpp/src/arrow/io/buffered.cc

+9
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,20 @@ class BufferedInputStream::Impl : public BufferedBase {
282282
return raw_pos_ - bytes_buffered_;
283283
}
284284

285+
// Resize the internal read buffer. Note that the internal buffer size
286+
// should not be larger than raw_read_bound_.
285287
Status SetBufferSize(int64_t new_buffer_size) {
286288
if (new_buffer_size <= 0) {
287289
return Status::Invalid("Buffer size should be positive");
288290
}
289291
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
290292
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
291293
}
294+
if (raw_read_bound_ >= 0) {
295+
// No need to reserve space for more than the total remaining number of bytes.
296+
new_buffer_size = std::min(new_buffer_size,
297+
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
298+
}
292299
return ResizeBuffer(new_buffer_size);
293300
}
294301

@@ -433,6 +440,8 @@ class BufferedInputStream::Impl : public BufferedBase {
433440
private:
434441
std::shared_ptr<InputStream> raw_;
435442
int64_t raw_read_total_;
443+
// A bound on the maximum number of bytes to read from the raw input stream.
444+
// The default value of -1 indicates that it is unbounded.
436445
int64_t raw_read_bound_;
437446

438447
// Number of remaining bytes in the buffer, to be reduced on each read from

cpp/src/arrow/io/buffered_test.cc

+21-2
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
329329
local_pool_ = MemoryPool::CreateDefault();
330330
}
331331

332-
void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
332+
void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool(),
333+
int64_t raw_read_bound = -1) {
333334
test_data_ = kExample1;
334335

335336
ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
@@ -338,7 +339,8 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
338339

339340
ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
340341
raw_ = file_in;
341-
ASSERT_OK_AND_ASSIGN(buffered_, BufferedInputStream::Create(buffer_size, pool, raw_));
342+
ASSERT_OK_AND_ASSIGN(
343+
buffered_, BufferedInputStream::Create(buffer_size, pool, raw_, raw_read_bound));
342344
}
343345

344346
protected:
@@ -472,6 +474,23 @@ TEST_F(TestBufferedInputStream, SetBufferSize) {
472474
ASSERT_OK(buffered_->SetBufferSize(5));
473475
}
474476

477+
// GH-43060: The internal buffer should not be larger than the
478+
// number of bytes that can be buffered.
479+
TEST_F(TestBufferedInputStream, BufferSizeLimit) {
480+
{
481+
// Buffer size should not exceed raw_read_bound
482+
MakeExample1(/*buffer_size=*/100000, default_memory_pool(), /*raw_read_bound=*/15);
483+
EXPECT_EQ(15, buffered_->buffer_size());
484+
}
485+
{
486+
// Set a buffer size after read.
487+
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
488+
ASSERT_OK(buffered_->Read(10));
489+
ASSERT_OK(buffered_->SetBufferSize(/*new_buffer_size=*/100000));
490+
EXPECT_EQ(5, buffered_->buffer_size());
491+
}
492+
}
493+
475494
class TestBufferedInputStreamBound : public ::testing::Test {
476495
public:
477496
void SetUp() { CreateExample(/*bounded=*/true); }

0 commit comments

Comments
 (0)