Skip to content

Commit

Permalink
Parquet: Optimize parquet write perf (#238)
Browse files Browse the repository at this point in the history
* fix the performance degradation issue when calling WriteTable API to write parquet file
* make the DataBuffer grow ratio configurable
  • Loading branch information
JkSelf authored May 8, 2023
1 parent bbb24af commit 80fbc8f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
7 changes: 7 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ class QueryConfig {
/// output rows.
static constexpr const char* kMaxOutputBatchRows = "max_output_batch_rows";

/// Multiplier applied to the requested size when DataBuffer::reserve()
/// reallocates its buffer.
static constexpr const char* kDataBufferGrowRatio = "data_buffer_grow_ratio";

static constexpr const char* kHashAdaptivityEnabled =
"driver.hash_adaptivity_enabled";

Expand Down Expand Up @@ -237,6 +240,10 @@ class QueryConfig {
return get<uint32_t>(kMaxOutputBatchRows, 10'000);
}

/// Returns the multiplier DataBuffer::reserve() applies when it reallocates
/// its backing allocation (config key kDataBufferGrowRatio, default 1).
/// Clamped to a minimum of 1: a configured value of 0 would make
/// DataBuffer::reserve() allocate zero bytes and reset capacity to zero,
/// corrupting the buffer.
uint32_t dataBufferGrowRatio() const {
  const auto ratio = get<uint32_t>(kDataBufferGrowRatio, 1);
  return ratio == 0 ? 1 : ratio;
}

/// Whether adaptive hashing is enabled for this query
/// (config key kHashAdaptivityEnabled). Enabled by default.
bool hashAdaptivityEnabled() const {
  constexpr bool kDefaultEnabled = true;
  return get<bool>(kHashAdaptivityEnabled, kDefaultEnabled);
}
Expand Down
14 changes: 9 additions & 5 deletions velox/dwio/common/DataBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DataBuffer {
return data()[i];
}

void reserve(uint64_t capacity) {
void reserve(uint64_t capacity, uint32_t growRatio = 1) {
if (capacity <= capacity_) {
// After resetting the buffer, capacity always resets to zero.
DWIO_ENSURE_NOT_NULL(buf_);
Expand All @@ -105,15 +105,15 @@ class DataBuffer {
if (veloxRef_ != nullptr) {
DWIO_RAISE("Can't reserve on a referenced buffer");
}
const auto newSize = sizeInBytes(capacity);
const auto newSize = sizeInBytes(capacity) * growRatio;
if (buf_ == nullptr) {
buf_ = reinterpret_cast<T*>(pool_->allocate(newSize));
} else {
buf_ = reinterpret_cast<T*>(
pool_->reallocate(buf_, sizeInBytes(capacity_), newSize));
}
DWIO_ENSURE(buf_ != nullptr || newSize == 0);
capacity_ = capacity;
capacity_ = capacity * growRatio;
}

void extend(uint64_t size) {
Expand Down Expand Up @@ -141,8 +141,12 @@ class DataBuffer {
append(offset, src.data() + srcOffset, items);
}

void append(uint64_t offset, const T* FOLLY_NONNULL src, uint64_t items) {
reserve(offset + items);
void append(
uint64_t offset,
const T* FOLLY_NONNULL src,
uint64_t items,
uint32_t growRatio = 1) {
reserve(offset + items, growRatio);
unsafeAppend(offset, src, items);
}

Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void Writer::write(const RowVectorPtr& data) {
auto table = arrow::Table::Make(
recordBatch->schema(), recordBatch->columns(), data->size());
if (!arrowWriter_) {
stream_ = std::make_shared<DataBufferSink>(pool_);
stream_ = std::make_shared<DataBufferSink>(
pool_, queryCtx_->queryConfig().dataBufferGrowRatio());
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
PARQUET_ASSIGN_OR_THROW(
arrowWriter_,
Expand All @@ -43,7 +44,7 @@ void Writer::write(const RowVectorPtr& data) {
arrowProperties));
}

PARQUET_THROW_NOT_OK(arrowWriter_->WriteRecordBatch(*recordBatch));
PARQUET_THROW_NOT_OK(arrowWriter_->WriteTable(*table, 10000));
}

void Writer::flush() {
Expand Down
24 changes: 19 additions & 5 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "velox/dwio/common/DataBuffer.h"
#include "velox/dwio/common/DataSink.h"

#include "velox/core/Context.h"
#include "velox/core/QueryConfig.h"
#include "velox/core/QueryCtx.h"
#include "velox/vector/ComplexVector.h"

#include <parquet/arrow/writer.h> // @manual
Expand All @@ -28,18 +31,24 @@ namespace facebook::velox::parquet {
// Utility for capturing Arrow output into a DataBuffer.
class DataBufferSink : public arrow::io::OutputStream {
public:
explicit DataBufferSink(memory::MemoryPool& pool) : buffer_(pool) {}
explicit DataBufferSink(memory::MemoryPool& pool, uint32_t growRatio = 1)
: buffer_(pool), growRatio_(growRatio) {}

arrow::Status Write(const std::shared_ptr<arrow::Buffer>& data) override {
buffer_.append(
buffer_.size(),
reinterpret_cast<const char*>(data->data()),
data->size());
data->size(),
growRatio_);
return arrow::Status::OK();
}

arrow::Status Write(const void* data, int64_t nbytes) override {
buffer_.append(buffer_.size(), reinterpret_cast<const char*>(data), nbytes);
buffer_.append(
buffer_.size(),
reinterpret_cast<const char*>(data),
nbytes,
growRatio_);
return arrow::Status::OK();
}

Expand All @@ -65,6 +74,7 @@ class DataBufferSink : public arrow::io::OutputStream {

private:
dwio::common::DataBuffer<char> buffer_;
uint32_t growRatio_ = 1;
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand All @@ -79,11 +89,14 @@ class Writer {
memory::MemoryPool& pool,
int32_t rowsInRowGroup,
std::shared_ptr<::parquet::WriterProperties> properties =
::parquet::WriterProperties::Builder().build())
::parquet::WriterProperties::Builder().build(),
std::shared_ptr<velox::core::QueryCtx> queryCtx =
std::make_shared<velox::core::QueryCtx>(nullptr))
: rowsInRowGroup_(rowsInRowGroup),
pool_(pool),
finalSink_(std::move(sink)),
properties_(std::move(properties)) {}
properties_(std::move(properties)),
queryCtx_(std::move(queryCtx)) {}

// Appends 'data' into the writer.
void write(const RowVectorPtr& data);
Expand Down Expand Up @@ -113,6 +126,7 @@ class Writer {
std::unique_ptr<::parquet::arrow::FileWriter> arrowWriter_;

std::shared_ptr<::parquet::WriterProperties> properties_;
std::shared_ptr<velox::core::QueryCtx> queryCtx_;
};

} // namespace facebook::velox::parquet

0 comments on commit 80fbc8f

Please sign in to comment.