[Iceberg] Add manifest file caching for HMS-based deployments #24481

Open · wants to merge 1 commit into master
--- HdfsCachedInputFile.java (new file) ---
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.io.IOUtil.readRemaining;

public class HdfsCachedInputFile
implements InputFile
{
private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);

private final InputFile delegate;
private final ManifestFileCacheKey cacheKey;
private final ManifestFileCache cache;

public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
this.cache = requireNonNull(cache, "cache is null");
}

@Override
public long getLength()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return cachedContent.getLength();
}
return delegate.getLength();
}

@Override
public SeekableInputStream newStream()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return ByteBufferInputStream.wrap(cachedContent.getData());
}

long fileLength = delegate.getLength();
if (fileLength <= cache.getMaxFileLength() && cache.isEnabled()) {
try {
ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
cache.put(cacheKey, content);
cache.recordFileSize(content.getLength());
return ByteBufferInputStream.wrap(content.getData());
}
catch (IOException e) {
LOG.warn("Failed to cache input file. Falling back to direct read.", e);
}
}

return delegate.newStream();
}

@Override
public String location()
{
return delegate.location();
}

@Override
public boolean exists()
{
return cache.getIfPresent(cacheKey) != null || delegate.exists();
}

private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
throws IOException
{
try (SeekableInputStream stream = input.newStream()) {
long totalBytesToRead = fileLength;
List<ByteBuffer> buffers = new ArrayList<>(
((int) (fileLength / chunkSize)) +
(fileLength % chunkSize == 0 ? 0 : 1));

while (totalBytesToRead > 0) {
int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
byte[] buf = new byte[bytesToRead];
int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
totalBytesToRead -= bytesRead;

if (bytesRead < bytesToRead) {
throw new IOException(
String.format(
"Failed to read %d bytes: %d bytes in stream",
fileLength, fileLength - totalBytesToRead));
}
else {
buffers.add(ByteBuffer.wrap(buf));
}
}
return new ManifestFileCachedContent(buffers, fileLength);
}
}
}
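ManifestFileCachedContent and ManifestFileCacheKey are referenced above but their sources are not included in this view. Based on how the content holder is used here (ByteBufferInputStream.wrap(content.getData()) above, and the weigher summing ByteBuffer capacities in the module further down), a minimal sketch of what such a holder needs might look like the following; the exact fields and layout are an assumption, not the PR's actual class. Note how readFully chunks the file: with a 2 MB chunk size, a 5 MB manifest becomes three buffers (2 MB, 2 MB, 1 MB), so no single allocation exceeds the configured chunk size.

package com.facebook.presto.iceberg;

import com.google.common.collect.ImmutableList;

import java.nio.ByteBuffer;
import java.util.List;

import static java.util.Objects.requireNonNull;

// Hypothetical sketch of the cached-content holder, not the PR's actual implementation.
public class ManifestFileCachedContent
{
    // chunks of the manifest file, each no larger than the configured buffer chunk size
    private final List<ByteBuffer> data;
    private final long length;

    public ManifestFileCachedContent(List<ByteBuffer> data, long length)
    {
        this.data = ImmutableList.copyOf(requireNonNull(data, "data is null"));
        this.length = length;
    }

    public List<ByteBuffer> getData()
    {
        return data;
    }

    public long getLength()
    {
        return length;
    }
}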
--- HdfsFileIO.java ---
@@ -17,25 +17,30 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class HdfsFileIO
implements FileIO
{
private final HdfsEnvironment environment;
private final HdfsContext context;
private final ManifestFileCache manifestFileCache;

public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
{
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}

@Override
@@ -44,6 +49,33 @@ public InputFile newInputFile(String path)
return new HdfsInputFile(new Path(path), environment, context);
}

@Override
public InputFile newInputFile(String path, long length)
{
return new HdfsInputFile(new Path(path), environment, context, Optional.of(length));
}

protected InputFile newCachedInputFile(String path)
{
InputFile inputFile = new HdfsInputFile(new Path(path), environment, context);
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) :
inputFile;
}

@Override
public InputFile newInputFile(ManifestFile manifest)
{
checkArgument(
manifest.keyMetadata() == null,
"Cannot decrypt manifest: {} (use EncryptingFileIO)",
manifest.path());
InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length()));
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) :
inputFile;
}

@Override
public OutputFile newOutputFile(String path)
{
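For context on how these overloads are exercised: when Iceberg core plans a scan for an HMS-backed table, it resolves each manifest through the table's FileIO, and the ManifestFile overload above hands back an HdfsCachedInputFile when caching is enabled. A rough usage sketch, assuming an org.apache.iceberg.Table loaded through this connector and a snapshot containing only data manifests (illustrative only; not code from this PR):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

import java.io.IOException;

public final class ManifestCacheExample
{
    private ManifestCacheExample() {}

    // 'table' is assumed to be loaded elsewhere and to have at least one snapshot.
    static void printDataFiles(Table table)
            throws IOException
    {
        FileIO io = table.io();
        for (ManifestFile manifest : table.currentSnapshot().allManifests(io)) {
            // Reading the manifest goes through io.newInputFile(...), so with the HMS
            // catalog and caching enabled, repeated planning reads the bytes from memory
            // instead of re-fetching the file from HDFS.
            try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io)) {
                reader.forEach(file -> System.out.println(file.path()));
            }
        }
    }
}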
--- HdfsInputFile.java ---
@@ -22,6 +22,8 @@
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;
@@ -32,25 +34,42 @@ public class HdfsInputFile
private final InputFile delegate;
private final HdfsEnvironment environment;
private final String user;
private final AtomicLong length;

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional<Long> length)
{
requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
this.length = new AtomicLong(length.orElse(-1L));
requireNonNull(context, "context is null");
try {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
if (this.length.get() < 0) {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
else {
this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
}
this.user = context.getIdentity().getUser();
}

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
{
this(path, environment, context, Optional.empty());
}

@Override
public long getLength()
{
return environment.doAs(user, delegate::getLength);
return length.updateAndGet(value -> {
if (value < 0) {
return environment.doAs(user, delegate::getLength);
}
return value;
});
}

@Override
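The getLength change above memoizes the size in an AtomicLong: when the caller already knows the length (for example from ManifestFile.length()), the HDFS lookup under doAs is skipped entirely; otherwise it runs lazily and the result is reused. The same idiom in isolation, with hypothetical names for illustration:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper showing the memoization idiom used by HdfsInputFile.getLength():
// a negative sentinel means "unknown"; the expensive lookup only runs while no
// non-negative value has been stored yet.
final class MemoizedLength
{
    private final AtomicLong length;
    private final LongSupplier expensiveLookup;

    MemoizedLength(long knownLength, LongSupplier expensiveLookup)
    {
        this.length = new AtomicLong(knownLength); // pass -1 when the length is unknown
        this.expensiveLookup = expensiveLookup;
    }

    long get()
    {
        return length.updateAndGet(value -> value < 0 ? expensiveLookup.getAsLong() : value);
    }
}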
--- HiveTableOperations.java ---
@@ -105,7 +105,7 @@ public class HiveTableOperations
private final String tableName;
private final Optional<String> owner;
private final Optional<String> location;
private final FileIO fileIO;
private final HdfsFileIO fileIO;
private final IcebergHiveTableOperationsConfig config;

private TableMetadata currentMetadata;
@@ -121,10 +121,11 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table,
String owner,
String location)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -156,7 +158,7 @@
}

private HiveTableOperations(
FileIO fileIO,
HdfsFileIO fileIO,
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
@@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation)
config.getTableRefreshMaxRetryTime().toMillis(),
config.getTableRefreshBackoffScaleFactor())
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation))));
}
catch (RuntimeException e) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
--- Iceberg connector Guice module ---
@@ -93,6 +93,8 @@

import javax.inject.Singleton;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@@ -208,6 +210,25 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
return statisticsFileCache;
}

@Singleton
@Provides
public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
.maximumWeight(config.getMaxManifestCacheSize())
.<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
.expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()))
.recordStats()
.build();
ManifestFileCache manifestFileCache = new ManifestFileCache(
delegate,
config.getManifestCachingEnabled(),
config.getManifestCacheMaxContentLength(),
config.getManifestCacheMaxChunkSize().toBytes());
exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
return manifestFileCache;
}

Review comment (Contributor): Should we just use Caffeine as the caching library, since iceberg-core already brings it in? It appears to have better performance and is recommended by the Guava team too.

Reply (@ZacBlanco, author, Feb 11, 2025): I had the same thought. Caching performance would likely improve as well, because eviction decisions in Caffeine use global weight rather than per-segment weight as in Guava. However, most of the Presto codebase uses Guava caches. Since Caffeine and Guava caches are different types, Caffeine would not be compatible with the current infrastructure such as the CacheStatsMBean object. Additionally, we use Guava's SimpleForwardingCache, which is not available in Caffeine, so I would have to roll my own. Not a terrible amount of effort, but I think there is enough work there to push that effort into a separate PR.
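The ManifestFileCache class itself is also outside this view; per the reply above, it wraps the Guava cache via SimpleForwardingCache so it stays compatible with the existing CacheStatsMBean-style JMX export. A rough sketch of what such a wrapper needs, with method names inferred from how the class is called elsewhere in this diff (isEnabled, getMaxFileLength, getBufferChunkSize, recordFileSize) rather than taken from the actual source:

package com.facebook.presto.iceberg;

import com.google.common.cache.Cache;
import com.google.common.cache.ForwardingCache;

// Hypothetical sketch, not the PR's actual class.
public class ManifestFileCache
        extends ForwardingCache.SimpleForwardingCache<ManifestFileCacheKey, ManifestFileCachedContent>
{
    private final boolean enabled;
    private final long maxFileLength;
    private final long bufferChunkSize;

    public ManifestFileCache(
            Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate,
            boolean enabled,
            long maxFileLength,
            long bufferChunkSize)
    {
        super(delegate);
        this.enabled = enabled;
        this.maxFileLength = maxFileLength;
        this.bufferChunkSize = bufferChunkSize;
    }

    public boolean isEnabled()
    {
        return enabled;
    }

    public long getMaxFileLength()
    {
        return maxFileLength;
    }

    public long getBufferChunkSize()
    {
        return bufferChunkSize;
    }

    public void recordFileSize(long size)
    {
        // placeholder: the real class presumably tracks cached-file-size metrics
        // that MBeanExporter exposes alongside the Guava CacheStats
    }
}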

@ForCachingHiveMetastore
@Singleton
@Provides
--- IcebergConfig.java ---
@@ -67,10 +67,11 @@ public class IcebergConfig

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private boolean manifestCachingEnabled = true;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE);
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);

Review comment (Member): Is this intended?

Reply (@ZacBlanco, author, Feb 6, 2025): Yes, this is intentional. Performance is significantly worse with caching disabled, and I don't think there are any known downsides to enabling it by default other than an increased memory footprint.

@@ -348,6 +349,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
return this;
}

public DataSize getManifestCacheMaxChunkSize()
{
return manifestCacheMaxChunkSize;
}

@Min(1024)
@Config("iceberg.io.manifest.cache.max-chunk-size")
@ConfigDescription("Maximum length of a buffer used to cache manifest file content. Only applicable to HIVE catalog.")
public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize)
{
this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize;
return this;
}

@Min(0)
public int getSplitManagerThreads()
{
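On the deployment side, the new chunk-size property joins the existing manifest-cache settings in the Iceberg catalog properties file. A sketch of an etc/catalog/iceberg.properties for an HMS-backed cluster follows; only iceberg.io.manifest.cache.max-chunk-size appears in this diff, so the other property names and the example values are assumptions based on the config fields above.

connector.name=iceberg
iceberg.catalog.type=hive
hive.metastore.uri=thrift://example-metastore:9083

# Manifest file caching (HIVE catalog only); enabled by default after this change.
# Setting iceberg.io.manifest.cache-enabled=false would restore the old behavior.
iceberg.io.manifest.cache-enabled=true
iceberg.io.manifest.cache.max-total-bytes=104857600
iceberg.io.manifest.cache.expiration-interval-ms=60000
iceberg.io.manifest.cache.max-content-length=8388608
iceberg.io.manifest.cache.max-chunk-size=2MB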