Skip to content

Commit 2c18ef2

Browse files
authored
HIVE-29147: Iceberg: Table-level column stats filter support (apache#5724)
1 parent 6f53c7f commit 2c18ef2

6 files changed

Lines changed: 198 additions & 128 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 86 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
package org.apache.iceberg.mr.hive;
2121

2222
import java.io.IOException;
23+
import java.io.Serializable;
2324
import java.io.UncheckedIOException;
2425
import java.net.URI;
2526
import java.net.URISyntaxException;
2627
import java.nio.ByteBuffer;
2728
import java.util.Arrays;
2829
import java.util.Collection;
2930
import java.util.Collections;
30-
import java.util.Iterator;
3131
import java.util.List;
3232
import java.util.ListIterator;
3333
import java.util.Map;
@@ -37,6 +37,7 @@
3737
import java.util.Set;
3838
import java.util.UUID;
3939
import java.util.concurrent.ExecutorService;
40+
import java.util.function.Predicate;
4041
import java.util.stream.Collectors;
4142
import java.util.stream.Stream;
4243
import org.apache.commons.collections.MapUtils;
@@ -45,6 +46,7 @@
4546
import org.apache.commons.lang3.StringUtils;
4647
import org.apache.hadoop.conf.Configuration;
4748
import org.apache.hadoop.fs.FileStatus;
49+
import org.apache.hadoop.fs.FileSystem;
4850
import org.apache.hadoop.fs.Path;
4951
import org.apache.hadoop.hive.common.FileUtils;
5052
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -89,7 +91,6 @@
8991
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
9092
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
9193
import org.apache.hadoop.hive.ql.metadata.HiveException;
92-
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
9394
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
9495
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
9596
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -183,7 +184,6 @@
183184
import org.apache.iceberg.puffin.BlobMetadata;
184185
import org.apache.iceberg.puffin.Puffin;
185186
import org.apache.iceberg.puffin.PuffinCompressionCodec;
186-
import org.apache.iceberg.puffin.PuffinReader;
187187
import org.apache.iceberg.puffin.PuffinWriter;
188188
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
189189
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -196,7 +196,6 @@
196196
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
197197
import org.apache.iceberg.types.Conversions;
198198
import org.apache.iceberg.types.Types;
199-
import org.apache.iceberg.util.ByteBuffers;
200199
import org.apache.iceberg.util.Pair;
201200
import org.apache.iceberg.util.SerializationUtil;
202201
import org.apache.iceberg.util.SnapshotUtil;
@@ -221,12 +220,14 @@ public class HiveIcebergStorageHandler extends DefaultStorageHandler implements
221220

222221
private static final String ICEBERG_URI_PREFIX = "iceberg://";
223222
private static final String TABLE_NAME_SEPARATOR = "..";
224-
// Column index for partition metadata table
225-
public static final String COPY_ON_WRITE = RowLevelOperationMode.COPY_ON_WRITE.modeName();
226-
public static final String MERGE_ON_READ = RowLevelOperationMode.MERGE_ON_READ.modeName();
223+
public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";
224+
225+
private static final String SPEC_ID = "spec-id";
226+
private static final String PARTITION = "partition";
227227
public static final String STATS = "/stats/snap-";
228228

229-
public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";
229+
public static final String COPY_ON_WRITE = RowLevelOperationMode.COPY_ON_WRITE.modeName();
230+
public static final String MERGE_ON_READ = RowLevelOperationMode.MERGE_ON_READ.modeName();
230231

231232
private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(
232233
PARTITION_SPEC_ID, PARTITION_HASH, FILE_PATH, ROW_POSITION, PARTITION_PROJECTION);
@@ -601,20 +602,27 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
601602
long snapshotId = tbl.currentSnapshot().snapshotId();
602603
long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
603604

604-
colStats.forEach(statsObj -> {
605-
byte[] serializeColStats = SerializationUtils.serialize(statsObj);
606-
puffinWriter.add(
607-
new Blob(
608-
ColumnStatisticsObj.class.getSimpleName(),
609-
ImmutableList.of(1),
610-
snapshotId,
611-
snapshotSequenceNumber,
612-
ByteBuffer.wrap(serializeColStats),
613-
PuffinCompressionCodec.NONE,
614-
ImmutableMap.of("partition",
615-
String.valueOf(statsObj.getStatsDesc().getPartName()))
616-
));
605+
colStats.forEach(stats -> {
606+
boolean isTblLevel = stats.getStatsDesc().isIsTblLevel();
607+
608+
for (Serializable statsObj : isTblLevel ? stats.getStatsObj() : Collections.singletonList(stats)) {
609+
byte[] serializeColStats = SerializationUtils.serialize(statsObj);
610+
puffinWriter.add(
611+
new Blob(
612+
ColumnStatisticsObj.class.getSimpleName(),
613+
ImmutableList.of(isTblLevel ? tbl.spec().schema().findField(
614+
((ColumnStatisticsObj) statsObj).getColName()).fieldId() : 1),
615+
snapshotId,
616+
snapshotSequenceNumber,
617+
ByteBuffer.wrap(serializeColStats),
618+
PuffinCompressionCodec.NONE,
619+
isTblLevel ?
620+
ImmutableMap.of(SPEC_ID, String.valueOf(tbl.spec().specId())) :
621+
ImmutableMap.of(PARTITION, String.valueOf(stats.getStatsDesc().getPartName()))
622+
));
623+
}
617624
});
625+
618626
puffinWriter.finish();
619627

620628
statisticsFile =
@@ -628,7 +636,13 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
628636
.collect(ImmutableList.toImmutableList())
629637
);
630638
} catch (IOException e) {
631-
LOG.warn("Unable to write stats to puffin file {}", e.getMessage());
639+
LOG.warn("Unable to write column stats to the Puffin file: {}", e.getMessage());
640+
641+
Path path = new Path(statsPath);
642+
FileSystem fs = path.getFileSystem(conf);
643+
if (fs.exists(path)) {
644+
fs.delete(path, false);
645+
}
632646
return false;
633647
}
634648
tbl.updateStatistics()
@@ -637,7 +651,7 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
637651
return true;
638652

639653
} catch (Exception e) {
640-
LOG.warn("Unable to invalidate or merge stats: {}", e.getMessage());
654+
LOG.warn("Unable to invalidate or merge column stats: {}", e.getMessage());
641655
}
642656
return false;
643657
}
@@ -653,21 +667,32 @@ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table
653667
}
654668

