[PA-7420] Sync fork with 0.15.0 release #8

Open
wants to merge 2 commits into base: release-0.15.0
@@ -39,6 +39,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactHelpers;

@@ -80,7 +81,7 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
// TODO : check if maxMemory is not greater than JVM or executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metaClient.getStorage(), writeConfig.getMetadataConfig(), metaClient.getBasePath());
List<String> partitionPaths = listPartitionsPaths(engineContext, metaClient.getStorage(), writeConfig, metaClient.getBasePath());

// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
@@ -159,6 +160,10 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
return compactionPlan;
}

protected List<String> listPartitionsPaths(HoodieEngineContext engineContext, HoodieStorage storage, HoodieWriteConfig writeConfig, String basePathStr) {
return FSUtils.getAllPartitionPaths(engineContext, storage, writeConfig.getMetadataConfig(), basePathStr);
}

protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations);

protected abstract boolean filterLogCompactionOperations();
(next file)
@@ -26,11 +26,13 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

import static java.util.stream.Collectors.toList;
@@ -52,6 +54,21 @@ protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
}

@Override
protected List<String> listPartitionsPaths(HoodieEngineContext engineContext, HoodieStorage storage, HoodieWriteConfig writeConfig, String basePathStr) {
String compactionStrategy = writeConfig.getCompactionStrategy().getClass().getName();
LOG.info("Compaction strategy is " + compactionStrategy);
if (compactionStrategy.equals("com.heap.datalake.compaction.SpecificPartitionsCompactionStrategy")) {
String[] partitions = writeConfig.getString("hoodie.compaction.include.partitions").split(",");
if (partitions.length > 0) {
LOG.info("Skipping listing all partitions in favor of partitions provided in config: " + Arrays.toString(partitions));
return Arrays.asList(partitions);
}
}
LOG.info("Defaulting to listing all partitions");
return super.listPartitionsPaths(engineContext, storage, writeConfig, basePathStr);
}

@Override
protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
return writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
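For orientation, here is a minimal sketch of the writer configuration that would exercise this override. The strategy class and the `hoodie.compaction.include.partitions` key come from the diff above; the table name, storage path, and partition values are assumptions for illustration, and `df` is an existing DataFrame.

```scala
// Hypothetical Spark writer setup; names, path, and partition values are illustrative only.
df.write.format("hudi")
  .option("hoodie.table.name", "events")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.compact.inline", "true")
  // Strategy class checked by the listPartitionsPaths override above.
  .option("hoodie.compaction.strategy",
    "com.heap.datalake.compaction.SpecificPartitionsCompactionStrategy")
  // Comma-separated partitions used instead of listing every partition under the base path.
  .option("hoodie.compaction.include.partitions", "2024/06/01,2024/06/02")
  .mode("append")
  .save("s3://bucket/warehouse/events")
```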
(next file)
@@ -26,6 +26,7 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hadoop.fs.GlobPattern;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
@@ -175,20 +176,22 @@ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath)
* @param basePath The base path
* @return the file full path to file status mapping
*/
public Map<String, StoragePathInfo> getFullPathToInfo(HoodieStorage storage,
String basePath) {
public Map<String, StoragePathInfo> getFullPathToInfo(HoodieStorage storage, String basePath, String fileNamePattern) {
Map<String, StoragePathInfo> fullPathToInfoMap = new HashMap<>();
GlobPattern globMatcher = new GlobPattern(fileNamePattern);
for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
// Iterate through all the written files.
for (HoodieWriteStat stat : stats) {
String relativeFilePath = stat.getPath();
StoragePath fullPath = relativeFilePath != null
? FSUtils.constructAbsolutePath(basePath, relativeFilePath) : null;
if (fullPath != null) {
long blockSize = storage.getDefaultBlockSize(fullPath);
StoragePathInfo pathInfo = new StoragePathInfo(
fullPath, stat.getFileSizeInBytes(), false, (short) 0, blockSize, 0);
fullPathToInfoMap.put(fullPath.getName(), pathInfo);
if (fileNamePattern.isEmpty() || globMatcher.matches(relativeFilePath)) {
StoragePath fullPath = relativeFilePath != null
? FSUtils.constructAbsolutePath(basePath, relativeFilePath) : null;
if (fullPath != null) {
long blockSize = storage.getDefaultBlockSize(fullPath);
StoragePathInfo pathInfo = new StoragePathInfo(
fullPath, stat.getFileSizeInBytes(), false, (short) 0, blockSize, 0);
fullPathToInfoMap.put(fullPath.getName(), pathInfo);
}
}
}
}
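The filtering above relies on Hadoop's org.apache.hudi-independent org.apache.hadoop.fs.GlobPattern applied to the relative write-stat path, with an empty pattern short-circuiting to the previous keep-everything behavior. A small sketch of the assumed matching semantics; the file names below are made up:

```scala
import org.apache.hadoop.fs.GlobPattern

// The relative path starts with the partition, so a partition-scoped glob keeps or drops whole partitions.
val matcher = new GlobPattern("*2024/06/01*")
matcher.matches("2024/06/01/abc123_0-1-0_20240601.parquet") // true: kept
matcher.matches("2024/06/02/def456_0-1-0_20240602.parquet") // false: dropped
```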
(next file)
@@ -151,7 +151,7 @@ private static Map<String, StoragePathInfo> getFilesToRead(
case COPY_ON_WRITE:
return metadata.getFileIdToInfo(basePath);
case MERGE_ON_READ:
return metadata.getFullPathToInfo(new HoodieHadoopStorage(basePath, hadoopConf), basePath);
return metadata.getFullPathToInfo(new HoodieHadoopStorage(basePath, hadoopConf), basePath, "");
default:
throw new AssertionError();
}
(next file)
@@ -193,8 +193,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,

// build fileGroup from fsView
List<StoragePathInfo> affectedPathInfoList = HoodieInputFormatUtils
.listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(),
metadataList);
.listAffectedFilesForCommits(job, tableMetaClient.getBasePathV2(), metadataList, "");
// step3
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
tableMetaClient, commitsTimelineToReturn, affectedPathInfoList);
(next file)
@@ -520,15 +520,13 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile
* @param metadataList The metadata list to read the data from
* @return the affected file status array
*/
public static List<StoragePathInfo> listAffectedFilesForCommits(Configuration hadoopConf,
StoragePath basePath,
List<HoodieCommitMetadata> metadataList) {
public static List<StoragePathInfo> listAffectedFilesForCommits(Configuration hadoopConf, StoragePath basePath, List<HoodieCommitMetadata> metadataList, String fileNamePattern) {
// TODO: Use HoodieMetaTable to extract affected file directly.
HashMap<String, StoragePathInfo> fullPathToInfoMap = new HashMap<>();
HoodieStorage storage = new HoodieHadoopStorage(basePath, HadoopFSUtils.getStorageConf(hadoopConf));
// Iterate through the given commits.
for (HoodieCommitMetadata metadata : metadataList) {
fullPathToInfoMap.putAll(metadata.getFullPathToInfo(storage, basePath.toString()));
fullPathToInfoMap.putAll(metadata.getFullPathToInfo(storage, basePath.toString(), fileNamePattern));
}
return new ArrayList<>(fullPathToInfoMap.values());
}
(next file)
@@ -162,6 +162,12 @@ object DataSourceReadOptions {
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")

val INCR_PARTITION_GLOB: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.partition.glob")
.defaultValue("")
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+ "instead of the full table. This option allows using glob pattern to directly filter on partition.")

val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF

val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
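A hedged usage sketch of the new read option together with the existing incremental-query options; the instant time, path, and glob value are placeholders. Note that MergeOnReadIncrementalRelation (next file) prefixes the supplied glob with "*" before matching partition paths.

```scala
// Hypothetical incremental read limited to partitions matching the glob.
val incremental = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240601000000")
  // New option added in this PR.
  .option("hoodie.datasource.read.incr.partition.glob", "2024/06/*")
  .load("s3://bucket/warehouse/events")
```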
(next file)
@@ -108,13 +108,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
metaClient, timeline, affectedFilesInCommits)

val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
val globMatcher = new GlobPattern("*" + globPartitionPattern)

modifiedPartitions.asScala.flatMap { relativePartitionPath =>
modifiedPartitions.asScala.filter(p => globMatcher.matches(p)).flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
}

buildSplits(filterFileSlices(fileSlices, globPattern))
buildSplits(fileSlices)
}
}

@@ -181,7 +182,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava

protected lazy val affectedFilesInCommits: java.util.List[StoragePathInfo] = {
listAffectedFilesForCommits(conf, metaClient.getBasePathV2, commitsMetadata)
listAffectedFilesForCommits(conf, metaClient.getBasePathV2, commitsMetadata, "*" + globPathPattern)
}

protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
@@ -233,8 +234,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
}
}

protected def globPattern: String =
protected def globPathPattern: String =
optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)

protected def globPartitionPattern: String =
optParams.getOrElse(DataSourceReadOptions.INCR_PARTITION_GLOB.key, DataSourceReadOptions.INCR_PARTITION_GLOB.defaultValue)
}
