Skip to content

Commit 9810a14

Browse files
authored
feat: add configs for compaction and default target file size for different table (#151)
1 parent 7d25cd5 commit 9810a14

File tree

9 files changed

+125
-24
lines changed

9 files changed

+125
-24
lines changed

include/paimon/defs.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ struct PAIMON_EXPORT Options {
105105
/// Default value is local.
106106
static const char FILE_SYSTEM[];
107107

108-
/// "target-file-size" - Target size of a file. Default value is 256MB.
109-
// TODO(xinyu.lxy): change the default value to 128MB for primary key table.
108+
/// "target-file-size" - Target size of a file. primary key table: the default value is 128 MB.
109+
/// append table: the default value is 256 MB.
110110
static const char TARGET_FILE_SIZE[];
111111

112112
/// "blob.target-file-size" - Target size of a blob file. Default is TARGET_FILE_SIZE.
@@ -193,6 +193,10 @@ struct PAIMON_EXPORT Options {
193193
/// cause performance issue. Default value is false.
194194
static const char SNAPSHOT_CLEAN_EMPTY_DIRECTORIES[];
195195

196+
/// "commit.force-compact" - Whether to force a compaction before commit. Default value is
197+
/// "false".
198+
static const char COMMIT_FORCE_COMPACT[];
199+
196200
/// "commit.timeout" - Timeout duration of retry when commit failed. No default value.
197201
static const char COMMIT_TIMEOUT[];
198202

@@ -289,6 +293,16 @@ struct PAIMON_EXPORT Options {
289293
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
290294
/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
291295
static const char SCAN_TAG_NAME[];
296+
/// "write-only" - If set to "true", compactions and snapshot expiration will be skipped. This
297+
/// option is used along with dedicated compact jobs. Default value is "false".
298+
/// @note: This option will be ignore until compaction is supported.
299+
static const char WRITE_ONLY[];
300+
/// "compaction.min.file-num" - For file set [f_0,...,f_N], the minimum file number to trigger a
301+
/// compaction for append-only table. Default value is 5.
302+
static const char COMPACTION_MIN_FILE_NUM[];
303+
/// "compaction.force-rewrite-all-files" - Whether to force pick all files for a full
304+
/// compaction. Usually seen in a compaction task to external paths. Default value is "false".
305+
static const char COMPACTION_FORCE_REWRITE_ALL_FILES[];
292306
};
293307

294308
static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();

src/paimon/common/defs.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const char Options::SNAPSHOT_NUM_RETAINED_MAX[] = "snapshot.num-retained.max";
5353
const char Options::SNAPSHOT_TIME_RETAINED[] = "snapshot.time-retained";
5454
const char Options::SNAPSHOT_EXPIRE_LIMIT[] = "snapshot.expire.limit";
5555
const char Options::SNAPSHOT_CLEAN_EMPTY_DIRECTORIES[] = "snapshot.clean-empty-directories";
56+
const char Options::COMMIT_FORCE_COMPACT[] = "commit.force-compact";
5657
const char Options::COMMIT_TIMEOUT[] = "commit.timeout";
5758
const char Options::COMMIT_MAX_RETRIES[] = "commit.max-retries";
5859
const char Options::SEQUENCE_FIELD[] = "sequence.field";
@@ -82,4 +83,8 @@ const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
8283
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
8384
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
8485
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
86+
const char Options::WRITE_ONLY[] = "write-only";
87+
const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num";
88+
const char Options::COMPACTION_FORCE_REWRITE_ALL_FILES[] = "compaction.force-rewrite-all-files";
89+
8590
} // namespace paimon

src/paimon/core/append/append_only_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingRowWrit
113113
return CreateRollingBlobWriter(schemas);
114114
} else {
115115
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
116-
options_.GetTargetFileSize(), GetDataFileWriterCreator(write_schema_, write_cols_));
116+
options_.GetTargetFileSize(/*has_primary_key=*/false),
117+
GetDataFileWriterCreator(write_schema_, write_cols_));
117118
}
118119
}
119120

@@ -192,7 +193,7 @@ AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWri
192193
options_.GetBlobTargetFileSize(), single_blob_file_writer_creator);
193194
};
194195
return std::make_unique<RollingBlobFileWriter>(
195-
options_.GetTargetFileSize(),
196+
options_.GetTargetFileSize(/*has_primary_key=*/false),
196197
GetDataFileWriterCreator(schemas.main_schema, schemas.main_schema->field_names()),
197198
rolling_blob_file_writer_creator, arrow::struct_(write_schema_->fields()));
198199
}

