[server][dvc] support hybrid store in blob transfer with recreating snapshots (#1240)
jingy-li authored Oct 21, 2024
1 parent 911f9be commit 5a6fb94
Showing 18 changed files with 798 additions and 312 deletions.
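
In short: this commit threads a new BlobSnapshotManager through the P2P blob transfer stack so that blob transfer can serve hybrid stores, whose snapshots must be recreated on demand rather than assumed immutable. The snapshot manager is constructed in BlobTransferUtil from the store repository, the storage engine repository, and the storage metadata service, bounded by two new settings (max concurrent snapshot users and snapshot retention time in minutes). The server-side handler now obtains per-partition transfer metadata from the snapshot manager instead of reading StorageMetadataService directly, and releases its snapshot claim in a finally block when the transfer ends.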
DaVinciBackend.java
@@ -295,7 +295,11 @@ public DaVinciBackend(
           configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
           configLoader.getVeniceServerConfig().getRocksDBPath(),
           clientConfig,
-          storageMetadataService);
+          storageMetadataService,
+          readOnlyStoreRepository,
+          storageService.getStorageEngineRepository(),
+          backendConfig.getMaxConcurrentSnapshotUser(),
+          backendConfig.getSnapshotRetentionTimeInMin());
     } else {
       blobTransferManager = null;
     }

Large diffs are not rendered by default.
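
The unrendered large diff presumably contains the new BlobSnapshotManager class itself. The skeleton below is reconstructed purely from the call sites visible in this commit; everything beyond the constructor and the two method signatures is an assumption, not the actual implementation.

package com.linkedin.davinci.blobtransfer;

import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;

// Skeleton inferred from call sites in this commit; bodies are placeholders.
public class BlobSnapshotManager {
  public BlobSnapshotManager(
      ReadOnlyStoreRepository readOnlyStoreRepository,
      StorageEngineRepository storageEngineRepository,
      StorageMetadataService storageMetadataService,
      int maxConcurrentSnapshotUser,
      int snapshotRetentionTimeInMin) {
    // Wiring as passed in from BlobTransferUtil below.
  }

  // Called by P2PFileTransferServerHandler before serving files; expected to
  // (re)create the snapshot if needed, register the caller as a concurrent
  // user, and return the partition's offset record and store version state.
  public BlobTransferPartitionMetadata getTransferMetadata(BlobTransferPayload payload) {
    throw new UnsupportedOperationException("Not shown in this diff");
  }

  // Called from the handler's finally block to release the caller's claim.
  public void decreaseConcurrentUserCount(BlobTransferPayload payload) {
    throw new UnsupportedOperationException("Not shown in this diff");
  }
}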

BlobTransferPayload.java
@@ -13,9 +13,11 @@ public class BlobTransferPayload {
   private final int partition;
   private final String topicName;
   private final String partitionDir;
+  private final String storeName;
 
   public BlobTransferPayload(String baseDir, String storeName, int version, int partition) {
     this.partition = partition;
+    this.storeName = storeName;
     this.topicName = storeName + "_v" + version;
     this.partitionDir = composePartitionDbDir(baseDir, topicName, partition);
   }
@@ -39,4 +41,8 @@ public String getTopicName() {
   public int getPartition() {
     return partition;
   }
+
+  public String getStoreName() {
+    return storeName;
+  }
 }
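
The payload now exposes the raw store name next to the version-qualified topic name it already derived. A tiny usage sketch with hypothetical values:

BlobTransferPayload payload = new BlobTransferPayload("/data/rocksdb", "userStore", 3, 7);
payload.getStoreName();  // "userStore"
payload.getTopicName();  // "userStore_v3"
payload.getPartition();  // 7

Exposing getStoreName() presumably lets the snapshot manager look the store up in ReadOnlyStoreRepository (for example, to check store-level properties such as whether it is hybrid) without re-parsing the topic name.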
BlobTransferUtil.java
@@ -4,13 +4,15 @@

 import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
 import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
+import com.linkedin.davinci.storage.StorageEngineRepository;
 import com.linkedin.davinci.storage.StorageMetadataService;
 import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
 import com.linkedin.venice.blobtransfer.ServerBlobFinder;
 import com.linkedin.venice.client.store.AbstractAvroStoreClient;
 import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
 import com.linkedin.venice.client.store.ClientConfig;
 import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
+import com.linkedin.venice.meta.ReadOnlyStoreRepository;
 import java.util.concurrent.CompletableFuture;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -32,26 +34,44 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
       int p2pTransferPort,
       String baseDir,
       ClientConfig clientConfig,
-      StorageMetadataService storageMetadataService) {
+      StorageMetadataService storageMetadataService,
+      ReadOnlyStoreRepository readOnlyStoreRepository,
+      StorageEngineRepository storageEngineRepository,
+      int maxConcurrentSnapshotUser,
+      int snapshotRetentionTimeInMin) {
     return getP2PBlobTransferManagerForDVCAndStart(
         p2pTransferPort,
         p2pTransferPort,
         baseDir,
         clientConfig,
-        storageMetadataService);
+        storageMetadataService,
+        readOnlyStoreRepository,
+        storageEngineRepository,
+        maxConcurrentSnapshotUser,
+        snapshotRetentionTimeInMin);
   }
 
   public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
       int p2pTransferServerPort,
       int p2pTransferClientPort,
       String baseDir,
       ClientConfig clientConfig,
-      StorageMetadataService storageMetadataService) {
+      StorageMetadataService storageMetadataService,
+      ReadOnlyStoreRepository readOnlyStoreRepository,
+      StorageEngineRepository storageEngineRepository,
+      int maxConcurrentSnapshotUser,
+      int snapshotRetentionTimeInMin) {
     try {
+      BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
+          readOnlyStoreRepository,
+          storageEngineRepository,
+          storageMetadataService,
+          maxConcurrentSnapshotUser,
+          snapshotRetentionTimeInMin);
       AbstractAvroStoreClient storeClient =
           new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig);
       BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
-          new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService),
+          new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager),
          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
          new DaVinciBlobFinder(storeClient));
      manager.start();
@@ -76,10 +96,20 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndStart(
       int p2pTransferClientPort,
       String baseDir,
       CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewFuture,
-      StorageMetadataService storageMetadataService) {
+      StorageMetadataService storageMetadataService,
+      ReadOnlyStoreRepository readOnlyStoreRepository,
+      StorageEngineRepository storageEngineRepository,
+      int maxConcurrentSnapshotUser,
+      int snapshotRetentionTimeInMin) {
     try {
+      BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
+          readOnlyStoreRepository,
+          storageEngineRepository,
+          storageMetadataService,
+          maxConcurrentSnapshotUser,
+          snapshotRetentionTimeInMin);
       BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
-          new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService),
+          new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager),
          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
          new ServerBlobFinder(customizedViewFuture));
      manager.start();
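
Both factory paths now build the BlobSnapshotManager internally and hand it to P2PBlobTransferService, while NettyFileTransferClient (the receiving side) keeps using StorageMetadataService directly. A caller-side sketch of the DVC variant, mirroring the DaVinciBackend call site above (port and knob values are hypothetical):

BlobTransferManager<Void> blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
    27015,                                       // p2p transfer port (hypothetical)
    "/data/rocksdb",                             // RocksDB base dir (hypothetical)
    clientConfig,
    storageMetadataService,
    readOnlyStoreRepository,
    storageService.getStorageEngineRepository(),
    5,                                           // max concurrent snapshot users (hypothetical)
    60);                                         // snapshot retention, minutes (hypothetical)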
P2PFileTransferClientHandler.java
@@ -100,8 +100,17 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
       Path partitionDir = Paths.get(payload.getPartitionDir());
       Files.createDirectories(partitionDir);
 
-      // Prepare the file
+      // Prepare the file, remove it if it exists
+      if (Files.deleteIfExists(partitionDir.resolve(fileName))) {
+        LOGGER.debug(
+            "File {} already exists for topic {} partition {}. Overwriting it.",
+            fileName,
+            payload.getTopicName(),
+            payload.getPartition());
+      }
+
       Path file = Files.createFile(partitionDir.resolve(fileName));
+
       outputFileChannel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
 
     } else if (msg instanceof HttpContent) {
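
The new guard makes the receive path idempotent on retries: Files.deleteIfExists returns true only when a stale copy was actually removed, which is why a single call both clears the way for Files.createFile and doubles as the "already exists" signal for the debug log. A minimal standalone illustration:

import java.nio.file.Files;
import java.nio.file.Path;

public class DeleteIfExistsDemo {
  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("partition");
    Path file = dir.resolve("chunk_0");

    System.out.println(Files.deleteIfExists(file)); // false: nothing to delete yet
    Files.createFile(file);
    System.out.println(Files.deleteIfExists(file)); // true: stale copy removed
    Files.createFile(file); // safe now; would throw FileAlreadyExistsException without the delete
  }
}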
BlobTransferNettyChannelInitializer.java
@@ -1,6 +1,6 @@
 package com.linkedin.davinci.blobtransfer.server;
 
-import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
@@ -12,11 +12,11 @@

 public class BlobTransferNettyChannelInitializer extends ChannelInitializer<SocketChannel> {
   private final String baseDir;
-  private StorageMetadataService storageMetadataService;
+  private BlobSnapshotManager blobSnapshotManager;
 
-  public BlobTransferNettyChannelInitializer(String baseDir, StorageMetadataService storageMetadataService) {
+  public BlobTransferNettyChannelInitializer(String baseDir, BlobSnapshotManager blobSnapshotManager) {
     this.baseDir = baseDir;
-    this.storageMetadataService = storageMetadataService;
+    this.blobSnapshotManager = blobSnapshotManager;
   }
 
   @Override
@@ -32,6 +32,6 @@ protected void initChannel(SocketChannel ch) throws Exception {
         // for safe writing of chunks for responses
         .addLast("chunker", new ChunkedWriteHandler())
         // for handling p2p file transfer
-        .addLast("p2pFileTransferHandler", new P2PFileTransferServerHandler(baseDir, storageMetadataService));
+        .addLast("p2pFileTransferHandler", new P2PFileTransferServerHandler(baseDir, blobSnapshotManager));
   }
 }
P2PBlobTransferService.java
@@ -1,6 +1,6 @@
 package com.linkedin.davinci.blobtransfer.server;
 
-import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
 import com.linkedin.venice.service.AbstractVeniceService;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -31,7 +31,7 @@ public class P2PBlobTransferService extends AbstractVeniceService {
   // TODO 5: add compression support
   // TODO 6: consider either increasing worker threads or have a dedicated thread pool to handle requests.
 
-  public P2PBlobTransferService(int port, String baseDir, StorageMetadataService storageMetadataService) {
+  public P2PBlobTransferService(int port, String baseDir, BlobSnapshotManager blobSnapshotManager) {
     this.port = port;
     this.serverBootstrap = new ServerBootstrap();
 
@@ -48,7 +48,7 @@ public P2PBlobTransferService(int port, String baseDir, StorageMetadataService storageMetadataService) {

     serverBootstrap.group(bossGroup, workerGroup)
         .channel(socketChannelClass)
-        .childHandler(new BlobTransferNettyChannelInitializer(baseDir, storageMetadataService))
+        .childHandler(new BlobTransferNettyChannelInitializer(baseDir, blobSnapshotManager))
         .option(ChannelOption.SO_BACKLOG, 1000)
         .option(ChannelOption.SO_REUSEADDR, true)
         .childOption(ChannelOption.SO_KEEPALIVE, true)
P2PFileTransferServerHandler.java
@@ -10,14 +10,10 @@

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.davinci.blobtransfer.BlobSnapshotManager;
 import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata;
 import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
-import com.linkedin.davinci.storage.StorageMetadataService;
-import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
-import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.request.RequestHelper;
-import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
-import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.ObjectMapperFactory;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
@@ -44,7 +40,6 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.URI;
-import java.nio.ByteBuffer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -56,15 +51,14 @@
 @ChannelHandler.Sharable
 public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
   private static final Logger LOGGER = LogManager.getLogger(P2PFileTransferServerHandler.class);
-  private static final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer =
-      AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
 
   private boolean useZeroCopy = false;
   private final String baseDir;
-  private StorageMetadataService storageMetadataService;
+  private BlobSnapshotManager blobSnapshotManager;
 
-  public P2PFileTransferServerHandler(String baseDir, StorageMetadataService storageMetadataService) {
+  public P2PFileTransferServerHandler(String baseDir, BlobSnapshotManager blobSnapshotManager) {
     this.baseDir = baseDir;
-    this.storageMetadataService = storageMetadataService;
+    this.blobSnapshotManager = blobSnapshotManager;
   }
 
   @Override
@@ -99,52 +93,67 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest)
           ctx);
       return;
     }
-    final BlobTransferPayload blobTransferRequest;
-    final File snapshotDir;
+    BlobTransferPayload blobTransferRequest = null;
     try {
-      blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri()));
-      snapshotDir = new File(blobTransferRequest.getSnapshotDir());
-      if (!snapshotDir.exists() || !snapshotDir.isDirectory()) {
-        byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes();
-        setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
-        return;
-      }
-    } catch (IllegalArgumentException e) {
-      setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx);
-      return;
-    } catch (SecurityException e) {
-      setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx);
-      return;
-    }
+      final File snapshotDir;
+      BlobTransferPartitionMetadata transferPartitionMetadata;
+
+      try {
+        blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri()));
+        snapshotDir = new File(blobTransferRequest.getSnapshotDir());
+        try {
+          transferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferRequest);
+        } catch (Exception e) {
+          setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx);
+          return;
+        }
+
+        if (!snapshotDir.exists() || !snapshotDir.isDirectory()) {
+          byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes();
+          setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
+          return;
+        }
+      } catch (IllegalArgumentException e) {
+        setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx);
+        return;
+      } catch (SecurityException e) {
+        setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx);
+        return;
+      }
 
-    File[] files = snapshotDir.listFiles();
-    if (files == null || files.length == 0) {
-      setupResponseAndFlush(
-          HttpResponseStatus.INTERNAL_SERVER_ERROR,
-          ("Failed to access files at " + snapshotDir).getBytes(),
-          false,
-          ctx);
-      return;
-    }
+      File[] files = snapshotDir.listFiles();
+      if (files == null || files.length == 0) {
+        setupResponseAndFlush(
+            HttpResponseStatus.INTERNAL_SERVER_ERROR,
+            ("Failed to access files at " + snapshotDir).getBytes(),
+            false,
+            ctx);
+        return;
+      }
 
-    // transfer files
-    for (File file: files) {
-      sendFile(file, ctx);
-    }
+      // transfer files
+      for (File file: files) {
+        sendFile(file, ctx);
+      }
 
-    // transfer metadata
-    sendMetadata(blobTransferRequest, ctx);
+      sendMetadata(ctx, transferPartitionMetadata);
 
-    // end of transfer
-    HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-    endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
-    ctx.writeAndFlush(endOfTransfer).addListener(future -> {
-      if (future.isSuccess()) {
-        LOGGER.debug("All files sent successfully for {}", blobTransferRequest.getFullResourceName());
-      } else {
-        LOGGER.error("Failed to send all files for {}", blobTransferRequest.getFullResourceName(), future.cause());
-      }
-    });
+      // end of transfer
+      HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
+      String fullResourceName = blobTransferRequest.getFullResourceName();
+      ctx.writeAndFlush(endOfTransfer).addListener(future -> {
+        if (future.isSuccess()) {
+          LOGGER.debug("All files sent successfully for {}", fullResourceName);
+        } else {
+          LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause());
+        }
+      });
+    } finally {
+      if (blobTransferRequest != null) {
+        blobSnapshotManager.decreaseConcurrentUserCount(blobTransferRequest);
+      }
+    }
   }

/**
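
Note how the rewritten method wraps everything after payload parsing in try/finally, so a claim registered by getTransferMetadata is released even when the transfer fails midway. The accounting itself is not in this diff; a plausible shape, assuming a per-partition atomic counter capped by maxConcurrentSnapshotUser, would be:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: the real accounting lives in BlobSnapshotManager,
// whose implementation is not rendered in this diff.
public class SnapshotUserCounter {
  private final ConcurrentHashMap<String, AtomicInteger> users = new ConcurrentHashMap<>();
  private final int maxConcurrentUsers;

  public SnapshotUserCounter(int maxConcurrentUsers) {
    this.maxConcurrentUsers = maxConcurrentUsers;
  }

  public void acquire(String topicName, int partition) {
    AtomicInteger count = users.computeIfAbsent(topicName + "-" + partition, k -> new AtomicInteger());
    if (count.incrementAndGet() > maxConcurrentUsers) {
      count.decrementAndGet();
      throw new IllegalStateException("Too many concurrent snapshot users for " + topicName + "-" + partition);
    }
  }

  public void release(String topicName, int partition) {
    AtomicInteger count = users.get(topicName + "-" + partition);
    if (count != null) {
      count.decrementAndGet();
    }
  }
}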
@@ -210,30 +219,14 @@ private void sendFile(File file, ChannelHandlerContext ctx) throws IOException {
     });
   }
 
-  public void sendMetadata(BlobTransferPayload blobTransferRequest, ChannelHandlerContext ctx)
+  /**
+   * Send metadata for the given blob transfer request
+   * @param ctx the channel context
+   * @param metadata the metadata to be sent
+   * @throws JsonProcessingException
+   */
+  public void sendMetadata(ChannelHandlerContext ctx, BlobTransferPartitionMetadata metadata)
       throws JsonProcessingException {
-    // prepare metadata
-    BlobTransferPartitionMetadata metadata = null;
-    try {
-      StoreVersionState storeVersionState =
-          storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName());
-      java.nio.ByteBuffer storeVersionStateByte =
-          ByteBuffer.wrap(storeVersionStateSerializer.serialize(blobTransferRequest.getTopicName(), storeVersionState));
-
-      OffsetRecord offsetRecord =
-          storageMetadataService.getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition());
-      java.nio.ByteBuffer offsetRecordByte = ByteBuffer.wrap(offsetRecord.toBytes());
-
-      metadata = new BlobTransferPartitionMetadata(
-          blobTransferRequest.getTopicName(),
-          blobTransferRequest.getPartition(),
-          offsetRecordByte,
-          storeVersionStateByte);
-    } catch (Exception e) {
-      byte[] errBody = ("Failed to get metadata for " + blobTransferRequest.getTopicName()).getBytes();
-      setupResponseAndFlush(HttpResponseStatus.INTERNAL_SERVER_ERROR, errBody, false, ctx);
-    }
-
     ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
     String jsonMetadata = objectMapper.writeValueAsString(metadata);
     byte[] metadataBytes = jsonMetadata.getBytes();
@@ -247,9 +240,9 @@ public void sendMetadata(BlobTransferPayload blobTransferRequest, ChannelHandlerContext ctx)

     ctx.writeAndFlush(metadataResponse).addListener(future -> {
       if (future.isSuccess()) {
-        LOGGER.debug("Metadata for {} sent successfully", blobTransferRequest.getTopicName());
+        LOGGER.debug("Metadata for {} sent successfully", metadata.getTopicName());
       } else {
-        LOGGER.error("Failed to send metadata for {}", blobTransferRequest.getTopicName());
+        LOGGER.error("Failed to send metadata for {}", metadata.getTopicName());
       }
     });
   }
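
The metadata assembly deleted above (serialize the topic's StoreVersionState, wrap the partition's OffsetRecord, bundle both into a BlobTransferPartitionMetadata) has moved behind blobSnapshotManager.getTransferMetadata. This also fixes a latent bug: the old catch block flushed a 500 response but did not return, so the handler went on to JSON-serialize a null metadata object. A reconstruction of the moved logic, using the same fields the handler just dropped (a sketch of what BlobSnapshotManager presumably does, not its actual code):

import java.nio.ByteBuffer;

import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata;
import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;

// Reconstruction of the removed handler logic; mirrors the deleted lines above.
class TransferMetadataSketch {
  private static final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer =
      AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
  private final StorageMetadataService storageMetadataService;

  TransferMetadataSketch(StorageMetadataService storageMetadataService) {
    this.storageMetadataService = storageMetadataService;
  }

  BlobTransferPartitionMetadata getTransferMetadata(BlobTransferPayload payload) {
    // Serialize the store version state for the version topic.
    StoreVersionState storeVersionState = storageMetadataService.getStoreVersionState(payload.getTopicName());
    ByteBuffer storeVersionStateByte =
        ByteBuffer.wrap(storeVersionStateSerializer.serialize(payload.getTopicName(), storeVersionState));

    // Capture the partition's consumption offset alongside the snapshot.
    OffsetRecord offsetRecord = storageMetadataService.getLastOffset(payload.getTopicName(), payload.getPartition());
    ByteBuffer offsetRecordByte = ByteBuffer.wrap(offsetRecord.toBytes());

    return new BlobTransferPartitionMetadata(
        payload.getTopicName(),
        payload.getPartition(),
        offsetRecordByte,
        storeVersionStateByte);
  }
}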