arrow.diff: patch against Apache Arrow's Parquet C++ sources, carried in a fork of alibaba/paimon-cpp
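The patch makes four changes. In schema.cc, the deprecated Impala INT96 timestamp fallback is restricted to nanosecond timestamps. In reader.cc, GetRecordBatchReader tracks remaining rows per row group instead of one aggregate count, so record batches no longer straddle row-group boundaries. In writer.cc, writer.h, and properties.h, a new GetBufferedSize() accessor and a max_row_group_size writer property let row groups rotate on compressed byte size as well as row count. In ThirdpartyToolchain.cmake, CMAKE_POLICY_VERSION_MINIMUM is forwarded to external projects so bundled dependencies keep configuring under newer CMake.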
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index ec3890a41f..943f69bb6c 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -178,7 +178,7 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
// The user is explicitly asking for Impala int96 encoding, there is no
// logical type.
- if (arrow_properties.support_deprecated_int96_timestamps()) {
+ if (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO) {
*physical_type = ParquetType::INT96;
return Status::OK();
}
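The hunk above narrows the INT96 fallback: with the extra unit check, enabling deprecated int96 timestamps forces the INT96 physical type only when the coerced target unit is nanoseconds; other units keep their INT64-based logical types. A minimal sketch of the writer-side opt-in, assuming the stock parquet-cpp Arrow properties API (the mapping in the comments restates the patched logic):

    // Sketch only: opting in to deprecated INT96 timestamps.
    #include <parquet/properties.h>

    std::shared_ptr<parquet::ArrowWriterProperties> MakeArrowProps() {
      // Patched mapping:
      //   timestamp[ns] -> physical INT96 (no logical type)
      //   timestamp[ms] -> physical INT64 + TIMESTAMP(MILLIS) logical type
      // Before the patch, both were forced to INT96 because the unit check
      // was missing.
      return parquet::ArrowWriterProperties::Builder()
          .enable_deprecated_int96_timestamps()  // request Impala INT96 encoding
          ->build();
    }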
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5973..aa6f92f077 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1013,25 +1013,32 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}
- int64_t num_rows = 0;
+ std::vector<int64_t> num_rows;
for (int row_group : row_groups) {
- num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+ num_rows.push_back(parquet_reader()->metadata()->RowGroup(row_group)->num_rows());
}
using ::arrow::RecordBatchIterator;
+ int row_group_idx = 0;
// NB: This lambda will be invoked outside the scope of this call to
// `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
// `this` is a non-owning pointer so we are relying on the parent FileReader outliving
// this RecordBatchReader.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
- [readers, batch_schema, num_rows,
+ [readers, batch_schema, num_rows, row_group_idx,
this]() mutable -> ::arrow::Result<RecordBatchIterator> {
::arrow::ChunkedArrayVector columns(readers.size());
- // don't reserve more rows than necessary
- int64_t batch_size = std::min(properties().batch_size(), num_rows);
- num_rows -= batch_size;
+ int64_t batch_size = 0;
+ if (!num_rows.empty()) {
+ // don't reserve more rows than necessary
+ batch_size = std::min(properties().batch_size(), num_rows[row_group_idx]);
+ num_rows[row_group_idx] -= batch_size;
+ if (num_rows[row_group_idx] == 0 && (num_rows.size() - 1) != row_group_idx) {
+ row_group_idx++;
+ }
+ }
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()),
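This rewrite replaces the single aggregate row count with a per-row-group vector, so each generated batch is capped at the current row group's remaining rows and the lambda advances to the next group once one is exhausted; record batches therefore never span row-group boundaries. A hedged usage sketch (the exact GetRecordBatchReader overload varies across Arrow versions; the out-parameter form below matches the patched file):

    // Sketch only: consuming batches after the patch. Each batch's row count
    // is bounded by both properties().batch_size() and the rows left in the
    // row group currently being read.
    #include <parquet/arrow/reader.h>

    ::arrow::Status ReadGroups(parquet::arrow::FileReader* reader) {
      std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
      ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({0, 1}, &batch_reader));
      std::shared_ptr<::arrow::RecordBatch> batch;
      while (true) {
        ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
        if (batch == nullptr) break;  // end of stream
        // With the patch, `batch` never mixes rows from two row groups.
      }
      return ::arrow::Status::OK();
    }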
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4fd7ef1b47..87326a54f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -314,6 +314,14 @@ class FileWriterImpl : public FileWriter {
return Status::OK();
}
+ int64_t GetBufferedSize() override {
+ if (row_group_writer_ == nullptr) {
+ return 0;
+ }
+ return row_group_writer_->total_compressed_bytes() +
+ row_group_writer_->total_compressed_bytes_written();
+ }
+
Status Close() override {
if (!closed_) {
// Make idempotent
@@ -418,10 +426,13 @@ class FileWriterImpl : public FileWriter {
// Max number of rows allowed in a row group.
const int64_t max_row_group_length = this->properties().max_row_group_length();
+ const int64_t max_row_group_size = this->properties().max_row_group_size();
// Initialize a new buffered row group writer if necessary.
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
- row_group_writer_->num_rows() >= max_row_group_length) {
+ row_group_writer_->num_rows() >= max_row_group_length ||
+ (row_group_writer_->total_compressed_bytes_written() +
+ row_group_writer_->total_compressed_bytes() >= max_row_group_size)) {
RETURN_NOT_OK(NewBufferedRowGroup());
}
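Two writer-side additions: a GetBufferedSize() accessor reporting the compressed bytes accumulated in the open row group, and an extra rotation trigger so a buffered row group is also closed once its compressed size reaches max_row_group_size, not only when it reaches max_row_group_length rows. The decision reduces to the predicate below, a sketch that mirrors the patched condition (the helper itself is illustrative):

    // Sketch only: the patched rotation rule as a standalone predicate.
    bool ShouldStartNewRowGroup(int64_t buffered_rows, int64_t buffered_bytes,
                                int64_t max_row_group_length,
                                int64_t max_row_group_size) {
      // Rotate on whichever cap is reached first: row count or byte size.
      return buffered_rows >= max_row_group_length ||
             buffered_bytes >= max_row_group_size;
    }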
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 4a1a033a7b..0f13d05e44 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -138,6 +138,9 @@ class PARQUET_EXPORT FileWriter {
/// option in this case.
virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;
+ /// \brief Return the compressed bytes buffered or already written for the current row group.
+ virtual int64_t GetBufferedSize() = 0;
+
/// \brief Write the footer and close the file.
virtual ::arrow::Status Close() = 0;
virtual ~FileWriter();
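The new pure-virtual accessor lets callers observe how much compressed data is buffered and, for instance, flush early on a threshold of their own. A hedged sketch, assuming FileWriter also exposes the usual NewBufferedRowGroup() (present in stock parquet-cpp); the 64 MB threshold is illustrative:

    // Sketch only: caller-driven early flush using GetBufferedSize().
    #include <parquet/arrow/writer.h>

    constexpr int64_t kFlushThreshold = 64 * 1024 * 1024;  // illustrative

    ::arrow::Status WriteWithFlush(parquet::arrow::FileWriter* writer,
                                   const ::arrow::RecordBatch& batch) {
      ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(batch));
      if (writer->GetBufferedSize() >= kFlushThreshold) {
        ARROW_RETURN_NOT_OK(writer->NewBufferedRowGroup());  // close current group
      }
      return ::arrow::Status::OK();
    }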
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 4d3acb491e..3906ff3c59 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -139,6 +139,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 128 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
@@ -232,6 +233,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+ max_row_group_size_(DEFAULT_MAX_ROW_GROUP_SIZE),
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_6),
data_page_version_(ParquetDataPageVersion::V1),
@@ -244,6 +246,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
write_batch_size_(properties.write_batch_size()),
max_row_group_length_(properties.max_row_group_length()),
+ max_row_group_size_(properties.max_row_group_size()),
pagesize_(properties.data_pagesize()),
version_(properties.version()),
data_page_version_(properties.data_page_version()),
@@ -321,6 +324,13 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
+ /// Specify the maximum size in bytes of a single row group.
+ /// Default 128 MB.
+ Builder* max_row_group_size(int64_t max_row_group_size) {
+ max_row_group_size_ = max_row_group_size;
+ return this;
+ }
+
/// Specify the data page size.
/// Default 1MB.
Builder* data_pagesize(int64_t pg_size) {
@@ -664,7 +674,7 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
- pagesize_, version_, created_by_, page_checksum_enabled_,
+ max_row_group_size_, pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_,
std::move(sorting_columns_)));
@@ -675,6 +685,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
@@ -705,6 +716,8 @@ class PARQUET_EXPORT WriterProperties {
inline int64_t max_row_group_length() const { return max_row_group_length_; }
+ inline int64_t max_row_group_size() const { return max_row_group_size_; }
+
inline int64_t data_pagesize() const { return pagesize_; }
inline ParquetDataPageVersion data_page_version() const {
@@ -810,7 +823,7 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
- int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+ int64_t max_row_group_length, int64_t max_row_group_size, int64_t pagesize, ParquetVersion::type version,
const std::string& created_by, bool page_write_checksum_enabled,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
@@ -821,6 +834,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
max_row_group_length_(max_row_group_length),
+ max_row_group_size_(max_row_group_size),
pagesize_(pagesize),
parquet_data_page_version_(data_page_version),
parquet_version_(version),
@@ -836,6 +850,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
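The property plumbing above follows the existing max_row_group_length pattern exactly: a default constant, a builder setter, an accessor, and threading through both constructors. Configuring the two caps together, as a sketch (values illustrative):

    // Sketch only: enabling the new byte-size cap next to the row-count cap.
    #include <parquet/properties.h>

    std::shared_ptr<parquet::WriterProperties> MakeWriterProps() {
      return parquet::WriterProperties::Builder()
          .max_row_group_length(1024 * 1024)       // rows per group (existing)
          ->max_row_group_size(128 * 1024 * 1024)  // bytes per group (this patch)
          ->build();
    }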
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -981,6 +981,11 @@ if(CMAKE_TOOLCHAIN_FILE)
list(APPEND EP_COMMON_CMAKE_ARGS -DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE})
endif()
+# Compatibility with bundled dependencies that require old CMake versions.
+if(CMAKE_VERSION VERSION_GREATER_EQUAL "3.30")
+ list(APPEND EP_COMMON_CMAKE_ARGS -DCMAKE_POLICY_VERSION_MINIMUM=3.5)
+endif()
+
# and crosscompiling emulator (for try_run() )
if(CMAKE_CROSSCOMPILING_EMULATOR)
string(REPLACE ";" ${EP_LIST_SEPARATOR} EP_CMAKE_CROSSCOMPILING_EMULATOR
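Recent CMake releases tighten or drop compatibility with projects that declare very old cmake_minimum_required values. Forwarding -DCMAKE_POLICY_VERSION_MINIMUM=3.5 in the common external-project arguments treats those bundled third-party projects as if they required at least CMake 3.5, so they keep configuring; on CMake versions that do not recognize the variable, the extra definition is simply unused.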