3
3
#include " duckdb.hpp"
4
4
#include " geo_parquet.hpp"
5
5
#include " parquet_dbp_encoder.hpp"
6
+ #include " parquet_dlba_encoder.hpp"
6
7
#include " parquet_rle_bp_decoder.hpp"
7
8
#include " parquet_rle_bp_encoder.hpp"
9
+ #include " parquet_bss_encoder.hpp"
8
10
#include " parquet_statistics.hpp"
9
11
#include " parquet_writer.hpp"
10
12
#ifndef DUCKDB_AMALGAMATION
@@ -1059,34 +1061,42 @@ class StandardColumnWriterState : public BasicColumnWriterState {
1059
1061
}
1060
1062
~StandardColumnWriterState () override = default ;
1061
1063
1062
- // analysis state for integer values for DELTA_BINARY_PACKED
1064
+ // analysis state for integer values for DELTA_BINARY_PACKED/DELTA_LENGTH_BYTE_ARRAY
1063
1065
idx_t total_value_count = 0 ;
1066
+ idx_t total_string_size = 0 ;
1064
1067
1065
1068
unordered_map<T, uint32_t > dictionary;
1066
1069
duckdb_parquet::Encoding::type encoding;
1067
1070
};
1068
1071
1069
- template <class T >
1072
+ template <class SRC , class TGT >
1070
1073
class StandardWriterPageState : public ColumnWriterPageState {
1071
1074
public:
1072
- explicit StandardWriterPageState (const idx_t total_value_count, Encoding::type encoding_p,
1073
- const unordered_map<T, uint32_t > &dictionary_p)
1074
- : encoding(encoding_p), dbp_initialized(false ), dbp_encoder(total_value_count), dictionary(dictionary_p),
1075
- dict_written_value(false ), dict_bit_width(RleBpDecoder::ComputeBitWidth(dictionary.size())),
1076
- dict_encoder(dict_bit_width) {
1075
+ explicit StandardWriterPageState (const idx_t total_value_count, const idx_t total_string_size,
1076
+ Encoding::type encoding_p, const unordered_map<SRC, uint32_t > &dictionary_p)
1077
+ : encoding(encoding_p), dbp_initialized(false ), dbp_encoder(total_value_count), dlba_initialized(false ),
1078
+ dlba_encoder(total_value_count, total_string_size), bss_encoder(total_value_count, sizeof (TGT)),
1079
+ dictionary(dictionary_p), dict_written_value(false ),
1080
+ dict_bit_width(RleBpDecoder::ComputeBitWidth(dictionary.size())), dict_encoder(dict_bit_width) {
1077
1081
}
1078
1082
duckdb_parquet::Encoding::type encoding;
1079
1083
1080
1084
bool dbp_initialized;
1081
1085
DbpEncoder dbp_encoder;
1082
1086
1083
- const unordered_map<T, uint32_t > &dictionary;
1087
+ bool dlba_initialized;
1088
+ DlbaEncoder dlba_encoder;
1089
+
1090
+ BssEncoder bss_encoder;
1091
+
1092
+ const unordered_map<SRC, uint32_t > &dictionary;
1084
1093
bool dict_written_value;
1085
1094
uint32_t dict_bit_width;
1086
1095
RleBpEncoder dict_encoder;
1087
1096
};
1088
1097
1089
1098
namespace dbp_encoder {
1099
+
1090
1100
template <class T >
1091
1101
void BeginWrite (DbpEncoder &encoder, WriteStream &writer, const T &first_value) {
1092
1102
throw InternalException (" Can't write type to DELTA_BINARY_PACKED column" );
@@ -1139,6 +1149,60 @@ void WriteValue(DbpEncoder &encoder, WriteStream &writer, const uint32_t &value)
1139
1149
1140
1150
} // namespace dbp_encoder
1141
1151
1152
+ namespace dlba_encoder {
1153
+
1154
+ template <class T >
1155
+ void BeginWrite (DlbaEncoder &encoder, WriteStream &writer, const T &first_value) {
1156
+ throw InternalException (" Can't write type to DELTA_LENGTH_BYTE_ARRAY column" );
1157
+ }
1158
+
1159
+ template <>
1160
+ void BeginWrite (DlbaEncoder &encoder, WriteStream &writer, const string_t &first_value) {
1161
+ encoder.BeginWrite (writer, first_value);
1162
+ }
1163
+
1164
+ template <class T >
1165
+ void WriteValue (DlbaEncoder &encoder, WriteStream &writer, const T &value) {
1166
+ throw InternalException (" Can't write type to DELTA_LENGTH_BYTE_ARRAY column" );
1167
+ }
1168
+
1169
+ template <>
1170
+ void WriteValue (DlbaEncoder &encoder, WriteStream &writer, const string_t &value) {
1171
+ encoder.WriteValue (writer, value);
1172
+ }
1173
+
1174
+ // helpers to get size from strings
1175
+ template <class SRC >
1176
+ static constexpr idx_t GetDlbaStringSize (const SRC &src_value) {
1177
+ return 0 ;
1178
+ }
1179
+
1180
+ template <>
1181
+ idx_t GetDlbaStringSize (const string_t &src_value) {
1182
+ return src_value.GetSize ();
1183
+ }
1184
+
1185
+ } // namespace dlba_encoder
1186
+
1187
+ namespace bss_encoder {
1188
+
1189
+ template <class T >
1190
+ void WriteValue (BssEncoder &encoder, const T &value) {
1191
+ throw InternalException (" Can't write type to BYTE_STREAM_SPLIT column" );
1192
+ }
1193
+
1194
+ template <>
1195
+ void WriteValue (BssEncoder &encoder, const float &value) {
1196
+ encoder.WriteValue (value);
1197
+ }
1198
+
1199
+ template <>
1200
+ void WriteValue (BssEncoder &encoder, const double &value) {
1201
+ encoder.WriteValue (value);
1202
+ }
1203
+
1204
+ } // namespace bss_encoder
1205
+
1142
1206
template <class SRC , class TGT , class OP = ParquetCastOperator>
1143
1207
class StandardColumnWriter : public BasicColumnWriter {
1144
1208
public:
@@ -1159,13 +1223,13 @@ class StandardColumnWriter : public BasicColumnWriter {
1159
1223
//! Creates the page state for this column from the analysis results held in the column writer state:
//! the value count, the accumulated string size, the chosen encoding and the dictionary
//! (the dictionary is passed by reference, so the page state refers to the column state's map).
unique_ptr<ColumnWriterPageState> InitializePageState(BasicColumnWriterState &state_p) override {
	auto &column_state = state_p.Cast<StandardColumnWriterState<SRC>>();
	const auto &column_dictionary = column_state.dictionary;

	using PageState = StandardWriterPageState<SRC, TGT>;
	auto page_state = make_uniq<PageState>(column_state.total_value_count, column_state.total_string_size,
	                                       column_state.encoding, column_dictionary);
	// explicit move: the local is a pointer to the derived page state, the return type is the base pointer
	return std::move(page_state);
}
1166
1230
1167
1231
void FlushPageState (WriteStream &temp_writer, ColumnWriterPageState *state_p) override {
1168
- auto &page_state = state_p->Cast <StandardWriterPageState<SRC>>();
1232
+ auto &page_state = state_p->Cast <StandardWriterPageState<SRC, TGT >>();
1169
1233
switch (page_state.encoding ) {
1170
1234
case Encoding::DELTA_BINARY_PACKED:
1171
1235
if (!page_state.dbp_initialized ) {
@@ -1182,7 +1246,15 @@ class StandardColumnWriter : public BasicColumnWriter {
1182
1246
return ;
1183
1247
}
1184
1248
page_state.dict_encoder .FinishWrite (temp_writer);
1185
-
1249
+ break ;
1250
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY:
1251
+ if (!page_state.dlba_initialized ) {
1252
+ dlba_encoder::BeginWrite<string_t >(page_state.dlba_encoder , temp_writer, string_t (" " ));
1253
+ }
1254
+ page_state.dlba_encoder .FinishWrite (temp_writer);
1255
+ break ;
1256
+ case Encoding::BYTE_STREAM_SPLIT:
1257
+ page_state.bss_encoder .FinishWrite (temp_writer);
1186
1258
break ;
1187
1259
case Encoding::PLAIN:
1188
1260
break ;
@@ -1220,14 +1292,15 @@ class StandardColumnWriter : public BasicColumnWriter {
1220
1292
continue ;
1221
1293
}
1222
1294
if (validity.RowIsValid (vector_index)) {
1295
+ const auto &src_value = data_ptr[vector_index];
1223
1296
if (state.dictionary .size () <= writer.DictionarySizeLimit ()) {
1224
- const auto &src_value = data_ptr[vector_index];
1225
1297
if (state.dictionary .find (src_value) == state.dictionary .end ()) {
1226
1298
state.dictionary [src_value] = new_value_index;
1227
1299
new_value_index++;
1228
1300
}
1229
1301
}
1230
1302
state.total_value_count ++;
1303
+ state.total_string_size += dlba_encoder::GetDlbaStringSize (src_value);
1231
1304
}
1232
1305
vector_index++;
1233
1306
}
@@ -1238,9 +1311,22 @@ class StandardColumnWriter : public BasicColumnWriter {
1238
1311
1239
1312
auto &state = state_p.Cast <StandardColumnWriterState<SRC>>();
1240
1313
if (state.dictionary .size () == 0 || state.dictionary .size () > writer.DictionarySizeLimit ()) {
1241
- // special handling for int column: dpb, otherwise plain
1242
- state.encoding = (type == Type::type::INT32 || type == Type::type::INT64) ? Encoding::DELTA_BINARY_PACKED
1243
- : Encoding::PLAIN;
1314
+ // If we aren't doing dictionary encoding, the following encodings are virtually always better than PLAIN
1315
+ switch (type) {
1316
+ case Type::type::INT32:
1317
+ case Type::type::INT64:
1318
+ state.encoding = Encoding::DELTA_BINARY_PACKED;
1319
+ break ;
1320
+ case Type::type::BYTE_ARRAY:
1321
+ state.encoding = Encoding::DELTA_LENGTH_BYTE_ARRAY;
1322
+ break ;
1323
+ case Type::type::FLOAT:
1324
+ case Type::type::DOUBLE:
1325
+ state.encoding = Encoding::BYTE_STREAM_SPLIT;
1326
+ break ;
1327
+ default :
1328
+ state.encoding = Encoding::PLAIN;
1329
+ }
1244
1330
state.dictionary .clear ();
1245
1331
}
1246
1332
}
@@ -1261,7 +1347,7 @@ class StandardColumnWriter : public BasicColumnWriter {
1261
1347
1262
1348
void WriteVector (WriteStream &temp_writer, ColumnWriterStatistics *stats, ColumnWriterPageState *page_state_p,
1263
1349
Vector &input_column, idx_t chunk_start, idx_t chunk_end) override {
1264
- auto &page_state = page_state_p->Cast <StandardWriterPageState<SRC>>();
1350
+ auto &page_state = page_state_p->Cast <StandardWriterPageState<SRC, TGT >>();
1265
1351
1266
1352
const auto &mask = FlatVector::Validity (input_column);
1267
1353
const auto *data_ptr = FlatVector::GetData<SRC>(input_column);
@@ -1287,7 +1373,6 @@ class StandardColumnWriter : public BasicColumnWriter {
1287
1373
}
1288
1374
break ;
1289
1375
}
1290
-
1291
1376
case Encoding::DELTA_BINARY_PACKED: {
1292
1377
idx_t r = chunk_start;
1293
1378
if (!page_state.dbp_initialized ) {
@@ -1315,6 +1400,44 @@ class StandardColumnWriter : public BasicColumnWriter {
1315
1400
}
1316
1401
break ;
1317
1402
}
1403
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
1404
+ idx_t r = chunk_start;
1405
+ if (!page_state.dlba_initialized ) {
1406
+ // find first non-null value
1407
+ for (; r < chunk_end; r++) {
1408
+ if (!mask.RowIsValid (r)) {
1409
+ continue ;
1410
+ }
1411
+ const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
1412
+ OP::template HandleStats<SRC, TGT>(stats, target_value);
1413
+ dlba_encoder::BeginWrite (page_state.dlba_encoder , temp_writer, target_value);
1414
+ page_state.dlba_initialized = true ;
1415
+ r++; // skip over
1416
+ break ;
1417
+ }
1418
+ }
1419
+
1420
+ for (; r < chunk_end; r++) {
1421
+ if (!mask.RowIsValid (r)) {
1422
+ continue ;
1423
+ }
1424
+ const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
1425
+ OP::template HandleStats<SRC, TGT>(stats, target_value);
1426
+ dlba_encoder::WriteValue (page_state.dlba_encoder , temp_writer, target_value);
1427
+ }
1428
+ break ;
1429
+ }
1430
+ case Encoding::BYTE_STREAM_SPLIT: {
1431
+ for (idx_t r = chunk_start; r < chunk_end; r++) {
1432
+ if (!mask.RowIsValid (r)) {
1433
+ continue ;
1434
+ }
1435
+ const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
1436
+ OP::template HandleStats<SRC, TGT>(stats, target_value);
1437
+ bss_encoder::WriteValue (page_state.bss_encoder , target_value);
1438
+ }
1439
+ break ;
1440
+ }
1318
1441
case Encoding::PLAIN: {
1319
1442
D_ASSERT (page_state.encoding == Encoding::PLAIN);
1320
1443
TemplatedWritePlain<SRC, TGT, OP>(input_column, stats, chunk_start, chunk_end, mask, temp_writer);
0 commit comments