Skip to content

Commit 003d86a

Browse files
evanvdiaalexjo2144
authored andcommitted
Add Support for Iceberg table sort orders
Cherry-pick of trinodb/trino#14891 Co-authored-by: Alexander Jo <[email protected]>
1 parent 069a16f commit 003d86a

35 files changed

+1330
-82
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ Property Name Description
258258

259259
Example: ``hdfs://nn:8020/warehouse/path``
260260
This property is required if the ``iceberg.catalog.type`` is
261-
``hadoop``.
261+
``hadoop``. Otherwise, it will be ignored.
262262

263263
``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
264264
required if the ``iceberg.catalog.type`` is ``hadoop``.
@@ -1835,3 +1835,78 @@ Map of PrestoDB types to the relevant Iceberg types:
18351835

18361836

18371837
No other types are supported.
1838+
1839+
1840+
Sorted Tables
1841+
^^^^^^^^^^^^^
1842+
1843+
The Iceberg connector supports the creation of sorted tables.
1844+
Data in the Iceberg table is sorted as each file is written.
1845+
1846+
Sorted Iceberg tables can decrease query execution time in many cases, but query times also depend on the query shape and cluster configuration.
1847+
Sorting is particularly beneficial when the sorted columns have a
1848+
high cardinality and are used as a filter for selective reads.
1849+
1850+
Configure sort order with the ``sorted_by`` table property to specify an array of
1851+
one or more columns to use for sorting.
1852+
The following example creates a table with the ``sorted_by`` property, so that each data file is sorted
1853+
on the ``join_date`` column. The default sort direction is ASC, with null values ordered as NULLS FIRST.
1854+
1855+
.. code-block:: text
1856+
1857+
CREATE TABLE emp.employees.employee (
1858+
emp_id BIGINT,
1859+
emp_name VARCHAR,
1860+
join_date DATE,
1861+
country VARCHAR)
1862+
WITH (
1863+
sorted_by = ARRAY['join_date']
1864+
)
1865+
1866+
To explicitly configure the sort direction or null ordering of each sort column, follow this example::
1867+
1868+
CREATE TABLE emp.employees.employee (
1869+
emp_id BIGINT,
1870+
emp_name VARCHAR,
1871+
join_date DATE,
1872+
country VARCHAR)
1873+
WITH (
1874+
sorted_by = ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']
1875+
)
1876+
1877+
Sorting can be combined with partitioning on the same column. For example::
1878+
1879+
CREATE TABLE emp.employees.employee (
1880+
emp_id BIGINT,
1881+
emp_name VARCHAR,
1882+
join_date DATE,
1883+
country VARCHAR)
1884+
WITH (
1885+
partitioning = ARRAY['month(join_date)'],
1886+
sorted_by = ARRAY['join_date']
1887+
)
1888+
1889+
The Iceberg connector does not support sort order transforms. None of the following transforms can be used in ``sorted_by``:
1890+
1891+
.. code-block:: text
1892+
1893+
bucket(n, column)
1894+
truncate(column, n)
1895+
year(column)
1896+
month(column)
1897+
day(column)
1898+
hour(column)
1899+
1900+
For example, the following ``sorted_by`` value uses the unsupported ``month`` transform::
1901+
1902+
CREATE TABLE emp.employees.employee (
1903+
emp_id BIGINT,
1904+
emp_name VARCHAR,
1905+
join_date DATE,
1906+
country VARCHAR)
1907+
WITH (
1908+
sorted_by = ARRAY['month(join_date)']
1909+
)
1910+
1911+
If a table with non-identity sort columns is created externally and data is then inserted into it, the following warning message is shown:
1912+
``Iceberg table sort order has sort fields of <X>, <Y>, ... which are not currently supported by Presto``
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.facebook.presto.hive;
16+
17+
import com.facebook.airlift.configuration.Config;
18+
import com.facebook.airlift.configuration.ConfigDescription;
19+
import io.airlift.units.DataSize;
20+
import io.airlift.units.MaxDataSize;
21+
import io.airlift.units.MinDataSize;
22+
23+
import javax.validation.constraints.Max;
24+
import javax.validation.constraints.Min;
25+
26+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
27+
28+
/**
 * Configuration bean holding the two settings used when writing sorted files:
 * the in-memory sort buffer size ({@code writer-sort-buffer-size}) and the
 * maximum number of temporary sort files open at one time
 * ({@code max-open-sort-files}).
 *
 * <p>Each setter returns {@code this} so calls can be chained.
 *
 * <p>NOTE(review): the property keys here carry no {@code hive.} prefix, whereas
 * the HiveClientConfig properties removed alongside this class used
 * {@code hive.writer-sort-buffer-size} and {@code hive.max-open-sort-files}.
 * Confirm that existing configurations using the old keys are still handled.
 */
