Add manifest caching to iceberg connector
ZacBlanco committed Feb 3, 2025
1 parent a8a6ffd commit 7db7896
Showing 16 changed files with 352 additions and 29 deletions.
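In brief: the commit adds an HdfsCachedInputFile wrapper that serves Iceberg manifest reads from an in-memory Guava cache keyed by file path, introduces a ManifestFileCache (weight-bounded by total bytes, with write expiration and JMX-exported stats) that is wired through HdfsFileIO, HiveTableOperations, and the Hive metadata factory, and turns manifest caching on by default in IcebergConfig. Only a subset of the 16 changed files is shown below.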
@@ -0,0 +1,116 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;

public class HdfsCachedInputFile
        implements InputFile
{
    private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);
    private static final long BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;

    private final InputFile delegate;
    private final ManifestFileCacheKey cacheKey;
    private final ManifestFileCache cache;

    public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
        this.cache = requireNonNull(cache, "cache is null");
    }

    @Override
    public long getLength()
    {
        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
        if (cachedContent != null) {
            return cachedContent.getLength();
        }
        return delegate.getLength();
    }

    @Override
    public SeekableInputStream newStream()
    {
        ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
        if (cachedContent != null) {
            return ByteBufferInputStream.wrap(cachedContent.getData());
        }

        // Only cache files that fit under the per-file limit. Check isEnabled()
        // first to avoid a needless length lookup when caching is off.
        if (cache.isEnabled() && delegate.getLength() <= cache.getMaxFileLength()) {
            try {
                ManifestFileCachedContent content = downloadManifest(delegate);
                cache.put(cacheKey, content);
                cache.recordFileSize(content.getLength());
                return ByteBufferInputStream.wrap(content.getData());
            }
            catch (IOException e) {
                LOG.warn("Failed to cache input file. Falling back to direct read.", e);
            }
        }

        return delegate.newStream();
    }

    @Override
    public String location()
    {
        return delegate.location();
    }

    @Override
    public boolean exists()
    {
        // A cached copy proves the file existed; otherwise ask the delegate.
        return cache.getIfPresent(cacheKey) != null || delegate.exists();
    }

    private static ManifestFileCachedContent downloadManifest(InputFile input)
            throws IOException
    {
        try (SeekableInputStream stream = input.newStream()) {
            long fileLength = input.getLength();
            long totalBytesToRead = fileLength;
            // Ceiling division: one buffer per BUFFER_CHUNK_SIZE chunk.
            List<ByteBuffer> buffers = new ArrayList<>((int) ((fileLength + BUFFER_CHUNK_SIZE - 1) / BUFFER_CHUNK_SIZE));

            while (totalBytesToRead > 0) {
                int bytesToRead = (int) Math.min(BUFFER_CHUNK_SIZE, totalBytesToRead);
                byte[] buf = new byte[bytesToRead];
                int bytesRead = IOUtil.readRemaining(stream, buf, 0, bytesToRead);
                totalBytesToRead -= bytesRead;

                if (bytesRead < bytesToRead) {
                    throw new IOException(
                            String.format(
                                    "Failed to read %d bytes: only %d bytes were available",
                                    fileLength, fileLength - totalBytesToRead));
                }
                buffers.add(ByteBuffer.wrap(buf));
            }
            return new ManifestFileCachedContent(buffers, fileLength);
        }
    }
}
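ManifestFileCacheKey, ManifestFileCachedContent, and the ManifestFileCache wrapper are added elsewhere in this commit but their files are not shown in this view. A minimal sketch of the two value types, inferred only from the call sites above; everything beyond those call sites is an assumption, not the commit's actual code:

// Hypothetical reconstruction from usage in HdfsCachedInputFile. The real
// classes may differ. ManifestFileCache (not sketched here) wraps the Guava
// cache and exposes isEnabled(), getMaxFileLength(), put(), getIfPresent(),
// and recordFileSize().
import java.nio.ByteBuffer;
import java.util.List;

import static java.util.Objects.requireNonNull;

class ManifestFileCacheKey
{
    private final String path;

    ManifestFileCacheKey(String path)
    {
        this.path = requireNonNull(path, "path is null");
    }

    // equals() and hashCode() over the path are required for cache lookups.
}

class ManifestFileCachedContent
{
    private final List<ByteBuffer> data;
    private final long length;

    ManifestFileCachedContent(List<ByteBuffer> data, long length)
    {
        // A defensive copy would be prudent; omitted to keep the sketch short.
        this.data = requireNonNull(data, "data is null");
        this.length = length;
    }

    List<ByteBuffer> getData()
    {
        return data;
    }

    long getLength()
    {
        return length;
    }
}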
@@ -31,17 +31,19 @@ public class HdfsFileIO
{
    private final HdfsEnvironment environment;
    private final HdfsContext context;
    private final ManifestFileCache manifestFileCache;

    public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
    public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
    {
        this.environment = requireNonNull(environment, "environment is null");
        this.context = requireNonNull(context, "context is null");
        this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
    }

    @Override
    public InputFile newInputFile(String path)
    {
        return new HdfsInputFile(new Path(path), environment, context);
        return new HdfsCachedInputFile(new HdfsInputFile(new Path(path), environment, context), new ManifestFileCacheKey(path), manifestFileCache);
    }

    @Override
@@ -121,10 +121,11 @@ public HiveTableOperations(
            HdfsEnvironment hdfsEnvironment,
            HdfsContext hdfsContext,
            IcebergHiveTableOperationsConfig config,
            ManifestFileCache manifestFileCache,
            String database,
            String table)
    {
        this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
        this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
                metastore,
                metastoreContext,
                config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
            HdfsEnvironment hdfsEnvironment,
            HdfsContext hdfsContext,
            IcebergHiveTableOperationsConfig config,
            ManifestFileCache manifestFileCache,
            String database,
            String table,
            String owner,
            String location)
    {
        this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
        this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
                metastore,
                metastoreContext,
                config,
@@ -89,6 +89,8 @@

import javax.inject.Singleton;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@@ -200,6 +202,21 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
        return new StatisticsFileCache(delegate);
    }

    @Singleton
    @Provides
    public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
    {
        // Weigh entries by their total buffer size so maximumWeight acts as a
        // byte budget; a weight cap of 0 (caching disabled) evicts every entry
        // immediately, making the cache a no-op.
        Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
                .maximumWeight(config.getManifestCachingEnabled() ? config.getMaxManifestCacheSize() : 0)
                .<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
                .expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()))
                .recordStats()
                .build();
        ManifestFileCache manifestFileCache = new ManifestFileCache(delegate, config.getManifestCachingEnabled(), config.getManifestCacheMaxContentLength());
        exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
        return manifestFileCache;
    }

    @ForCachingHiveMetastore
    @Singleton
    @Provides
@@ -67,7 +67,7 @@ public class IcebergConfig

    private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
    private String fileIOImpl = HadoopFileIO.class.getName();
    private boolean manifestCachingEnabled;
    private boolean manifestCachingEnabled = true;
    private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
    private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
    private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
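With manifestCachingEnabled now defaulting to true, the cache is active out of the box; the maximumWeight(0) path in the provider above only applies when it is explicitly turned off. Assuming the connector keeps its existing property names for these fields, tuning the cache from a catalog properties file would look roughly like this (values shown are the Iceberg library defaults):

# Hypothetical etc/catalog/iceberg.properties excerpt; property names assumed
# from the IcebergConfig fields above.
iceberg.io.manifest.cache-enabled=true
# Total byte budget across all cached manifests (the Guava maximumWeight).
iceberg.io.manifest.cache.max-total-bytes=104857600
# Entries expire this many milliseconds after being written.
iceberg.io.manifest.cache.expiration-interval-ms=60000
# Files larger than this are never cached and are read directly.
iceberg.io.manifest.cache.max-content-length=8388608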
@@ -159,6 +159,7 @@ public class IcebergHiveMetadata
    private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
    private final IcebergHiveTableOperationsConfig hiveTableOperationsConfig;
    private final Cache<SchemaTableName, Optional<Table>> tableCache;
    private final ManifestFileCache manifestFileCache;

    public IcebergHiveMetadata(
            ExtendedHiveMetastore metastore,
@@ -169,14 +170,17 @@ public IcebergHiveMetadata(
            JsonCodec<CommitTaskData> commitTaskCodec,
            NodeVersion nodeVersion,
            FilterStatsCalculatorService filterStatsCalculatorService,
            IcebergConfig icebergConfig,
            IcebergHiveTableOperationsConfig hiveTableOperationsConfig,
            StatisticsFileCache statisticsFileCache)
            StatisticsFileCache statisticsFileCache,
            ManifestFileCache manifestFileCache)
    {
        super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
        this.metastore = requireNonNull(metastore, "metastore is null");
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.hiveTableOperationsConfig = requireNonNull(hiveTableOperationsConfig, "hiveTableOperationsConfig is null");
        this.tableCache = CacheBuilder.newBuilder().maximumSize(MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).build();
        this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
    }

    public ExtendedHiveMetastore getMetastore()
Expand All @@ -187,7 +191,7 @@ public ExtendedHiveMetastore getMetastore()
    @Override
    protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
    {
        return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOperationsConfig, session, schemaTableName);
        return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOperationsConfig, manifestFileCache, session, schemaTableName);
    }

    @Override
@@ -331,6 +335,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
                hdfsEnvironment,
                hdfsContext,
                hiveTableOperationsConfig,
                manifestFileCache,
                schemaName,
                tableName,
                session.getUser(),
@@ -610,7 +615,7 @@ public void registerTable(ConnectorSession clientSession, SchemaTableName schema
        InputFile inputFile = new HdfsInputFile(metadataLocation, hdfsEnvironment, hdfsContext);
        TableMetadata tableMetadata;
        try {
            tableMetadata = TableMetadataParser.read(new HdfsFileIO(hdfsEnvironment, hdfsContext), inputFile);
            tableMetadata = TableMetadataParser.read(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext), inputFile);
        }
        catch (Exception e) {
            throw new PrestoException(ICEBERG_INVALID_METADATA, String.format("Unable to read metadata file %s", metadataLocation), e);
@@ -39,8 +39,10 @@ public class IcebergHiveMetadataFactory
    final RowExpressionService rowExpressionService;
    final NodeVersion nodeVersion;
    final FilterStatsCalculatorService filterStatsCalculatorService;
    final IcebergConfig icebergConfig;
    final IcebergHiveTableOperationsConfig operationsConfig;
    final StatisticsFileCache statisticsFileCache;
    final ManifestFileCache manifestFileCache;

    @Inject
    public IcebergHiveMetadataFactory(
@@ -52,8 +54,10 @@ public IcebergHiveMetadataFactory(
            JsonCodec<CommitTaskData> commitTaskCodec,
            NodeVersion nodeVersion,
            FilterStatsCalculatorService filterStatsCalculatorService,
            IcebergConfig icebergConfig,
            IcebergHiveTableOperationsConfig operationsConfig,
            StatisticsFileCache statisticsFileCache)
            StatisticsFileCache statisticsFileCache,
            ManifestFileCache manifestFileCache)
    {
        this.metastore = requireNonNull(metastore, "metastore is null");
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -63,8 +67,10 @@ public IcebergHiveMetadataFactory(
        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
        this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
        this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
        this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
        this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null");
        this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
        this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
    }

    public ConnectorMetadata create()
@@ -78,7 +84,9 @@ public ConnectorMetadata create()
                commitTaskCodec,
                nodeVersion,
                filterStatsCalculatorService,
                icebergConfig,
                operationsConfig,
                statisticsFileCache);
                statisticsFileCache,
                manifestFileCache);
    }
}
@@ -121,7 +121,7 @@ protected Map<String, String> getProperties(ConnectorSession session)
    {
        Map<String, String> properties = new HashMap<>();
        if (icebergConfig.getManifestCachingEnabled()) {
            loadCachingProperties(properties, icebergConfig);
            properties.putAll(loadCachingProperties(icebergConfig));
        }
        if (icebergConfig.getFileIOImpl() != null) {
            properties.put(FILE_IO_IMPL, icebergConfig.getFileIOImpl());
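The call above changes loadCachingProperties from mutating the caller's map to returning one. The helper's new body is not visible in this view; a plausible sketch under that assumption, mapping the connector config onto Iceberg's CatalogProperties keys (the constants are real Iceberg keys, the signature and body are guesses):

// Hypothetical reconstruction of the refactored helper; the commit's actual
// code is not shown in this diff. Assumes com.google.common.collect.ImmutableMap
// and org.apache.iceberg.CatalogProperties are imported.
protected static Map<String, String> loadCachingProperties(IcebergConfig config)
{
    return ImmutableMap.<String, String>builder()
            .put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true")
            .put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, Long.toString(config.getMaxManifestCacheSize()))
            .put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, Long.toString(config.getManifestCacheExpireDuration()))
            .put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, Long.toString(config.getManifestCacheMaxContentLength()))
            .build();
}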