655669
private boolean canProvideColStats(Table table, long snapshotId) {
656-
return IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();
670+
return IcebergTableUtil.getColStatsPath(table, snapshotId) != null;
657671
}
658672

659673
@Override
660-
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
674+
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
675+
List<String> colNames) {
661676
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
677+
662678
Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
679+
if (snapshot == null) {
680+
return Lists.newArrayList();
681+
}
663682

664-
ColumnStatistics emptyStats = new ColumnStatistics();
665-
if (snapshot != null) {
666-
return IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
667-
.map(statsPath -> readColStats(table, statsPath, null).getFirst())
668-
.orElse(emptyStats).getStatsObj();
683+
Predicate<BlobMetadata> filter;
684+
if (colNames != null) {
685+
Set<String> columns = Sets.newHashSet(colNames);
686+
filter = metadata -> {
687+
int specId = Integer.parseInt(metadata.properties().get(SPEC_ID));
688+
String column = table.specs().get(specId).schema().findColumnName(metadata.inputFields().getFirst());
689+
return columns.contains(column);
690+
};
691+
} else {
692+
filter = null;
669693
}
670-
return emptyStats.getStatsObj();
694+
695+
return IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
671696
}
672697

673698
@Override
@@ -684,9 +709,10 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
684709
MetastoreConf.ConfVars.STATS_NDV_DENSITY_FUNCTION);
685710
double ndvTuner = MetastoreConf.getDoubleVar(getConf(), MetastoreConf.ConfVars.STATS_NDV_TUNER);
686711

687-
List<ColumnStatistics> partStats = IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
688-
.map(statsPath -> readColStats(table, statsPath, Sets.newHashSet(partNames)))
689-
.orElse(Collections.emptyList());
712+
Set<String> partitions = Sets.newHashSet(partNames);
713+
Predicate<BlobMetadata> filter = metadata -> partitions.contains(metadata.properties().get(PARTITION));
714+
715+
List<ColumnStatistics> partStats = IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
690716

691717
partStats.forEach(colStats ->
692718
colStats.getStatsObj().removeIf(statsObj -> !colNames.contains(statsObj.getColName())));
@@ -700,30 +726,6 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
700726
return new AggrStats(colStatsList, partStats.size());
701727
}
702728