public class SortingFileWriterConfig
29+
{
30+
// Default sort buffer size: 64 MB.
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
31+
// Default limit on simultaneously open temporary sort files: 50.
private int maxOpenSortFiles = 50;
32+
33+
/**
 * Returns the configured sort buffer size. The annotations declare an
 * allowed range of 1MB to 1GB.
 */
@MinDataSize("1MB")
34+
@MaxDataSize("1GB")
35+
public DataSize getWriterSortBufferSize()
36+
{
37+
return writerSortBufferSize;
38+
}
39+
40+
/**
 * Sets the sort buffer size; bound to the {@code writer-sort-buffer-size} property.
 *
 * @param writerSortBufferSize the new buffer size
 * @return this config instance, for chaining
 */
@Config("writer-sort-buffer-size")
41+
@ConfigDescription("Defines how much memory is used for this in-memory sorting process.")
42+
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
43+
{
44+
this.writerSortBufferSize = writerSortBufferSize;
45+
return this;
46+
}
47+
48+
/**
 * Returns the maximum number of temporary sort files open at one time.
 * The annotations declare an allowed range of 2 to 1000.
 */
@Min(2)
49+
@Max(1000)
50+
public int getMaxOpenSortFiles()
51+
{
52+
return maxOpenSortFiles;
53+
}
54+
55+
/**
 * Sets the open sort file limit; bound to the {@code max-open-sort-files} property.
 *
 * @param maxOpenSortFiles the new limit
 * @return this config instance, for chaining
 */
@Config("max-open-sort-files")
56+
@ConfigDescription("When writing, the maximum number of temporary files opened at one time to write sorted data.")
57+
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
58+
{
59+
this.maxOpenSortFiles = maxOpenSortFiles;
60+
return this;
61+
}
62+
}

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import javax.validation.constraints.DecimalMax;
3232
import javax.validation.constraints.DecimalMin;
33-
import javax.validation.constraints.Max;
3433
import javax.validation.constraints.Min;
3534
import javax.validation.constraints.NotNull;
3635

@@ -279,20 +278,6 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho
279278
return this;
280279
}
281280

282-
@MinDataSize("1MB")
283-
@MaxDataSize("1GB")
284-
public DataSize getWriterSortBufferSize()
285-
{
286-
return writerSortBufferSize;
287-
}
288-
289-
@Config("hive.writer-sort-buffer-size")
290-
public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
291-
{
292-
this.writerSortBufferSize = writerSortBufferSize;
293-
return this;
294-
}
295-
296281
@Min(1)
297282
public int getMaxConcurrentFileRenames()
298283
{
@@ -695,22 +680,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
695680
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
696681
return this;
697682
}
698-
699-
@Min(2)
700-
@Max(1000)
701-
public int getMaxOpenSortFiles()
702-
{
703-
return maxOpenSortFiles;
704-
}
705-
706-
@Config("hive.max-open-sort-files")
707-
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
708-
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
709-
{
710-
this.maxOpenSortFiles = maxOpenSortFiles;
711-
return this;
712-
}
713-
714683
public int getWriteValidationThreads()
715684
{
716685
return writeValidationThreads;

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void configure(Binder binder)
130130
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
131131
newSetBinder(binder, DynamicConfigurationProvider.class);
132132
configBinder(binder).bindConfig(HiveClientConfig.class);
133-
133+
configBinder(binder).bindConfig(SortingFileWriterConfig.class);
134134
binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
135135
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
136136
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);

presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public HivePageSinkProvider(
9191
TypeManager typeManager,
9292
HiveClientConfig hiveClientConfig,
9393
MetastoreClientConfig metastoreClientConfig,
94+
SortingFileWriterConfig sortingFileWriterConfig,
9495
LocationService locationService,
9596
JsonCodec<PartitionUpdate> partitionUpdateCodec,
9697
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@ public HivePageSinkProvider(
110111
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
111112
this.typeManager = requireNonNull(typeManager, "typeManager is null");
112113
this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
113-
this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
114-
this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
114+
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
115+
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
115116
this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
116117
this.locationService = requireNonNull(locationService, "locationService is null");
117118
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));

presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
10731073
FUNCTION_AND_TYPE_MANAGER,
10741074
getHiveClientConfig(),
10751075
getMetastoreClientConfig(),
1076+
getSortingFileWriterConfig(),
10761077
locationService,
10771078
HiveTestUtils.PARTITION_UPDATE_CODEC,
10781079
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1099,12 +1100,17 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
10991100
protected HiveClientConfig getHiveClientConfig()
11001101
{
11011102
return new HiveClientConfig()
1102-
.setMaxOpenSortFiles(10)
1103-
.setWriterSortBufferSize(new DataSize(100, KILOBYTE))
11041103
.setTemporaryTableSchema(database)
11051104
.setCreateEmptyBucketFilesForTemporaryTable(false);
11061105
}
11071106

1107+
/**
 * Builds the sorting-writer configuration used by this test setup: at most
 * 10 open sort files and a 100 KB sort buffer. These are the same values the
 * test previously set on HiveClientConfig.
 */
protected SortingFileWriterConfig getSortingFileWriterConfig()
1108+
{
1109+
return new SortingFileWriterConfig()
1110+
.setMaxOpenSortFiles(10)
1111+
// NOTE(review): 100 KB is below the 1MB @MinDataSize declared on
// SortingFileWriterConfig.getWriterSortBufferSize(). Presumably this is
// accepted because the object is built directly rather than through
// validated config binding, and the small buffer is intended to force
// many temporary sort files in the sorted-table test — confirm.
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
1112+
}
1113+
11081114
protected HiveCommonClientConfig getHiveCommonClientConfig()
11091115
{
11101116
return new HiveCommonClientConfig();
@@ -3109,7 +3115,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
31093115
true);
31103116
assertThat(listAllDataFiles(context, path))
31113117
.filteredOn(file -> file.contains(".tmp-sort"))
3112-
.size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
3118+
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);
31133119

31143120
// finish the write
31153121
Collection<Slice> fragments = getFutureValue(sink.finish());

presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
254254
FUNCTION_AND_TYPE_MANAGER,
255255
config,
256256
metastoreClientConfig,
257+
new SortingFileWriterConfig(),
257258
locationService,
258259
HiveTestUtils.PARTITION_UPDATE_CODEC,
259260
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public void testDefaults()
6060
.setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
6161
.setSplitLoaderConcurrency(4)
6262
.setDomainCompactionThreshold(100)
63-
.setWriterSortBufferSize(new DataSize(64, Unit.MEGABYTE))
6463
.setMaxConcurrentFileRenames(20)
6564
.setMaxConcurrentZeroRowFileCreations(20)
6665
.setRecursiveDirWalkerEnabled(false)
@@ -82,7 +81,6 @@ public void testDefaults()
8281
.setFailFastOnInsertIntoImmutablePartitionsEnabled(true)
8382
.setSortedWritingEnabled(true)
8483
.setMaxPartitionsPerWriter(100)
85-
.setMaxOpenSortFiles(50)
8684
.setWriteValidationThreads(16)
8785
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
8886
.setUseOrcColumnNames(false)
@@ -195,7 +193,6 @@ public void testExplicitPropertyMappings()
195193
.put("hive.max-initial-split-size", "16MB")
196194
.put("hive.split-loader-concurrency", "1")
197195
.put("hive.domain-compaction-threshold", "42")
198-
.put("hive.writer-sort-buffer-size", "13MB")
199196
.put("hive.recursive-directories", "true")
200197
.put("hive.storage-format", "SEQUENCEFILE")
201198
.put("hive.compression-codec", "NONE")
@@ -207,7 +204,6 @@ public void testExplicitPropertyMappings()
207204
.put("hive.insert-overwrite-immutable-partitions-enabled", "true")
208205
.put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false")
209206
.put("hive.max-partitions-per-writers", "222")
210-
.put("hive.max-open-sort-files", "333")
211207
.put("hive.write-validation-threads", "11")
212208
.put("hive.max-concurrent-file-renames", "100")
213209
.put("hive.max-concurrent-zero-row-file-creations", "100")
@@ -313,7 +309,6 @@ public void testExplicitPropertyMappings()
313309
.setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
314310
.setSplitLoaderConcurrency(1)
315311
.setDomainCompactionThreshold(42)
316-
.setWriterSortBufferSize(new DataSize(13, Unit.MEGABYTE))
317312
.setMaxConcurrentFileRenames(100)
318313
.setMaxConcurrentZeroRowFileCreations(100)
319314
.setRecursiveDirWalkerEnabled(true)
@@ -331,7 +326,6 @@ public void testExplicitPropertyMappings()
331326
.setInsertOverwriteImmutablePartitionEnabled(true)
332327
.setFailFastOnInsertIntoImmutablePartitionsEnabled(false)
333328
.setMaxPartitionsPerWriter(222)
334-
.setMaxOpenSortFiles(333)
335329
.setWriteValidationThreads(11)
336330
.setDomainSocketPath("/foo")
337331
.setS3FileSystemType(S3FileSystemType.EMRFS)

presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,23 @@ public void testAllFormats()
114114
throws Exception
115115
{
116116
HiveClientConfig config = new HiveClientConfig();
117+
SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig();
117118
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
118119
File tempDir = Files.createTempDir();
119120
try {
120121
ExtendedHiveMetastore metastore = createTestingFileHiveMetastore(new File(tempDir, "metastore"));
121122
for (HiveStorageFormat format : getSupportedHiveStorageFormats()) {
122123
config.setHiveStorageFormat(format);
123124
config.setCompressionCodec(NONE);
124-
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
125+
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
125126
assertGreaterThan(uncompressedLength, 0L);
126127

127128
for (HiveCompressionCodec codec : HiveCompressionCodec.values()) {
128129
if (codec == NONE || !codec.isSupportedStorageFormat(format)) {
129130
continue;
130131
}
131132
config.setCompressionCodec(codec);
132-
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
133+
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
133134
assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength));
134135
}
135136
}
@@ -152,11 +153,11 @@ private static String makeFileName(File tempDir, HiveClientConfig config)
152153
return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getCompressionCodec().name();
153154
}
154155

155-
private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath)
156+
private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath, SortingFileWriterConfig sortingFileWriterConfig)
156157
{
157158
HiveTransactionHandle transaction = new HiveTransactionHandle();
158159
HiveWriterStats stats = new HiveWriterStats();
159-
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats);
160+
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats, sortingFileWriterConfig);
160161
List<LineItemColumn> columns = getTestColumns();
161162
List<Type> columnTypes = columns.stream()
162163
.map(LineItemColumn::getType)
@@ -308,7 +309,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
308309
return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE, new RuntimeStats());
309310
}
310311

311-
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats)
312+
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats, SortingFileWriterConfig sortingFileWriterConfig)
312313
{
313314
LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, Optional.empty(), NEW, DIRECT_TO_TARGET_NEW_DIRECTORY);
314315
HiveOutputTableHandle handle = new HiveOutputTableHandle(
@@ -337,6 +338,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
337338
FUNCTION_AND_TYPE_MANAGER,
338339
config,
339340
metastoreClientConfig,
341+
sortingFileWriterConfig,
340342
new HiveLocationService(hdfsEnvironment),
341343
HiveTestUtils.PARTITION_UPDATE_CODEC,
342344
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,

0 commit comments

Comments
 (0)