diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 3c84bfb9d2089..6085f4b323efe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -39,6 +39,7 @@
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 
@@ -80,7 +81,7 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
     // TODO : check if maxMemory is not greater than JVM or executor memory
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metaClient.getStorage(), writeConfig.getMetadataConfig(), metaClient.getBasePath());
+    List<String> partitionPaths = listPartitionsPaths(engineContext, metaClient.getStorage(), writeConfig, metaClient.getBasePath());
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
@@ -159,6 +160,10 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
     return compactionPlan;
   }
 
+  protected List<String> listPartitionsPaths(HoodieEngineContext engineContext, HoodieStorage storage, HoodieWriteConfig writeConfig, String basePathStr) {
+    return FSUtils.getAllPartitionPaths(engineContext, storage, writeConfig.getMetadataConfig(), basePathStr);
+  }
+
   protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations);
 
   protected abstract boolean filterLogCompactionOperations();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
index 445c8eb775661..4cd43d0d36d09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
@@ -26,11 +26,13 @@
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static java.util.stream.Collectors.toList;
@@ -52,6 +54,21 @@ protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClien
         CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
   }
 
+  @Override
+  protected List<String> listPartitionsPaths(HoodieEngineContext engineContext, HoodieStorage storage, HoodieWriteConfig writeConfig, String basePathStr) {
+    String compactionStrategy = writeConfig.getCompactionStrategy().getClass().getName();
+    LOG.info("Compaction strategy is " + compactionStrategy);
+    if (compactionStrategy.equals("com.heap.datalake.compaction.SpecificPartitionsCompactionStrategy")) {
+      String[] partitions = writeConfig.getString("hoodie.compaction.include.partitions").split(",");
+      if (partitions.length > 0) {
+        LOG.info("Skipping listing all partitions in favor of partitions provided in config: " + Arrays.toString(partitions));
+        return Arrays.asList(partitions);
+      }
+    }
+    LOG.info("Defaulting to listing all partitions");
+    return super.listPartitionsPaths(engineContext, storage, writeConfig, basePathStr);
+  }
+
   @Override
   protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
     return writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 1eecf9d3d58cf..63c0c7c662f14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hadoop.fs.GlobPattern;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -175,20 +176,22 @@ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath
    * @param basePath The base path
    * @return the file full path to file status mapping
    */
-  public Map<String, StoragePathInfo> getFullPathToInfo(HoodieStorage storage,
-                                                        String basePath) {
+  public Map<String, StoragePathInfo> getFullPathToInfo(HoodieStorage storage, String basePath, String fileNamePattern) {
     Map<String, StoragePathInfo> fullPathToInfoMap = new HashMap<>();
+    GlobPattern globMatcher = new GlobPattern(fileNamePattern);
     for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
       // Iterate through all the written files.
       for (HoodieWriteStat stat : stats) {
         String relativeFilePath = stat.getPath();
-        StoragePath fullPath = relativeFilePath != null
-            ? FSUtils.constructAbsolutePath(basePath, relativeFilePath) : null;
-        if (fullPath != null) {
-          long blockSize = storage.getDefaultBlockSize(fullPath);
-          StoragePathInfo pathInfo = new StoragePathInfo(
-              fullPath, stat.getFileSizeInBytes(), false, (short) 0, blockSize, 0);
-          fullPathToInfoMap.put(fullPath.getName(), pathInfo);
+        if (fileNamePattern.isEmpty() || globMatcher.matches(relativeFilePath)) {
+          StoragePath fullPath = relativeFilePath != null
+              ? FSUtils.constructAbsolutePath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            long blockSize = storage.getDefaultBlockSize(fullPath);
+            StoragePathInfo pathInfo = new StoragePathInfo(
+                fullPath, stat.getFileSizeInBytes(), false, (short) 0, blockSize, 0);
+            fullPathToInfoMap.put(fullPath.getName(), pathInfo);
+          }
         }
       }
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index b2c7d56f62456..a6cc11869d6a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -151,7 +151,7 @@ private static Map<String, StoragePathInfo> getFilesToRead(
       case COPY_ON_WRITE:
         return metadata.getFileIdToInfo(basePath);
       case MERGE_ON_READ:
-        return metadata.getFullPathToInfo(new HoodieHadoopStorage(basePath, hadoopConf), basePath);
+        return metadata.getFullPathToInfo(new HoodieHadoopStorage(basePath, hadoopConf), basePath, "");
       default:
         throw new AssertionError();
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index fac2336836b11..752a1ba06965b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -193,8 +193,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
 
     // build fileGroup from fsView
     List<StoragePathInfo> affectedPathInfoList = HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(),
-            metadataList);
+        .listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(), metadataList, "");
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
         tableMetaClient, commitsTimelineToReturn, affectedPathInfoList);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index fe88855d4581d..ffc4f51daf544 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -520,15 +520,13 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi
    * @param metadataList The metadata list to read the data from
    * @return the affected file status array
    */
-  public static List<StoragePathInfo> listAffectedFilesForCommits(Configuration hadoopConf,
-                                                                  StoragePath basePath,
-                                                                  List<HoodieCommitMetadata> metadataList) {
+  public static List<StoragePathInfo> listAffectedFilesForCommits(Configuration hadoopConf, StoragePath basePath, List<HoodieCommitMetadata> metadataList, String fileNamePattern) {
     // TODO: Use HoodieMetaTable to extract affected file directly.
     HashMap<String, StoragePathInfo> fullPathToInfoMap = new HashMap<>();
     HoodieStorage storage = new HoodieHadoopStorage(basePath, HadoopFSUtils.getStorageConf(hadoopConf));
     // Iterate through the given commits.
     for (HoodieCommitMetadata metadata : metadataList) {
-      fullPathToInfoMap.putAll(metadata.getFullPathToInfo(storage, basePath.toString()));
+      fullPathToInfoMap.putAll(metadata.getFullPathToInfo(storage, basePath.toString(), fileNamePattern));
     }
     return new ArrayList<>(fullPathToInfoMap.values());
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 45134f91278f1..03e8aa41c41df 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -162,6 +162,12 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
       "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
+  val INCR_PARTITION_GLOB: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.incr.partition.glob")
+    .defaultValue("")
+    .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
+      "instead of the full table. This option allows using glob pattern to directly filter on partition.")
+
   val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 97d9307dc6a67..5200580ee317d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -108,13 +108,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
         metaClient, timeline, affectedFilesInCommits)
 
       val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+      val globMatcher = new GlobPattern("*" + globPartitionPattern)
 
-      modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+      modifiedPartitions.asScala.filter(p => globMatcher.matches(p)).flatMap { relativePartitionPath =>
         fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
       }.toSeq
     }
 
-    buildSplits(filterFileSlices(fileSlices, globPattern))
+    buildSplits(fileSlices)
   }
 }
 
@@ -181,7 +182,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
 
   protected lazy val affectedFilesInCommits: java.util.List[StoragePathInfo] = {
-    listAffectedFilesForCommits(conf, metaClient.getBasePathV2, commitsMetadata)
+    listAffectedFilesForCommits(conf, metaClient.getBasePathV2, commitsMetadata, "*" + globPathPattern)
   }
 
   protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
@@ -233,8 +234,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     }
   }
 
-  protected def globPattern: String =
+  protected def globPathPattern: String =
     optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
 
+  protected def globPartitionPattern: String =
+    optParams.getOrElse(DataSourceReadOptions.INCR_PARTITION_GLOB.key, DataSourceReadOptions.INCR_PARTITION_GLOB.defaultValue)
 }
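
Usage sketch (not part of the diff): the new hoodie.datasource.read.incr.partition.glob option is matched in MergeOnReadIncrementalRelation as "*" + pattern against each modified partition path, on top of the existing hoodie.datasource.read.incr.path.glob file filter that is now also pushed into listAffectedFilesForCommits. The table path, begin instant, and partition layout below are placeholders, and an active SparkSession named spark (e.g. from spark-shell) is assumed.

  // Incremental MOR read limited to partitions matching the glob; all values are illustrative.
  val incrementalDf = spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240101000000000")
    // New option from this change: partition-level glob, evaluated as "*" + pattern.
    .option("hoodie.datasource.read.incr.partition.glob", "region=us/*")
    .load("/tmp/hudi/my_mor_table")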
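Similarly, a hedged sketch of the compaction-side configuration: the overridden listPartitionsPaths in HoodieCompactionPlanGenerator only skips the full partition listing when the configured compaction strategy is com.heap.datalake.compaction.SpecificPartitionsCompactionStrategy (a class that lives outside this repository) and hoodie.compaction.include.partitions is set; otherwise it falls back to FSUtils.getAllPartitionPaths. Table type and partition values below are illustrative.

  // Options that would be added to a normal MOR write to restrict compaction planning.
  val compactionOpts = Map(
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
    "hoodie.compact.inline" -> "true",
    "hoodie.compaction.strategy" -> "com.heap.datalake.compaction.SpecificPartitionsCompactionStrategy",
    // Read by the override above: comma-separated partition paths to plan compaction for.
    "hoodie.compaction.include.partitions" -> "region=us/2024/01,region=us/2024/02"
  )
  // e.g. df.write.format("hudi").options(compactionOpts). ... .save(basePath)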