src/paimon/core/core_options.cpp

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <cstring>
2020
#include <limits>
2121
#include <memory>
22+
#include <optional>
2223
#include <utility>
2324

2425
#include "fmt/format.h"
@@ -246,6 +247,10 @@ class ConfigParser {
246247
return Status::OK();
247248
}
248249

250+
bool ContainsKey(const std::string& key) const {
251+
return config_map_.find(key) != config_map_.end();
252+
}
253+
249254
private:
250255
const std::map<std::string, std::string> config_map_;
251256
};
@@ -254,9 +259,8 @@ class ConfigParser {
254259
// storing various configurable fields and their default values.
255260
struct CoreOptions::Impl {
256261
int64_t page_size = 64 * 1024;
257-
int64_t target_file_size = 256 * 1024 * 1024; // TODO(xinyu.lxy): target_file_size of primary
258-
// key table is 128 MB and append table is 256 MB
259-
int64_t blob_target_file_size = 256 * 1024 * 1024;
262+
std::optional<int64_t> target_file_size;
263+
std::optional<int64_t> blob_target_file_size;
260264
int64_t source_split_target_size = 128 * 1024 * 1024;
261265
int64_t source_split_open_file_cost = 4 * 1024 * 1024;
262266
int64_t manifest_target_file_size = 8 * 1024 * 1024;
@@ -293,6 +297,7 @@ struct CoreOptions::Impl {
293297
int32_t read_batch_size = 1024;
294298
int32_t write_batch_size = 1024;
295299
int32_t commit_max_retries = 10;
300+
int32_t compaction_min_file_num = 5;
296301

297302
SortOrder sequence_field_sort_order = SortOrder::ASCENDING;
298303
MergeEngine merge_engine = MergeEngine::DEDUPLICATE;
@@ -303,6 +308,7 @@ struct CoreOptions::Impl {
303308
int32_t file_compression_zstd_level = 1;
304309

305310
bool ignore_delete = false;
311+
bool write_only = false;
306312
bool deletion_vectors_enabled = false;
307313
bool force_lookup = false;
308314
bool partial_update_remove_record_on_delete = false;
@@ -313,6 +319,8 @@ struct CoreOptions::Impl {
313319
bool data_evolution_enabled = false;
314320
bool legacy_partition_name_enabled = true;
315321
bool global_index_enabled = true;
322+
bool commit_force_compact = false;
323+
bool compaction_force_rewrite_all_files = false;
316324
std::optional<std::string> global_index_external_path;
317325

318326
std::optional<std::string> scan_tag_name;
@@ -348,11 +356,17 @@ Result<CoreOptions> CoreOptions::FromMap(
348356

349357
// Parse memory size configurations
350358
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::PAGE_SIZE, &impl->page_size));
351-
PAIMON_RETURN_NOT_OK(
352-
parser.ParseMemorySize(Options::TARGET_FILE_SIZE, &impl->target_file_size));
353-
impl->blob_target_file_size = impl->target_file_size;
354-
PAIMON_RETURN_NOT_OK(
355-
parser.ParseMemorySize(Options::BLOB_TARGET_FILE_SIZE, &impl->blob_target_file_size));
359+
if (parser.ContainsKey(Options::TARGET_FILE_SIZE)) {
360+
int64_t target_file_size;
361+
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::TARGET_FILE_SIZE, &target_file_size));
362+
impl->target_file_size = target_file_size;
363+
}
364+
if (parser.ContainsKey(Options::BLOB_TARGET_FILE_SIZE)) {
365+
int64_t blob_target_file_size;
366+
PAIMON_RETURN_NOT_OK(
367+
parser.ParseMemorySize(Options::BLOB_TARGET_FILE_SIZE, &blob_target_file_size));
368+
impl->blob_target_file_size = blob_target_file_size;
369+
}
356370
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_TARGET_FILE_SIZE,
357371
&impl->manifest_target_file_size));
358372
PAIMON_RETURN_NOT_OK(
@@ -411,6 +425,9 @@ Result<CoreOptions> CoreOptions::FromMap(
411425
// Parse ignore delete
412426
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::IGNORE_DELETE, &impl->ignore_delete));
413427