703-
private List<ColumnStatistics> readColStats(Table table, Path statsPath, Set<String> partNames) {
704-
List<ColumnStatistics> colStats = Lists.newArrayList();
705-
706-
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
707-
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
708-
709-
if (partNames != null) {
710-
blobMetadata = blobMetadata.stream()
711-
.filter(metadata -> partNames.contains(metadata.properties().get("partition")))
712-
.collect(Collectors.toList());
713-
}
714-
Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
715-
LOG.info("Using col stats from : {}", statsPath);
716-
717-
while (it.hasNext()) {
718-
byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
719-
colStats.add(SerializationUtils.deserialize(byteBuffer));
720-
}
721-
} catch (Exception e) {
722-
LOG.warn(" Unable to read col stats: ", e);
723-
}
724-
return colStats;
725-
}
726-
727729
@Override
728730
public boolean canComputeQueryUsingStats(Partish partish) {
729731
org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
@@ -757,28 +759,30 @@ private String getStatsSource() {
757759
private boolean shouldRewriteColStats(Table tbl) {
758760
return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
759761
.filter(opType -> HiveOperation.ANALYZE_TABLE == opType).isPresent() ||
760-
IcebergTableUtil.getColStatsPath(tbl).isPresent();
762+
IcebergTableUtil.getColStatsPath(tbl) != null;
761763
}
762764

763765
private void checkAndMergeColStats(List<ColumnStatistics> statsNew, Table tbl) throws InvalidObjectException {
764766
Long previousSnapshotId = tbl.currentSnapshot().parentId();
765767
if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
766-
List<ColumnStatistics> statsOld = IcebergTableUtil.getColStatsPath(tbl, previousSnapshotId)
767-
.map(statsPath -> readColStats(tbl, statsPath, null))
768-
.orElse(Collections.emptyList());
769768

770769
boolean isTblLevel = statsNew.getFirst().getStatsDesc().isIsTblLevel();
771770
Map<String, ColumnStatistics> oldStatsMap = Maps.newHashMap();
772771

772+
List<?> statsOld = IcebergTableUtil.readColStats(tbl, previousSnapshotId, null);
773+
773774
if (!isTblLevel) {
774-
for (ColumnStatistics statsObjOld : statsOld) {
775+
for (ColumnStatistics statsObjOld : (List<ColumnStatistics>) statsOld) {
775776
oldStatsMap.put(statsObjOld.getStatsDesc().getPartName(), statsObjOld);
776777
}
778+
} else {
779+
statsOld = Collections.singletonList(
780+
new ColumnStatistics(null, (List<ColumnStatisticsObj>) statsOld));
777781
}
778782
for (ColumnStatistics statsObjNew : statsNew) {
779783
String partitionKey = statsObjNew.getStatsDesc().getPartName();
780784
ColumnStatistics statsObjOld = isTblLevel ?
781-
statsOld.getFirst() : oldStatsMap.get(partitionKey);
785+
(ColumnStatistics) statsOld.getFirst() : oldStatsMap.get(partitionKey);
782786

783787
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
784788
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
@@ -1864,19 +1868,6 @@ public void addResourcesForCreateTable(Map<String, String> tblProps, HiveConf hi
18641868
}
18651869
}
18661870

1867-
/**
1868-
* Check the operation type of all snapshots which are newer than the specified. The specified snapshot is excluded.
1869-
* @param hmsTable table metadata stored in Hive Metastore
1870-
* @param since the snapshot preceding the oldest snapshot which should be checked.
1871-
* The value null means all should be checked.
1872-
* @return null if table is empty, true if all snapshots are {@link SnapshotContext.WriteOperationType#APPEND}s,
1873-
* false otherwise.
1874-
*
1875-
* @deprecated
1876-
* <br>Use {@link HiveStorageHandler#getSnapshotContexts(
1877-
* org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since)}
1878-
* and check {@link SnapshotContext.WriteOperationType#APPEND}.equals({@link SnapshotContext#getOperation()}).
1879-
*/
18801871
@Deprecated
18811872
@Override
18821873
public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
@@ -2132,23 +2123,24 @@ public List<Partition> getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
21322123
.caseSensitive(false).includeColumnStats().ignoreResiduals();
21332124

21342125
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
2135-
FluentIterable.from(tasks).filter(task -> task.spec().isPartitioned()).forEach(task -> {
2136-
DataFile file = task.file();
2137-
PartitionSpec spec = task.spec();
2138-
2139-
if (latestSpecOnly == null || latestSpecOnly && file.specId() == tableSpecId ||
2140-
!latestSpecOnly && file.specId() != tableSpecId) {
2141-
2142-
PartitionData partitionData = IcebergTableUtil.toPartitionData(task.partition(), spec.partitionType());
2143-
String partName = spec.partitionToPath(partitionData);
2144-
2145-
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
2146-
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
2147-
2148-
DummyPartition partition = new DummyPartition(hmsTable, partName, partSpecMap);
2149-
partitions.add(partition);
2150-
}
2151-
});
2126+
FluentIterable.from(tasks)
2127+
.filter(task -> task.spec().isPartitioned())
2128+
.forEach(task -> {
2129+
DataFile file = task.file();
2130+
PartitionSpec spec = task.spec();
2131+
2132+
if (latestSpecOnly == null || latestSpecOnly && file.specId() == tableSpecId ||
2133+
!latestSpecOnly && file.specId() != tableSpecId) {
2134+
PartitionData partitionData = IcebergTableUtil.toPartitionData(task.partition(), spec.partitionType());
2135+
String partName = spec.partitionToPath(partitionData);
2136+
2137+
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
2138+
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
2139+
2140+
DummyPartition partition = new DummyPartition(hmsTable, partName, partSpecMap);
2141+
partitions.add(partition);
2142+
}
2143+
});
21522144
} catch (IOException e) {
21532145
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
21542146
}

0 commit comments

Comments
 (0)