Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions docs/content/primary-key-table/pk-clustering-override.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ CREATE TABLE my_table (
After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like
`SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics
on the clustering column.

## How It Works

PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction:
Expand Down Expand Up @@ -82,16 +82,6 @@ temporary files to reduce memory consumption, preventing OOM during multi-way me
| `clustering.columns` | Must be set (one or more non-primary-key columns) |
| `deletion-vectors.enabled` | Must be `true` |
| `merge-engine` | `deduplicate` (default) or `first-row` only |
| `sequence.fields` | Must **not** be set |
| `record-level.expire-time` | Must **not** be set |

## Related Options

| Option | Default | Description |
|--------|---------|-------------|
| `clustering.columns` | (none) | Comma-separated column names used as the physical sort order for data files. |
| `sort-spill-threshold` | (auto) | When the number of merge readers exceeds this value, smaller files are spilled to row-based temp files to reduce memory usage. |
| `sort-spill-buffer-size` | `64 mb` | Buffer size used for external sort during Phase 1 rewrite. |

## When to Use

Expand All @@ -106,3 +96,4 @@ It is **not** suitable when:
- Point lookups by primary key are the dominant access pattern (default LSM sort is already optimal).
- You need `partial-update` or `aggregation` merge engine.
- `sequence.fields` or `record-level.expire-time` is required.
- Changelog producer `lookup` or `full-compaction` is required.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.apache.paimon.schema.SchemaValidation.validatePkClusteringOverride;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Factory to create {@link ClusteringCompactManager}. */
Expand Down Expand Up @@ -66,32 +67,7 @@ public ClusteringCompactManagerFactory(
this.keyType = keyType;
this.valueType = valueType;
this.cacheManager = cacheManager;

if (options.clusteringColumns().isEmpty()) {
throw new IllegalArgumentException(
"Cannot support 'pk-clustering-override' mode without 'clustering.columns'.");
}
if (!options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
"Cannot support deletion-vectors disabled in 'pk-clustering-override' mode.");
}
if (options.recordLevelExpireTime() != null) {
throw new UnsupportedOperationException(
"Cannot support record level expire time enabled in 'pk-clustering-override' mode.");
}
if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE
&& options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
"Cannot support merge engine: "
+ options.mergeEngine()
+ " in 'pk-clustering-override' mode.");
}
if (!options.sequenceField().isEmpty()) {
throw new UnsupportedOperationException(
"Cannot support sequence field: "
+ options.sequenceField()
+ " in 'pk-clustering-override' mode.");
}
validatePkClusteringOverride(options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ public static void validateTableSchema(TableSchema schema) {
validateChainTable(schema, options);

validateChangelogReadSequenceNumber(schema, options);

validatePkClusteringOverride(options);
}

public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
Expand Down Expand Up @@ -795,4 +797,42 @@ private static void validateChangelogReadSequenceNumber(
CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
}
}

/**
 * Checks that the given options are compatible with 'pk-clustering-override' mode.
 *
 * <p>Does nothing when {@code options.pkClusteringOverride()} is false. Otherwise the checks run
 * in a fixed order and the first violated one raises an exception.
 *
 * @param options the table options to validate
 * @throws IllegalArgumentException if no clustering columns are configured
 * @throws UnsupportedOperationException if deletion vectors are disabled, a record-level expire
 *     time is set, the merge engine is neither DEDUPLICATE nor FIRST_ROW, a sequence field is
 *     set, or the changelog producer is neither NONE nor INPUT
 */
public static void validatePkClusteringOverride(CoreOptions options) {
    // Nothing to validate unless the mode is switched on.
    if (!options.pkClusteringOverride()) {
        return;
    }

    if (options.clusteringColumns().isEmpty()) {
        throw new IllegalArgumentException(
                "Cannot support 'pk-clustering-override' mode without 'clustering.columns'.");
    }

    if (!options.deletionVectorsEnabled()) {
        throw new UnsupportedOperationException(
                "Cannot support deletion-vectors disabled in 'pk-clustering-override' mode.");
    }

    if (options.recordLevelExpireTime() != null) {
        throw new UnsupportedOperationException(
                "Cannot support record level expire time enabled in 'pk-clustering-override' mode.");
    }

    // Only the deduplicate and first-row merge engines are accepted.
    CoreOptions.MergeEngine engine = options.mergeEngine();
    boolean engineAccepted =
            engine == CoreOptions.MergeEngine.DEDUPLICATE
                    || engine == CoreOptions.MergeEngine.FIRST_ROW;
    if (!engineAccepted) {
        throw new UnsupportedOperationException(
                "Cannot support merge engine: "
                        + options.mergeEngine()
                        + " in 'pk-clustering-override' mode.");
    }

    if (!options.sequenceField().isEmpty()) {
        throw new UnsupportedOperationException(
                "Cannot support sequence field: "
                        + options.sequenceField()
                        + " in 'pk-clustering-override' mode.");
    }

    // Only the NONE and INPUT changelog producers are accepted.
    ChangelogProducer producer = options.changelogProducer();
    boolean producerAccepted =
            producer == ChangelogProducer.NONE || producer == ChangelogProducer.INPUT;
    if (!producerAccepted) {
        throw new UnsupportedOperationException(
                "Cannot support changelog producer: "
                        + producer
                        + " in 'pk-clustering-override' mode.");
    }
}
}
Loading