428+
// Parse write-only
429+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::WRITE_ONLY, &impl->write_only));
430+
414431
// Parse default agg function
415432
std::string field_default_func;
416433
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::FIELDS_DEFAULT_AGG_FUNC, &field_default_func));
@@ -490,6 +507,18 @@ Result<CoreOptions> CoreOptions::FromMap(
490507
impl->scan_tag_name = scan_tag_name;
491508
}
492509

510+
// Parse commit.force-compact
511+
PAIMON_RETURN_NOT_OK(
512+
parser.Parse<bool>(Options::COMMIT_FORCE_COMPACT, &impl->commit_force_compact));
513+
514+
// Parse compaction.min.file-num
515+
PAIMON_RETURN_NOT_OK(
516+
parser.Parse(Options::COMPACTION_MIN_FILE_NUM, &impl->compaction_min_file_num));
517+
518+
// Parse compaction.force-rewrite-all-files
519+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::COMPACTION_FORCE_REWRITE_ALL_FILES,
520+
&impl->compaction_force_rewrite_all_files));
521+
493522
return options;
494523
}
495524

@@ -531,12 +560,25 @@ int64_t CoreOptions::GetPageSize() const {
531560
return impl_->page_size;
532561
}
533562

534-
int64_t CoreOptions::GetTargetFileSize() const {
535-
return impl_->target_file_size;
563+
int64_t CoreOptions::GetTargetFileSize(bool has_primary_key) const {
564+
if (impl_->target_file_size == std::nullopt) {
565+
return has_primary_key ? 128 * 1024 * 1024 : 256 * 1024 * 1024;
566+
}
567+
return impl_->target_file_size.value();
536568
}
537569

538570
int64_t CoreOptions::GetBlobTargetFileSize() const {
539-
return impl_->blob_target_file_size;
571+
if (impl_->blob_target_file_size == std::nullopt) {
572+
return GetTargetFileSize(/*has_primary_key=*/false);
573+
}
574+
return impl_->blob_target_file_size.value();
575+
}
576+
577+
int64_t CoreOptions::GetCompactionFileSize(bool has_primary_key) const {
578+
// file size to join the compaction, we don't process on middle file size to avoid
579+
// compact a same file twice (the compression is not calculate so accurately. the output
580+
// file maybe be less than target file generated by rolling file write).
581+
return GetTargetFileSize(has_primary_key) / 10 * 7;
540582
}
541583

542584
std::string CoreOptions::GetPartitionDefaultName() const {
@@ -594,6 +636,10 @@ int64_t CoreOptions::GetWriteBufferSize() const {
594636
return impl_->write_buffer_size;
595637
}
596638

639+
bool CoreOptions::CommitForceCompact() const {
640+
return impl_->commit_force_compact;
641+
}
642+
597643
int64_t CoreOptions::GetCommitTimeout() const {
598644
return impl_->commit_timeout;
599645
}
@@ -602,6 +648,10 @@ int32_t CoreOptions::GetCommitMaxRetries() const {
602648
return impl_->commit_max_retries;
603649
}
604650

651+
int32_t CoreOptions::GetCompactionMinFileNum() const {
652+
return impl_->compaction_min_file_num;
653+
}
654+
605655
const ExpireConfig& CoreOptions::GetExpireConfig() const {
606656
return impl_->expire_config;
607657
}
@@ -626,6 +676,10 @@ bool CoreOptions::IgnoreDelete() const {
626676
return impl_->ignore_delete;
627677
}
628678

679+
bool CoreOptions::WriteOnly() const {
680+
return impl_->write_only;
681+
}
682+
629683
std::optional<std::string> CoreOptions::GetFieldsDefaultFunc() const {
630684
return impl_->field_default_func;
631685
}
@@ -674,6 +728,10 @@ bool CoreOptions::NeedLookup() const {
674728
impl_->force_lookup;
675729
}
676730

731+
bool CoreOptions::CompactionForceRewriteAllFiles() const {
732+
return impl_->compaction_force_rewrite_all_files;
733+
}
734+
677735
std::map<std::string, std::string> CoreOptions::GetFieldsSequenceGroups() const {
678736
auto raw_options = impl_->raw_options;
679737
std::map<std::string, std::string> sequence_groups;

src/paimon/core/core_options.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ class PAIMON_EXPORT CoreOptions {
5757
const std::string& GetFileCompression() const;
5858
int32_t GetFileCompressionZstdLevel() const;
5959
int64_t GetPageSize() const;
60-
int64_t GetTargetFileSize() const;
60+
int64_t GetTargetFileSize(bool has_primary_key) const;
6161
int64_t GetBlobTargetFileSize() const;
62+
int64_t GetCompactionFileSize(bool has_primary_key) const;
6263
std::string GetPartitionDefaultName() const;
6364

6465
std::shared_ptr<FileFormat> GetManifestFormat() const;
@@ -78,14 +79,18 @@ class PAIMON_EXPORT CoreOptions {
7879

7980
const ExpireConfig& GetExpireConfig() const;
8081

82+
bool CommitForceCompact() const;
83+
bool CompactionForceRewriteAllFiles() const;
8184
int64_t GetCommitTimeout() const;
8285
int32_t GetCommitMaxRetries() const;
86+
int32_t GetCompactionMinFileNum() const;
8387

8488
const std::vector<std::string>& GetSequenceField() const;
8589
bool SequenceFieldSortOrderIsAscending() const;
8690
MergeEngine GetMergeEngine() const;
8791
SortEngine GetSortEngine() const;
8892
bool IgnoreDelete() const;
93+
bool WriteOnly() const;
8994

9095
std::optional<std::string> GetFieldsDefaultFunc() const;
9196
Result<std::optional<std::string>> GetFieldAggFunc(const std::string& field_name) const;

src/paimon/core/core_options_test.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ TEST(CoreOptionsTest, TestDefaultValue) {
3737
ASSERT_TRUE(core_options.GetFileSystem());
3838
ASSERT_EQ(-1, core_options.GetBucket());
3939
ASSERT_EQ(64 * 1024L, core_options.GetPageSize());
40-
ASSERT_EQ(256 * 1024 * 1024L, core_options.GetTargetFileSize());
40+
ASSERT_EQ(256 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/false));
41+
ASSERT_EQ(128 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/true));
4142
ASSERT_EQ(256 * 1024 * 1024L, core_options.GetBlobTargetFileSize());
43+
ASSERT_EQ(187904815, core_options.GetCompactionFileSize(/*has_primary_key=*/false));
44+
ASSERT_EQ(93952404, core_options.GetCompactionFileSize(/*has_primary_key=*/true));
45+
4246
ASSERT_EQ("__DEFAULT_PARTITION__", core_options.GetPartitionDefaultName());
4347
ASSERT_EQ(std::nullopt, core_options.GetScanSnapshotId());
4448
ASSERT_EQ("zstd", core_options.GetFileCompression());
@@ -53,6 +57,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
5357
ASSERT_EQ(1024, core_options.GetReadBatchSize());
5458
ASSERT_EQ(1024, core_options.GetWriteBatchSize());
5559
ASSERT_EQ(256 * 1024 * 1024, core_options.GetWriteBufferSize());
60+
ASSERT_FALSE(core_options.CommitForceCompact());
5661
ASSERT_EQ(std::numeric_limits<int64_t>::max(), core_options.GetCommitTimeout());
5762
ASSERT_EQ(10, core_options.GetCommitMaxRetries());
5863
ExpireConfig expire_config = core_options.GetExpireConfig();
@@ -66,6 +71,9 @@ TEST(CoreOptionsTest, TestDefaultValue) {
6671
ASSERT_EQ(MergeEngine::DEDUPLICATE, core_options.GetMergeEngine());
6772
ASSERT_EQ(SortEngine::LOSER_TREE, core_options.GetSortEngine());
6873
ASSERT_FALSE(core_options.IgnoreDelete());
74+
ASSERT_FALSE(core_options.WriteOnly());
75+
ASSERT_EQ(5, core_options.GetCompactionMinFileNum());
76+
ASSERT_FALSE(core_options.CompactionForceRewriteAllFiles());
6977
ASSERT_EQ(std::nullopt, core_options.GetFieldsDefaultFunc());
7078
ASSERT_EQ(std::nullopt, core_options.GetFieldAggFunc("f0").value());
7179
ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value());
@@ -109,6 +117,7 @@ TEST(CoreOptionsTest, TestFromMap) {
109117
{Options::READ_BATCH_SIZE, "2048"},
110118
{Options::WRITE_BUFFER_SIZE, "16MB"},
111119
{Options::WRITE_BATCH_SIZE, "1234"},
120+
{Options::COMMIT_FORCE_COMPACT, "true"},
112121
{Options::COMMIT_TIMEOUT, "120s"},
113122
{Options::COMMIT_MAX_RETRIES, "20"},
114123
{Options::SCAN_SNAPSHOT_ID, "5"},
@@ -148,7 +157,9 @@ TEST(CoreOptionsTest, TestFromMap) {
148157
{Options::GLOBAL_INDEX_ENABLED, "false"},
149158
{Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"},
150159
{Options::SCAN_TAG_NAME, "test-tag"},
151-
};
160+
{Options::WRITE_ONLY, "true"},
161+
{Options::COMPACTION_MIN_FILE_NUM, "10"},
162+
{Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}};
152163

153164
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
154165
auto fs = core_options.GetFileSystem();
@@ -162,7 +173,8 @@ TEST(CoreOptionsTest, TestFromMap) {
162173

163174
ASSERT_EQ(3, core_options.GetBucket());
164175
ASSERT_EQ(128 * 1024L, core_options.GetPageSize());
165-
ASSERT_EQ(512 * 1024 * 1024L, core_options.GetTargetFileSize());
176+
ASSERT_EQ(512 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/true));
177+
ASSERT_EQ(512 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/false));
166178
ASSERT_EQ(1024 * 1024 * 1024L, core_options.GetBlobTargetFileSize());
167179
ASSERT_EQ("foo", core_options.GetPartitionDefaultName());
168180
ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestTargetFileSize());
@@ -173,6 +185,7 @@ TEST(CoreOptionsTest, TestFromMap) {
173185
ASSERT_EQ(2048, core_options.GetReadBatchSize());
174186
ASSERT_EQ(1234, core_options.GetWriteBatchSize());
175187
ASSERT_EQ(16 * 1024 * 1024, core_options.GetWriteBufferSize());
188+
ASSERT_TRUE(core_options.CommitForceCompact());
176189
ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout());
177190
ASSERT_EQ(20, core_options.GetCommitMaxRetries());
178191
ASSERT_EQ(5, core_options.GetScanSnapshotId().value_or(-1));
@@ -220,7 +233,12 @@ TEST(CoreOptionsTest, TestFromMap) {
220233
ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/");
221234
ASSERT_EQ("test-tag", core_options.GetScanTagName().value());
222235
ASSERT_EQ(StartupMode::FromSnapshot(), core_options.GetStartupMode());
223-
}
236+
ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/true));
237+
ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/false));
238+
ASSERT_TRUE(core_options.WriteOnly());
239+
ASSERT_EQ(10, core_options.GetCompactionMinFileNum());
240+
ASSERT_TRUE(core_options.CompactionForceRewriteAllFiles());
241+
} // namespace paimon::test
224242

225243
TEST(CoreOptionsTest, TestInvalidCase) {
226244
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET, "3.5"}}),

src/paimon/core/io/data_file_meta.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ struct DataFileMeta {
136136
std::vector<std::optional<std::string>> extra_files;
137137
Timestamp creation_time;
138138

139-
// row_count = addRowCount + deleteRowCount
140-
// Why don't we keep addRowCount and deleteRowCount?
139+
// row_count = add_row_count + delete_row_count
140+
// Why don't we keep add_row_count and delete_row_count?
141141
// Because in previous versions of DataFileMeta, we only keep row_count.
142142
// We have to keep the compatibility.
143143
std::optional<int64_t> delete_row_count;

src/paimon/core/mergetree/merge_tree_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ MergeTreeWriter::CreateRollingRowWriter() const {
204204
return writer;
205205
};
206206
return std::make_unique<RollingFileWriter<KeyValueBatch, std::shared_ptr<DataFileMeta>>>(
207-
options_.GetTargetFileSize(), create_file_writer);
207+
options_.GetTargetFileSize(/*has_primary_key=*/true), create_file_writer);
208208
}
209209

210210
Result<int64_t> MergeTreeWriter::EstimateMemoryUse(const std::shared_ptr<arrow::Array>& array) {

src/paimon/core/postpone/postpone_bucket_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ PostponeBucketWriter::CreateRollingRowWriter() const {
261261
return writer;
262262
};
263263
return std::make_unique<RollingFileWriter<KeyValueBatch, std::shared_ptr<DataFileMeta>>>(
264-
options_.GetTargetFileSize(), create_file_writer);
264+
options_.GetTargetFileSize(/*has_primary_key=*/true), create_file_writer);
265265
}
266266

267267
Status PostponeBucketWriter::Flush() {

0 commit comments

Comments
 (0)