diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
index beb84ca31cc..e57544190dd 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
@@ -295,7 +295,11 @@ public DaVinciBackend(
           configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
           configLoader.getVeniceServerConfig().getRocksDBPath(),
           clientConfig,
-          storageMetadataService);
+          storageMetadataService,
+          readOnlyStoreRepository,
+          storageService.getStorageEngineRepository(),
+          backendConfig.getMaxConcurrentSnapshotUser(),
+          backendConfig.getSnapshotRetentionTimeInMin());
     } else {
       blobTransferManager = null;
     }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
index d0dad303679..c7df7fa118b 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java
@@ -1,15 +1,25 @@
 package com.linkedin.davinci.blobtransfer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.davinci.storage.StorageEngineRepository;
+import com.linkedin.davinci.storage.StorageMetadataService;
+import com.linkedin.davinci.store.AbstractStorageEngine;
+import com.linkedin.davinci.store.AbstractStoragePartition;
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
 import com.linkedin.venice.meta.ReadOnlyStoreRepository;
 import com.linkedin.venice.meta.Store;
-import com.linkedin.venice.store.rocksdb.RocksDBUtils;
+import com.linkedin.venice.offsets.OffsetRecord;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import com.linkedin.venice.utils.locks.AutoCloseableLock;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -22,85 +32,203 @@
 /**
- * This class will take and return a snapshot of a hybrid store, if someone is using a snapshot then it will return
- * the snapshot that person is using, otherwise, it will return the last snapshot taken if it is not stale
- * If the snapshot is stale and no one is using it, it will update the snapshot and return the new one
+ * This class manages snapshot creation for both batch and hybrid stores.
  */
 public class BlobSnapshotManager {
-  private final Map<String, Map<Integer, AtomicLong>> concurrentSnapshotUsers;
-  private Map<String, Map<Integer, Long>> snapShotTimestamps;
-  private final long snapshotRetentionTime;
-  private final String basePath;
+  private static final Logger LOGGER = LogManager.getLogger(BlobSnapshotManager.class);
+  private static final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer =
+      AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
+  private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30;
+  public final static int DEFAULT_MAX_CONCURRENT_USERS = 5;
+
+  // A map that tracks the number of hosts using the snapshot for a particular topic and partition, used to restrict
+  // the concurrent user count
+  // Example: <topicName, <partitionId, concurrent user count>>
+  private Map<String, Map<Integer, AtomicLong>> concurrentSnapshotUsers;
+  // A map that tracks the snapshot creation timestamp for a particular topic and partition, used to detect snapshot
+  // staleness
+  // Example: <topicName, <partitionId, snapshot creation timestamp>>
+  private Map<String, Map<Integer, Long>> snapshotTimestamps;
+  // A map that tracks the metadata (including the offset record) captured for each snapshot of a particular topic and
+  // partition, used to keep the snapshot and its offset consistent
+  // Example: <topicName, <partitionId, snapshot metadata>>
+  private Map<String, Map<Integer, BlobTransferPartitionMetadata>> snapshotMetadataRecords;
+  private final ReadOnlyStoreRepository readOnlyStoreRepository;
+  private final StorageEngineRepository storageEngineRepository;
+  private final StorageMetadataService storageMetadataService;
+  private final int maxConcurrentUsers;
+  private final long snapshotRetentionTimeInMillis;
   private final Lock lock = new ReentrantLock();
-  private static final Logger LOGGER = LogManager.getLogger(BlobSnapshotManager.class);
 
   /**
    * Constructor for the BlobSnapshotManager
    */
   public BlobSnapshotManager(
-      String basePath,
-      long snapshotRetentionTime,
-      ReadOnlyStoreRepository readOnlyStoreRepository) {
-    this.basePath = basePath;
-    this.snapshotRetentionTime = snapshotRetentionTime;
+      ReadOnlyStoreRepository readOnlyStoreRepository,
+      StorageEngineRepository storageEngineRepository,
+      StorageMetadataService storageMetadataService,
+      int maxConcurrentUsers,
+      int snapshotRetentionTimeInMin) {
     this.readOnlyStoreRepository = readOnlyStoreRepository;
-    this.snapShotTimestamps = new VeniceConcurrentHashMap<>();
+    this.storageEngineRepository = storageEngineRepository;
+    this.storageMetadataService = storageMetadataService;
+    this.maxConcurrentUsers = maxConcurrentUsers;
+    this.snapshotRetentionTimeInMillis = TimeUnit.MINUTES.toMillis(snapshotRetentionTimeInMin);
+    this.concurrentSnapshotUsers = new VeniceConcurrentHashMap<>();
+    this.snapshotTimestamps = new VeniceConcurrentHashMap<>();
+    this.snapshotMetadataRecords = new VeniceConcurrentHashMap<>();
   }
 
   /**
-   * Checks if the snapshot is stale, if it is and no one is using it, it updates the snapshot,
-   * otherwise it increases the count of people using the snapshot
+   * The constructor for the BlobSnapshotManager,
+   * with the default max concurrent users and snapshot retention time
    */
-  public void maybeUpdateHybridSnapshot(RocksDB rocksDB, String topicName, int partitionId) {
-    if (rocksDB == null || topicName == null) {
-      throw new IllegalArgumentException("RocksDB instance and topicName cannot be null");
-    }
-    if (!isStoreHybrid(topicName)) {
-      LOGGER.warn("Store {} is not hybrid, skipping snapshot update", topicName);
-      return;
-    }
-    String fullPathForPartitionDBSnapshot = RocksDBUtils.composeSnapshotDir(this.basePath, topicName, partitionId);
-    snapShotTimestamps.putIfAbsent(topicName, new VeniceConcurrentHashMap<>());
-    concurrentSnapshotUsers.putIfAbsent(topicName, new VeniceConcurrentHashMap<>());
-    concurrentSnapshotUsers.get(topicName).putIfAbsent(partitionId, new AtomicLong(0));
-    try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) {
-      if (isSnapshotStale(topicName, partitionId)) {
-        if (concurrentSnapshotUsers.get(topicName).get(partitionId) == null
-            || concurrentSnapshotUsers.get(topicName).get(partitionId).get() == 0) {
-          updateHybridSnapshot(rocksDB, topicName, partitionId);
-          snapShotTimestamps.get(topicName).put(partitionId, System.currentTimeMillis());
+  @VisibleForTesting
+  public BlobSnapshotManager(
+      ReadOnlyStoreRepository readOnlyStoreRepository,
+      StorageEngineRepository storageEngineRepository,
+      StorageMetadataService storageMetadataService) {
+    this(
+        readOnlyStoreRepository,
+        storageEngineRepository,
+        storageMetadataService,
+        DEFAULT_MAX_CONCURRENT_USERS,
+        DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN);
+  }
+
+  /**
+   * Get the transfer metadata for a particular payload.
+   * 0. Pre-check: throttle the request if there are too many concurrent users.
+   * 1. If the store is not hybrid, prepare the metadata and return it.
+   * 2. If the store is hybrid:
+   *    2.1. Check snapshot staleness.
+   *         2.1.1. If stale, recreate the snapshot and metadata, then return the metadata.
+   *         2.1.2. If not stale, return the existing metadata directly.
+   *
+   * @param payload the blob transfer payload
+   * @return the metadata that needs to be transferred to the client
+   */
+  public BlobTransferPartitionMetadata getTransferMetadata(BlobTransferPayload payload) throws VeniceException {
+    String topicName = payload.getTopicName();
+    int partitionId = payload.getPartition();
+
+    // check if the concurrent user count exceeds the limit
+    checkIfConcurrentUserExceedsLimit(topicName, partitionId);
+    concurrentSnapshotUsers.computeIfAbsent(topicName, k -> new VeniceConcurrentHashMap<>())
+        .computeIfAbsent(partitionId, k -> new AtomicLong(0))
+        .incrementAndGet();
+
+    boolean isHybrid = isStoreHybrid(payload.getStoreName());
+    if (!isHybrid) {
+      return prepareMetadata(payload);
+    } else {
+      snapshotTimestamps.putIfAbsent(topicName, new VeniceConcurrentHashMap<>());
+      snapshotMetadataRecords.putIfAbsent(topicName, new VeniceConcurrentHashMap<>());
+
+      try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) {
+        // check if the snapshot is stale and needs to be recreated
+        if (isSnapshotStale(topicName, partitionId)) {
+          // recreate the snapshot and metadata
+          recreateSnapshotAndMetadata(payload);
+        } else {
+          LOGGER.info(
+              "Snapshot for topic {} partition {} is not stale, skip creating a new snapshot.",
+              topicName,
+              partitionId);
         }
+        return snapshotMetadataRecords.get(topicName).get(partitionId);
       }
-      concurrentSnapshotUsers.get(topicName).get(partitionId).incrementAndGet();
-      LOGGER.info(
-          "Retrieved snapshot from {} with timestamp {}",
-          fullPathForPartitionDBSnapshot,
-          snapShotTimestamps.get(topicName).get(partitionId));
     }
   }
-  public void decreaseConcurrentUserCount(String topicName, int partitionId) {
+  /**
+   * Recreate a snapshot and metadata for a hybrid store,
+   * and update the snapshot timestamp and metadata records
+   * @param blobTransferRequest the blob transfer request
+   */
+  private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest) {
+    String topicName = blobTransferRequest.getTopicName();
+    int partitionId = blobTransferRequest.getPartition();
+    try {
+      // 1. get the snapshot metadata before recreating the snapshot
+      BlobTransferPartitionMetadata metadataBeforeRecreateSnapshot = prepareMetadata(blobTransferRequest);
+      // 2.
recreate the snapshot + createSnapshot(topicName, partitionId); + + // update the snapshot timestamp to reflect the latest snapshot creation time + snapshotTimestamps.get(topicName).put(partitionId, System.currentTimeMillis()); + // update the snapshot offset record to reflect the latest snapshot offset + snapshotMetadataRecords.get(topicName).put(partitionId, metadataBeforeRecreateSnapshot); + LOGGER.info("Successfully recreated snapshot for topic {} partition {}. ", topicName, partitionId); + } catch (Exception e) { + String errorMessage = + String.format("Failed to create snapshot for topic %s partition %d", topicName, partitionId); + LOGGER.error(errorMessage, e); + throw new VeniceException(errorMessage); + } + } + + /** + * Check if the concurrent user count exceeds the limit + * @param topicName the topic name + * @param partitionId the partition id + * @throws VeniceException if the concurrent user count exceeds the limit + */ + private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException { + boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers; + if (exceededMaxConcurrentUsers) { + String errorMessage = String.format( + "Exceeded the maximum number of concurrent users %d for topic %s partition %d", + maxConcurrentUsers, + topicName, + partitionId); + LOGGER.error(errorMessage); + throw new VeniceException(errorMessage); + } + } + + /** + * Check if the snapshot is stale + * @param topicName the topic name + * @param partitionId the partition id + * @return true if the snapshot is stale, false otherwise + */ + private boolean isSnapshotStale(String topicName, int partitionId) { + if (!snapshotTimestamps.containsKey(topicName) || !snapshotTimestamps.get(topicName).containsKey(partitionId)) { + return true; + } + return System.currentTimeMillis() + - snapshotTimestamps.get(topicName).get(partitionId) > snapshotRetentionTimeInMillis; + } + + /** + * Decrease the count of hosts using the snapshot + */ + public void decreaseConcurrentUserCount(BlobTransferPayload blobTransferRequest) { + String topicName = blobTransferRequest.getTopicName(); + int partitionId = blobTransferRequest.getPartition(); Map concurrentPartitionUsers = concurrentSnapshotUsers.get(topicName); if (concurrentPartitionUsers == null) { throw new VeniceException("No topic found: " + topicName); } + AtomicLong concurrentUsers = concurrentPartitionUsers.get(partitionId); if (concurrentUsers == null) { throw new VeniceException(String.format("%d partition not found on topic %s", partitionId, topicName)); } - try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) { - long result = concurrentUsers.decrementAndGet(); - if (result < 0) { - throw new VeniceException("Concurrent user count cannot be negative"); - } + long result = concurrentUsers.decrementAndGet(); + if (result < 0) { + throw new VeniceException("Concurrent user count cannot be negative"); } + + LOGGER.info("Concurrent user count for topic {} partition {} decreased to {}", topicName, partitionId, result); } - long getConcurrentSnapshotUsers(RocksDB rocksDB, String topicName, int partitionId) { - if (rocksDB == null || topicName == null) { + protected long getConcurrentSnapshotUsers(String topicName, int partitionId) { + if (topicName == null) { throw new IllegalArgumentException("RocksDB instance and topicName cannot be null"); } if (!concurrentSnapshotUsers.containsKey(topicName) @@ -110,22 +238,13 @@ long getConcurrentSnapshotUsers(RocksDB rocksDB, 
String topicName, int partition return concurrentSnapshotUsers.get(topicName).get(partitionId).get(); } - void setSnapShotTimestamps(Map> snapShotTimestamps) { - this.snapShotTimestamps = snapShotTimestamps; - } - /** - * Checks if the current snapshot of the partition is stale + * Check if the store is hybrid + * @param storeName the name of the store + * @return true if the store is hybrid, false otherwise */ - private boolean isSnapshotStale(String topicName, int partitionId) { - if (!snapShotTimestamps.containsKey(topicName) || !snapShotTimestamps.get(topicName).containsKey(partitionId)) { - return true; - } - return System.currentTimeMillis() - snapShotTimestamps.get(topicName).get(partitionId) > snapshotRetentionTime; - } - - private boolean isStoreHybrid(String topicName) { - Store store = readOnlyStoreRepository.getStore(topicName); + public boolean isStoreHybrid(String storeName) { + Store store = readOnlyStoreRepository.getStore(storeName); if (store != null) { return store.isHybrid(); } @@ -133,42 +252,11 @@ private boolean isStoreHybrid(String topicName) { } /** - * Updates the snapshot of the hybrid store - */ - private void updateHybridSnapshot(RocksDB rocksDB, String topicName, int partitionId) { - String fullPathForPartitionDBSnapshot = RocksDBUtils.composeSnapshotDir(this.basePath, topicName, partitionId); - File partitionSnapshotDir = new File(fullPathForPartitionDBSnapshot); - if (partitionSnapshotDir.exists()) { - if (!partitionSnapshotDir.delete()) { - throw new VeniceException( - "Failed to delete the existing snapshot directory: " + fullPathForPartitionDBSnapshot); - } - return; - } - try { - Checkpoint checkpoint = createCheckpoint(rocksDB); - - LOGGER.info("Creating snapshots in directory: {}", fullPathForPartitionDBSnapshot); - checkpoint.createCheckpoint(fullPathForPartitionDBSnapshot); - LOGGER.info("Finished creating snapshots in directory: {}", fullPathForPartitionDBSnapshot); - } catch (RocksDBException e) { - throw new VeniceException( - "Received exception during RocksDB's snapshot creation in directory " + fullPathForPartitionDBSnapshot, - e); - } - } - - @VisibleForTesting - protected Checkpoint createCheckpoint(RocksDB rocksDB) { - return Checkpoint.create(rocksDB); - } - - /** - * util method to create a snapshot for batch only + * util method to create a snapshot * It will check the snapshot directory and delete it if it exists, then generate a new snapshot */ - public static void createSnapshotForBatch(RocksDB rocksDB, String fullPathForPartitionDBSnapshot) { - LOGGER.info("Creating snapshot for batch in directory: {}", fullPathForPartitionDBSnapshot); + public static void createSnapshot(RocksDB rocksDB, String fullPathForPartitionDBSnapshot) { + LOGGER.info("Creating snapshot in directory: {}", fullPathForPartitionDBSnapshot); // clean up the snapshot directory if it exists File partitionSnapshotDir = new File(fullPathForPartitionDBSnapshot); @@ -184,16 +272,59 @@ public static void createSnapshotForBatch(RocksDB rocksDB, String fullPathForPar } try { - LOGGER.info("Start creating snapshots for batch in directory: {}", fullPathForPartitionDBSnapshot); + LOGGER.info("Start creating snapshots in directory: {}", fullPathForPartitionDBSnapshot); Checkpoint checkpoint = Checkpoint.create(rocksDB); checkpoint.createCheckpoint(fullPathForPartitionDBSnapshot); - LOGGER.info("Finished creating snapshots for batch in directory: {}", fullPathForPartitionDBSnapshot); + LOGGER.info("Finished creating snapshots in directory: {}", fullPathForPartitionDBSnapshot); } 
catch (RocksDBException e) { throw new VeniceException( "Received exception during RocksDB's snapshot creation in directory " + fullPathForPartitionDBSnapshot, e); } } + + /** + * Create a snapshot for a particular partition + */ + public void createSnapshot(String kafkaVersionTopic, int partitionId) { + AbstractStorageEngine storageEngine = + Objects.requireNonNull(storageEngineRepository.getLocalStorageEngine(kafkaVersionTopic)); + AbstractStoragePartition partition = storageEngine.getPartitionOrThrow(partitionId); + partition.createSnapshot(); + } + + /** + * Get the snapshot metadata for a particular topic and partition + * @param topicName the topic name + * @param partitionId the partition id + * @return the snapshot metadata + */ + public BlobTransferPartitionMetadata getTransferredSnapshotMetadata(String topicName, int partitionId) { + return snapshotMetadataRecords.get(topicName).get(partitionId); + } + + /** + * Prepare the metadata for a blob transfer request + * @param blobTransferRequest the blob transfer request + * @return the metadata for the blob transfer request + */ + public BlobTransferPartitionMetadata prepareMetadata(BlobTransferPayload blobTransferRequest) { + // prepare metadata + StoreVersionState storeVersionState = + storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()); + java.nio.ByteBuffer storeVersionStateByte = + ByteBuffer.wrap(storeVersionStateSerializer.serialize(blobTransferRequest.getTopicName(), storeVersionState)); + + OffsetRecord offsetRecord = + storageMetadataService.getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()); + java.nio.ByteBuffer offsetRecordByte = ByteBuffer.wrap(offsetRecord.toBytes()); + + return new BlobTransferPartitionMetadata( + blobTransferRequest.getTopicName(), + blobTransferRequest.getPartition(), + offsetRecordByte, + storeVersionStateByte); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferPayload.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferPayload.java index d39b9a29a95..3de5e572f00 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferPayload.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferPayload.java @@ -13,9 +13,11 @@ public class BlobTransferPayload { private final int partition; private final String topicName; private final String partitionDir; + private final String storeName; public BlobTransferPayload(String baseDir, String storeName, int version, int partition) { this.partition = partition; + this.storeName = storeName; this.topicName = storeName + "_v" + version; this.partitionDir = composePartitionDbDir(baseDir, topicName, partition); } @@ -39,4 +41,8 @@ public String getTopicName() { public int getPartition() { return partition; } + + public String getStoreName() { + return storeName; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java index 7a18d402daf..4caa3f87056 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java @@ -4,6 +4,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import 
com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.blobtransfer.DaVinciBlobFinder; import com.linkedin.venice.blobtransfer.ServerBlobFinder; @@ -11,6 +12,7 @@ import com.linkedin.venice.client.store.AvroGenericStoreClientImpl; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,13 +34,21 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( int p2pTransferPort, String baseDir, ClientConfig clientConfig, - StorageMetadataService storageMetadataService) { + StorageMetadataService storageMetadataService, + ReadOnlyStoreRepository readOnlyStoreRepository, + StorageEngineRepository storageEngineRepository, + int maxConcurrentSnapshotUser, + int snapshotRetentionTimeInMin) { return getP2PBlobTransferManagerForDVCAndStart( p2pTransferPort, p2pTransferPort, baseDir, clientConfig, - storageMetadataService); + storageMetadataService, + readOnlyStoreRepository, + storageEngineRepository, + maxConcurrentSnapshotUser, + snapshotRetentionTimeInMin); } public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( @@ -46,12 +56,22 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( int p2pTransferClientPort, String baseDir, ClientConfig clientConfig, - StorageMetadataService storageMetadataService) { + StorageMetadataService storageMetadataService, + ReadOnlyStoreRepository readOnlyStoreRepository, + StorageEngineRepository storageEngineRepository, + int maxConcurrentSnapshotUser, + int snapshotRetentionTimeInMin) { try { + BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( + readOnlyStoreRepository, + storageEngineRepository, + storageMetadataService, + maxConcurrentSnapshotUser, + snapshotRetentionTimeInMin); AbstractAvroStoreClient storeClient = new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig); BlobTransferManager manager = new NettyP2PBlobTransferManager( - new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService), + new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager), new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new DaVinciBlobFinder(storeClient)); manager.start(); @@ -76,10 +96,20 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta int p2pTransferClientPort, String baseDir, CompletableFuture customizedViewFuture, - StorageMetadataService storageMetadataService) { + StorageMetadataService storageMetadataService, + ReadOnlyStoreRepository readOnlyStoreRepository, + StorageEngineRepository storageEngineRepository, + int maxConcurrentSnapshotUser, + int snapshotRetentionTimeInMin) { try { + BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( + readOnlyStoreRepository, + storageEngineRepository, + storageMetadataService, + maxConcurrentSnapshotUser, + snapshotRetentionTimeInMin); BlobTransferManager manager = new NettyP2PBlobTransferManager( - new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService), + new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager), new 
NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new ServerBlobFinder(customizedViewFuture)); manager.start(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java index b5c3eed3a03..7688281d2e1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java @@ -100,8 +100,17 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex Path partitionDir = Paths.get(payload.getPartitionDir()); Files.createDirectories(partitionDir); - // Prepare the file + // Prepare the file, remove it if it exists + if (Files.deleteIfExists(partitionDir.resolve(fileName))) { + LOGGER.debug( + "File {} already exists for topic {} partition {}. Overwriting it.", + fileName, + payload.getTopicName(), + payload.getPartition()); + } + Path file = Files.createFile(partitionDir.resolve(fileName)); + outputFileChannel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND); } else if (msg instanceof HttpContent) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 01c951d920d..340a192ee2c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -1,6 +1,6 @@ package com.linkedin.davinci.blobtransfer.server; -import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -12,11 +12,11 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer { private final String baseDir; - private StorageMetadataService storageMetadataService; + private BlobSnapshotManager blobSnapshotManager; - public BlobTransferNettyChannelInitializer(String baseDir, StorageMetadataService storageMetadataService) { + public BlobTransferNettyChannelInitializer(String baseDir, BlobSnapshotManager blobSnapshotManager) { this.baseDir = baseDir; - this.storageMetadataService = storageMetadataService; + this.blobSnapshotManager = blobSnapshotManager; } @Override @@ -32,6 +32,6 @@ protected void initChannel(SocketChannel ch) throws Exception { // for safe writing of chunks for responses .addLast("chunker", new ChunkedWriteHandler()) // for handling p2p file transfer - .addLast("p2pFileTransferHandler", new P2PFileTransferServerHandler(baseDir, storageMetadataService)); + .addLast("p2pFileTransferHandler", new P2PFileTransferServerHandler(baseDir, blobSnapshotManager)); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java index d7257579fe4..132fe3713f3 100644 --- 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java @@ -1,6 +1,6 @@ package com.linkedin.davinci.blobtransfer.server; -import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; import com.linkedin.venice.service.AbstractVeniceService; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -31,7 +31,7 @@ public class P2PBlobTransferService extends AbstractVeniceService { // TODO 5: add compression support // TODO 6: consider either increasing worker threads or have a dedicated thread pool to handle requests. - public P2PBlobTransferService(int port, String baseDir, StorageMetadataService storageMetadataService) { + public P2PBlobTransferService(int port, String baseDir, BlobSnapshotManager blobSnapshotManager) { this.port = port; this.serverBootstrap = new ServerBootstrap(); @@ -48,7 +48,7 @@ public P2PBlobTransferService(int port, String baseDir, StorageMetadataService s serverBootstrap.group(bossGroup, workerGroup) .channel(socketChannelClass) - .childHandler(new BlobTransferNettyChannelInitializer(baseDir, storageMetadataService)) + .childHandler(new BlobTransferNettyChannelInitializer(baseDir, blobSnapshotManager)) .option(ChannelOption.SO_BACKLOG, 1000) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index b89ddbe595b..d842dca279f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -10,14 +10,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata; import com.linkedin.davinci.blobtransfer.BlobTransferPayload; -import com.linkedin.davinci.storage.StorageMetadataService; -import com.linkedin.venice.kafka.protocol.state.StoreVersionState; -import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.request.RequestHelper; -import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; -import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.utils.ObjectMapperFactory; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; @@ -44,7 +40,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.net.URI; -import java.nio.ByteBuffer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -56,15 +51,14 @@ @ChannelHandler.Sharable public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LogManager.getLogger(P2PFileTransferServerHandler.class); - private static final InternalAvroSpecificSerializer storeVersionStateSerializer = - AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer(); + private boolean useZeroCopy = false; private final String baseDir; - private 
StorageMetadataService storageMetadataService; + private BlobSnapshotManager blobSnapshotManager; - public P2PFileTransferServerHandler(String baseDir, StorageMetadataService storageMetadataService) { + public P2PFileTransferServerHandler(String baseDir, BlobSnapshotManager blobSnapshotManager) { this.baseDir = baseDir; - this.storageMetadataService = storageMetadataService; + this.blobSnapshotManager = blobSnapshotManager; } @Override @@ -99,52 +93,67 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque ctx); return; } - final BlobTransferPayload blobTransferRequest; - final File snapshotDir; + BlobTransferPayload blobTransferRequest = null; try { - blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri())); - snapshotDir = new File(blobTransferRequest.getSnapshotDir()); - if (!snapshotDir.exists() || !snapshotDir.isDirectory()) { - byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes(); - setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); + final File snapshotDir; + BlobTransferPartitionMetadata transferPartitionMetadata; + + try { + blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri())); + snapshotDir = new File(blobTransferRequest.getSnapshotDir()); + try { + transferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferRequest); + } catch (Exception e) { + setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); + return; + } + + if (!snapshotDir.exists() || !snapshotDir.isDirectory()) { + byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes(); + setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); + return; + } + } catch (IllegalArgumentException e) { + setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx); + return; + } catch (SecurityException e) { + setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx); return; } - } catch (IllegalArgumentException e) { - setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx); - return; - } catch (SecurityException e) { - setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx); - return; - } - File[] files = snapshotDir.listFiles(); - if (files == null || files.length == 0) { - setupResponseAndFlush( - HttpResponseStatus.INTERNAL_SERVER_ERROR, - ("Failed to access files at " + snapshotDir).getBytes(), - false, - ctx); - return; - } + File[] files = snapshotDir.listFiles(); + if (files == null || files.length == 0) { + setupResponseAndFlush( + HttpResponseStatus.INTERNAL_SERVER_ERROR, + ("Failed to access files at " + snapshotDir).getBytes(), + false, + ctx); + return; + } - // transfer files - for (File file: files) { - sendFile(file, ctx); - } + // transfer files + for (File file: files) { + sendFile(file, ctx); + } - // transfer metadata - sendMetadata(blobTransferRequest, ctx); + sendMetadata(ctx, transferPartitionMetadata); - // end of transfer - HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); - ctx.writeAndFlush(endOfTransfer).addListener(future -> { - if (future.isSuccess()) { - LOGGER.debug("All files sent successfully for {}", blobTransferRequest.getFullResourceName()); - } else { - 
LOGGER.error("Failed to send all files for {}", blobTransferRequest.getFullResourceName(), future.cause()); + // end of transfer + HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); + String fullResourceName = blobTransferRequest.getFullResourceName(); + ctx.writeAndFlush(endOfTransfer).addListener(future -> { + if (future.isSuccess()) { + LOGGER.debug("All files sent successfully for {}", fullResourceName); + } else { + LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause()); + } + }); + } finally { + if (blobTransferRequest != null) { + blobSnapshotManager.decreaseConcurrentUserCount(blobTransferRequest); } - }); + } } /** @@ -210,30 +219,14 @@ private void sendFile(File file, ChannelHandlerContext ctx) throws IOException { }); } - public void sendMetadata(BlobTransferPayload blobTransferRequest, ChannelHandlerContext ctx) + /** + * Send metadata for the given blob transfer request + * @param ctx the channel context + * @param metadata the metadata to be sent + * @throws JsonProcessingException + */ + public void sendMetadata(ChannelHandlerContext ctx, BlobTransferPartitionMetadata metadata) throws JsonProcessingException { - // prepare metadata - BlobTransferPartitionMetadata metadata = null; - try { - StoreVersionState storeVersionState = - storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()); - java.nio.ByteBuffer storeVersionStateByte = - ByteBuffer.wrap(storeVersionStateSerializer.serialize(blobTransferRequest.getTopicName(), storeVersionState)); - - OffsetRecord offsetRecord = - storageMetadataService.getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()); - java.nio.ByteBuffer offsetRecordByte = ByteBuffer.wrap(offsetRecord.toBytes()); - - metadata = new BlobTransferPartitionMetadata( - blobTransferRequest.getTopicName(), - blobTransferRequest.getPartition(), - offsetRecordByte, - storeVersionStateByte); - } catch (Exception e) { - byte[] errBody = ("Failed to get metadata for " + blobTransferRequest.getTopicName()).getBytes(); - setupResponseAndFlush(HttpResponseStatus.INTERNAL_SERVER_ERROR, errBody, false, ctx); - } - ObjectMapper objectMapper = ObjectMapperFactory.getInstance(); String jsonMetadata = objectMapper.writeValueAsString(metadata); byte[] metadataBytes = jsonMetadata.getBytes(); @@ -247,9 +240,9 @@ public void sendMetadata(BlobTransferPayload blobTransferRequest, ChannelHandler ctx.writeAndFlush(metadataResponse).addListener(future -> { if (future.isSuccess()) { - LOGGER.debug("Metadata for {} sent successfully", blobTransferRequest.getTopicName()); + LOGGER.debug("Metadata for {} sent successfully", metadata.getTopicName()); } else { - LOGGER.error("Failed to send metadata for {}", blobTransferRequest.getTopicName()); + LOGGER.error("Failed to send metadata for {}", metadata.getTopicName()); } }); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 241905848a1..88f411d86b6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -5,6 +5,8 @@ import static com.linkedin.venice.ConfigConstants.DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL; import static 
com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MANAGER_ENABLED; +import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER; +import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_CLIENT_PORT; import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT; @@ -532,6 +534,8 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean recordLevelMetricWhenBootstrappingCurrentVersionEnabled; private final String identityParserClassName; private final boolean blobTransferManagerEnabled; + private final int snapshotRetentionTimeInMin; + private final int maxConcurrentSnapshotUser; private final int dvcP2pBlobTransferServerPort; private final int dvcP2pBlobTransferClientPort; private final boolean daVinciCurrentVersionBootstrappingSpeedupEnabled; @@ -571,6 +575,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map bootstrapFuture = @@ -98,10 +96,7 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition * Blob transfer should be enabled to boostrap from blobs, and it currently only supports batch-stores. */ CompletionStage bootstrapFromBlobs(Store store, int versionNumber, int partitionId) { - // TODO: need to differentiate that's DVC or server. Right now, it doesn't tell so both components can create, - // though - // Only DVC would create blobTransferManager. - if (!store.isBlobTransferEnabled() || store.isHybrid() || blobTransferManager == null) { + if (!store.isBlobTransferEnabled() || blobTransferManager == null) { return CompletableFuture.completedFuture(null); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java index 87a084dba9f..f1fb0fee295 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java @@ -489,7 +489,7 @@ public synchronized void endBatchWrite() { @Override public synchronized void createSnapshot() { if (blobTransferEnabled) { - BlobSnapshotManager.createSnapshotForBatch(rocksDB, fullPathForPartitionDBSnapshot); + BlobSnapshotManager.createSnapshot(rocksDB, fullPathForPartitionDBSnapshot); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java index 20cbc2064aa..08ebe35dd45 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java @@ -1,16 +1,27 @@ package com.linkedin.davinci.blobtransfer; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static 
org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.davinci.store.AbstractStoragePartition; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.store.rocksdb.RocksDBUtils; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.io.File; import java.io.IOException; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -25,126 +36,189 @@ public class BlobSnapshotManagerTest { - private static final long SNAPSHOT_RETENTION_TIME = 60000; - private static final String STORE_NAME = Utils.getUniqueString("sstTest"); + private static final String STORE_NAME = "test-store"; + private static final int VERSION_ID = 1; + private static final String TOPIC_NAME = STORE_NAME + "_v" + VERSION_ID; private static final int PARTITION_ID = 0; private static final String BASE_PATH = Utils.getUniqueTempPath("sstTest"); private static final ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); - private static final String DB_DIR = - BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID); + private static final StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class); + private static final StorageMetadataService storageMetadataService = mock(StorageMetadataService.class); + private static final BlobTransferPartitionMetadata blobTransferPartitionMetadata = + new BlobTransferPartitionMetadata(); + private static final String DB_DIR = BASE_PATH + "/" + STORE_NAME + "_v" + VERSION_ID + "/" + + RocksDBUtils.getPartitionDbName(STORE_NAME + "_v" + VERSION_ID, PARTITION_ID); + private static final BlobTransferPayload blobTransferPayload = + new BlobTransferPayload(BASE_PATH, STORE_NAME, VERSION_ID, PARTITION_ID); @Test - public void testHybridSnapshot() throws RocksDBException { - RocksDB mockRocksDB = mock(RocksDB.class); - Checkpoint mockCheckpoint = mock(Checkpoint.class); + public void testHybridSnapshot() { + AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); + + AbstractStoragePartition storagePartition = Mockito.mock(AbstractStoragePartition.class); + Mockito.doReturn(storagePartition).when(storageEngine).getPartitionOrThrow(PARTITION_ID); + Mockito.doNothing().when(storagePartition).createSnapshot(); + Store mockStore = mock(Store.class); when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); when(mockStore.isHybrid()).thenReturn(true); + BlobSnapshotManager blobSnapshotManager = - spy(new BlobSnapshotManager(BASE_PATH, SNAPSHOT_RETENTION_TIME, readOnlyStoreRepository)); - doReturn(mockCheckpoint).when(blobSnapshotManager).createCheckpoint(mockRocksDB); - doNothing().when(mockCheckpoint) - .createCheckpoint( - BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID)); + spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); + 
doReturn(blobTransferPartitionMetadata).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); + BlobTransferPartitionMetadata actualBlobTransferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferPayload); - verify(mockCheckpoint, times(1)).createCheckpoint(DB_DIR + "/.snapshot_files"); + // Due to the store is hybrid, it will re-create a new snapshot. + verify(storagePartition, times(1)).createSnapshot(); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } @Test - public void testSnapshotUpdatesWhenStale() throws RocksDBException { - RocksDB mockRocksDB = mock(RocksDB.class); - Checkpoint mockCheckpoint = mock(Checkpoint.class); + public void testSameSnapshotWhenConcurrentUsersNotExceedMaxAllowedUsers() { Store mockStore = mock(Store.class); + when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); when(mockStore.isHybrid()).thenReturn(true); + BlobSnapshotManager blobSnapshotManager = - spy(new BlobSnapshotManager(BASE_PATH, SNAPSHOT_RETENTION_TIME, readOnlyStoreRepository)); - doReturn(mockCheckpoint).when(blobSnapshotManager).createCheckpoint(mockRocksDB); - doNothing().when(mockCheckpoint).createCheckpoint(DB_DIR); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - Map> snapShotTimestamps = new VeniceConcurrentHashMap<>(); - snapShotTimestamps.put(STORE_NAME, new VeniceConcurrentHashMap<>()); - snapShotTimestamps.get(STORE_NAME).put(PARTITION_ID, System.currentTimeMillis() - SNAPSHOT_RETENTION_TIME - 1); - blobSnapshotManager.setSnapShotTimestamps(snapShotTimestamps); - - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - - verify(mockCheckpoint, times(1)).createCheckpoint(DB_DIR + "/.snapshot_files"); + spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); + doReturn(blobTransferPartitionMetadata).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); + + AbstractStoragePartition storagePartition = Mockito.mock(AbstractStoragePartition.class); + AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); + Mockito.doReturn(storagePartition).when(storageEngine).getPartitionOrThrow(PARTITION_ID); + Mockito.doNothing().when(storagePartition).createSnapshot(); + + // Create snapshot for the first time + BlobTransferPartitionMetadata actualBlobTransferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferPayload); + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), 1); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); + + // Try to create snapshot again with concurrent users + actualBlobTransferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferPayload); + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), 2); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } @Test - public void testSameSnapshotWhenConcurrentUsers() throws RocksDBException { - RocksDB mockRocksDB = mock(RocksDB.class); - Checkpoint mockCheckpoint = mock(Checkpoint.class); + public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() { Store mockStore = mock(Store.class); + 
when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); when(mockStore.isHybrid()).thenReturn(true); + BlobSnapshotManager blobSnapshotManager = - spy(new BlobSnapshotManager(BASE_PATH, SNAPSHOT_RETENTION_TIME, readOnlyStoreRepository)); - doReturn(mockCheckpoint).when(blobSnapshotManager).createCheckpoint(mockRocksDB); - doNothing().when(mockCheckpoint) - .createCheckpoint( - BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID)); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - - Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(mockRocksDB, STORE_NAME, PARTITION_ID), 3); + spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); + doReturn(blobTransferPartitionMetadata).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); + + AbstractStoragePartition storagePartition = Mockito.mock(AbstractStoragePartition.class); + AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); + Mockito.doReturn(storagePartition).when(storageEngine).getPartitionOrThrow(PARTITION_ID); + Mockito.doNothing().when(storagePartition).createSnapshot(); + + // Create snapshot + for (int tryCount = 0; tryCount < BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS; tryCount++) { + BlobTransferPartitionMetadata actualBlobTransferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferPayload); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); + } + + // The last snapshot creation should fail + try { + blobSnapshotManager.getTransferMetadata(blobTransferPayload); + } catch (VeniceException e) { + String errorMessage = String.format( + "Exceeded the maximum number of concurrent users %d for topic %s partition %d", + BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS, + TOPIC_NAME, + PARTITION_ID); + Assert.assertEquals(e.getMessage(), errorMessage); + } + Assert.assertEquals( + blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), + BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS); } @Test - public void testMultipleThreads() throws RocksDBException { - final int numberOfThreads = 2; - final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads); - final CountDownLatch latch = new CountDownLatch(numberOfThreads); - RocksDB mockRocksDB = mock(RocksDB.class); - Checkpoint mockCheckpoint = mock(Checkpoint.class); + public void testTwoRequestUsingSameOffset() { + // Prepare Store mockStore = mock(Store.class); + when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); when(mockStore.isHybrid()).thenReturn(true); + BlobSnapshotManager blobSnapshotManager = - spy(new BlobSnapshotManager(BASE_PATH, SNAPSHOT_RETENTION_TIME, readOnlyStoreRepository)); - doReturn(mockCheckpoint).when(blobSnapshotManager).createCheckpoint(mockRocksDB); - doNothing().when(mockCheckpoint) - .createCheckpoint( - BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID)); - - for (int i = 0; i < numberOfThreads; i++) { - asyncExecutor.submit(() -> { - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - 
blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID); - latch.countDown(); - }); - } + spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); + doReturn(blobTransferPartitionMetadata).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); - try { - // Wait for all threads to finish - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted", e); - } + AbstractStoragePartition storagePartition = Mockito.mock(AbstractStoragePartition.class); + AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); + Mockito.doReturn(storagePartition).when(storageEngine).getPartitionOrThrow(PARTITION_ID); + Mockito.doNothing().when(storagePartition).createSnapshot(); - Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(mockRocksDB, STORE_NAME, PARTITION_ID), 0); + // first request for same payload but use offset 1 + BlobTransferPartitionMetadata actualBlobTransferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferPayload); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); + + // second request for same payload but use offset 2 + BlobTransferPartitionMetadata blobTransferPartitionMetadata2 = Mockito.mock(BlobTransferPartitionMetadata.class); + doReturn(blobTransferPartitionMetadata2).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); + actualBlobTransferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferPayload); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); + + // verify that the second offset record is not tracked, and the first offset record is still tracked + Assert.assertEquals( + blobSnapshotManager.getTransferredSnapshotMetadata(TOPIC_NAME, PARTITION_ID), + blobTransferPartitionMetadata); } @Test - public void testSnapshotNotUpdatedWhenNotHybrid() throws RocksDBException { - RocksDB mockRocksDB = mock(RocksDB.class); - Checkpoint mockCheckpoint = mock(Checkpoint.class); + public void testMultipleThreads() { + final int numberOfThreads = 2; + final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads); + final CountDownLatch latch = new CountDownLatch(numberOfThreads); + Store mockStore = mock(Store.class); when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); - when(mockStore.isHybrid()).thenReturn(false); + when(mockStore.isHybrid()).thenReturn(true); + BlobSnapshotManager blobSnapshotManager = - spy(new BlobSnapshotManager(BASE_PATH, SNAPSHOT_RETENTION_TIME, readOnlyStoreRepository)); - doReturn(mockCheckpoint).when(blobSnapshotManager).createCheckpoint(mockRocksDB); - doNothing().when(mockCheckpoint) - .createCheckpoint( - BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID)); - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - verify(mockCheckpoint, times(0)).createCheckpoint(DB_DIR + "/.snapshot_files"); + spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); + doReturn(blobTransferPartitionMetadata).when(blobSnapshotManager).prepareMetadata(blobTransferPayload); + + AbstractStoragePartition storagePartition = Mockito.mock(AbstractStoragePartition.class); + AbstractStorageEngine storageEngine 
= Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); + Mockito.doReturn(storagePartition).when(storageEngine).getPartitionOrThrow(PARTITION_ID); + Mockito.doNothing().when(storagePartition).createSnapshot(); + + try { + for (int i = 0; i < numberOfThreads; i++) { + asyncExecutor.submit(() -> { + BlobTransferPartitionMetadata actualBlobTransferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferPayload); + blobSnapshotManager.decreaseConcurrentUserCount(blobTransferPayload); + Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); + latch.countDown(); + }); + } + } catch (VeniceException e) { + String errorMessage = String.format( + "Snapshot is being used by some hosts, cannot update for topic %s partition %d", + TOPIC_NAME, + PARTITION_ID); + Assert.assertEquals(e.getMessage(), errorMessage); + } + + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), 0); } @Test @@ -161,7 +235,7 @@ public void testCreateSnapshotForBatch() throws RocksDBException { // case 1: snapshot file not exists // test execute - BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath); + BlobSnapshotManager.createSnapshot(mockRocksDB, fullSnapshotPath); // test verify verify(mockCheckpoint, times(1)).createCheckpoint(fullSnapshotPath); fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(0)); @@ -173,7 +247,7 @@ public void testCreateSnapshotForBatch() throws RocksDBException { fullSnapshotDir.mkdirs(); } // test execute - BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath); + BlobSnapshotManager.createSnapshot(mockRocksDB, fullSnapshotPath); // test verify verify(mockCheckpoint, times(2)).createCheckpoint(fullSnapshotPath); fileUtilsMockedStatic.verify(() -> FileUtils.deleteDirectory(eq(file.getAbsoluteFile())), times(1)); @@ -184,7 +258,7 @@ public void testCreateSnapshotForBatch() throws RocksDBException { .thenThrow(new IOException("Delete snapshot file failed.")); // test execute try { - BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath); + BlobSnapshotManager.createSnapshot(mockRocksDB, fullSnapshotPath); Assert.fail("Should throw exception"); } catch (VeniceException e) { // test verify @@ -201,7 +275,7 @@ public void testCreateSnapshotForBatch() throws RocksDBException { .createCheckpoint(fullSnapshotPath); // test execute try { - BlobSnapshotManager.createSnapshotForBatch(mockRocksDB, fullSnapshotPath); + BlobSnapshotManager.createSnapshot(mockRocksDB, fullSnapshotPath); Assert.fail("Should throw exception"); } catch (VeniceException e) { // test verify diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java index ff0d65e84c7..8d7c4e11e28 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java @@ -7,6 +7,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.storage.StorageEngineRepository; import 
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java
index ff0d65e84c7..8d7c4e11e28 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java
@@ -7,6 +7,7 @@
 import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
 import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
+import com.linkedin.davinci.storage.StorageEngineRepository;
 import com.linkedin.davinci.storage.StorageMetadataService;
 import com.linkedin.venice.blobtransfer.BlobFinder;
 import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
@@ -14,6 +15,7 @@
 import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
 import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
+import com.linkedin.venice.meta.ReadOnlyStoreRepository;
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
@@ -45,6 +47,7 @@ public class TestNettyP2PBlobTransferManager {
   NettyFileTransferClient client;
   NettyP2PBlobTransferManager manager;
   StorageMetadataService storageMetadataService;
+  BlobSnapshotManager blobSnapshotManager;
   Path tmpSnapshotDir;
   Path tmpPartitionDir;
   String TEST_STORE = "test_store";
@@ -60,7 +63,13 @@ public void setUp() throws Exception {
     tmpPartitionDir = Files.createTempDirectory(TMP_PARTITION_DIR);
     // intentionally use different directories for snapshot and partition so that we can verify the file transfer
     storageMetadataService = mock(StorageMetadataService.class);
-    server = new P2PBlobTransferService(port, tmpSnapshotDir.toString(), storageMetadataService);
+
+    ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class);
+    StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class);
+    blobSnapshotManager =
+        Mockito.spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService));
+
+    server = new P2PBlobTransferService(port, tmpSnapshotDir.toString(), blobSnapshotManager);
     client = Mockito.spy(new NettyFileTransferClient(port, tmpPartitionDir.toString(), storageMetadataService));
     finder = mock(BlobFinder.class);
@@ -130,7 +139,10 @@ public void testNoResultFromFinder() {
   }
 
   @Test
-  public void testLocalFileTransfer() throws IOException, ExecutionException, InterruptedException, TimeoutException {
+  public void testLocalFileTransferInBatchStore()
+      throws IOException, ExecutionException, InterruptedException, TimeoutException {
+    Mockito.doReturn(false).when(blobSnapshotManager).isStoreHybrid(anyString());
+
     BlobPeersDiscoveryResponse response = new BlobPeersDiscoveryResponse();
     response.setDiscoveryResult(Collections.singletonList("localhost"));
     doReturn(response).when(finder).discoverBlobPeers(anyString(), anyInt(), anyInt());
@@ -285,4 +297,93 @@ public void testRetryAndSkipBadHostAndUseCorrectHost()
     Mockito.verify(storageMetadataService, Mockito.times(1))
         .computeStoreVersionState(Mockito.anyString(), Mockito.any());
   }
+
+  @Test
+  public void testLocalFileTransferInHybridStore()
+      throws IOException, ExecutionException, InterruptedException, TimeoutException {
+    Mockito.doReturn(true).when(blobSnapshotManager).isStoreHybrid(anyString());
+    Mockito.doNothing().when(blobSnapshotManager).createSnapshot(anyString(), anyInt());
+
+    BlobPeersDiscoveryResponse response = new BlobPeersDiscoveryResponse();
+    response.setDiscoveryResult(Collections.singletonList("localhost"));
+    doReturn(response).when(finder).discoverBlobPeers(anyString(), anyInt(), anyInt());
+
+    StoreVersionState storeVersionState = new StoreVersionState();
+    Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any());
+
+    InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
+        AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+    OffsetRecord expectOffsetRecord = new OffsetRecord(partitionStateSerializer);
+    expectOffsetRecord.setOffsetLag(1000L);
+    Mockito.doReturn(expectOffsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt());
+
+    // Prepare files in the snapshot directory
+    Path snapshotDir = Paths.get(
+        RocksDBUtils.composeSnapshotDir(tmpSnapshotDir.toString(), TEST_STORE + "_v" + TEST_VERSION, TEST_PARTITION));
+    Path partitionDir = Paths.get(
+        RocksDBUtils
+            .composePartitionDbDir(tmpPartitionDir.toString(), TEST_STORE + "_v" + TEST_VERSION, TEST_PARTITION));
+    Files.createDirectories(snapshotDir);
+    Path file1 = snapshotDir.resolve("file1.txt");
+    Path file2 = snapshotDir.resolve("file2.txt");
+    Path file3 = snapshotDir.resolve("file3.txt");
+    Path destFile1 = partitionDir.resolve("file1.txt");
+    Path destFile2 = partitionDir.resolve("file2.txt");
+    Path destFile3 = partitionDir.resolve("file3.txt");
+    // small files
+    Files.write(file1.toAbsolutePath(), "helloworld".getBytes());
+    Files.write(file3.toAbsolutePath(), "helloworldtwice".getBytes());
+    // large file
+    long size = 10 * 1024 * 1024;
+    // Create an array of dummy bytes
+    byte[] dummyData = new byte[1024]; // 1KB of dummy data
+    Arrays.fill(dummyData, (byte) 0); // Fill with zeros or any dummy value
+
+    // Write data to the file in chunks
+    for (long written = 0; written < size; written += dummyData.length) {
+      Files.write(file2.toAbsolutePath(), dummyData, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+    }
+
+    // none of the files exist in the partition directory yet
+    Assert.assertTrue(Files.notExists(destFile1));
+    Assert.assertTrue(Files.notExists(destFile2));
+    Assert.assertTrue(Files.notExists(destFile3));
+
+    // The manager should be able to fetch the files and download them to the partition directory
+    CompletionStage future = manager.get(TEST_STORE, TEST_VERSION, TEST_PARTITION);
+    future.toCompletableFuture().get(1, TimeUnit.MINUTES);
+
+    // Verify files are all written to the partition directory
+    Assert.assertTrue(Files.exists(destFile1));
+    Assert.assertTrue(Files.exists(destFile2));
+    Assert.assertTrue(Files.exists(destFile3));
+
+    // same content
+    Assert.assertTrue(Arrays.equals(Files.readAllBytes(file1), Files.readAllBytes(destFile1)));
+    Assert.assertTrue(Arrays.equals(Files.readAllBytes(file2), Files.readAllBytes(destFile2)));
+    Assert.assertTrue(Arrays.equals(Files.readAllBytes(file3), Files.readAllBytes(destFile3)));
+
+    // Verify the metadata is retrieved exactly once,
+    // when the snapshot metadata is prepared before the new snapshot is created.
+    Mockito.verify(storageMetadataService, Mockito.times(1))
+        .getLastOffset(TEST_STORE + "_v" + TEST_VERSION, TEST_PARTITION);
+    Mockito.verify(storageMetadataService, Mockito.times(1)).getStoreVersionState(TEST_STORE + "_v" + TEST_VERSION);
+
+    // Verify the offset record is updated
+    Mockito.verify(storageMetadataService, Mockito.times(1))
+        .put(TEST_STORE + "_v" + TEST_VERSION, TEST_PARTITION, expectOffsetRecord);
+
+    // Verify the store version state is updated
+    Mockito.verify(storageMetadataService, Mockito.times(1))
+        .computeStoreVersionState(Mockito.anyString(), Mockito.any());
+
+    // Verify that prepareMetadata and createSnapshot are each called once
+    Mockito.verify(blobSnapshotManager, Mockito.times(1)).prepareMetadata(Mockito.any());
+    Mockito.verify(blobSnapshotManager, Mockito.times(1)).createSnapshot(TEST_STORE + "_v" + TEST_VERSION, 0);
+
+    // Verify the concurrent user count of this partition is back to 0: it is 1 while the files are being served
+    // and drops back to 0 once the transfer completes
+    long concurrentUser = blobSnapshotManager.getConcurrentSnapshotUsers(TEST_STORE + "_v" + TEST_VERSION, 0);
+    Assert.assertEquals(concurrentUser, 0);
+  }
 }
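The verifications in testLocalFileTransferInHybridStore above describe what the receiving host is expected to do with the metadata that accompanies the snapshot. The sketch below is only illustrative under those assumptions: the holder class, method name, and parameters are invented, while the two StorageMetadataService calls are the ones the test verifies.

import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.offsets.OffsetRecord;

final class TransferredMetadataSyncSketch {
  private TransferredMetadataSyncSketch() {
  }

  // Persist the metadata that travelled with the snapshot so that the local offset bookkeeping
  // matches the data files that were just downloaded.
  static void syncTransferredMetadata(
      StorageMetadataService storageMetadataService,
      String topicName,
      int partitionId,
      OffsetRecord transferredOffsetRecord,
      StoreVersionState transferredStoreVersionState) {
    // The offset record shipped by the sending host replaces the local one for this partition.
    storageMetadataService.put(topicName, partitionId, transferredOffsetRecord);
    // The store version state is likewise replaced with the transferred copy.
    storageMetadataService.computeStoreVersionState(topicName, ignored -> transferredStoreVersionState);
  }
}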
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
index 48622fe9a53..7ef341c102a 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java
@@ -7,9 +7,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler;
+import com.linkedin.davinci.storage.StorageEngineRepository;
 import com.linkedin.davinci.storage.StorageMetadataService;
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
 import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
+import com.linkedin.venice.meta.ReadOnlyStoreRepository;
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
@@ -48,12 +50,20 @@ public class TestP2PFileTransferServerHandler {
   Path baseDir;
   StorageMetadataService storageMetadataService;
   P2PFileTransferServerHandler serverHandler;
+  BlobSnapshotManager blobSnapshotManager;
+  ReadOnlyStoreRepository readOnlyStoreRepository;
+  StorageEngineRepository storageEngineRepository;
 
   @BeforeMethod
   public void setUp() throws IOException {
     baseDir = Files.createTempDirectory("tmp");
     storageMetadataService = Mockito.mock(StorageMetadataService.class);
-    serverHandler = new P2PFileTransferServerHandler(baseDir.toString(), storageMetadataService);
+    readOnlyStoreRepository = Mockito.mock(ReadOnlyStoreRepository.class);
+    storageEngineRepository = Mockito.mock(StorageEngineRepository.class);
+
+    blobSnapshotManager =
+        new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService);
+    serverHandler = new P2PFileTransferServerHandler(baseDir.toString(), blobSnapshotManager);
     ch = new EmbeddedChannel(serverHandler);
   }
@@ -87,14 +97,33 @@ public void testRejectInvalidPath() {
   }
 
   @Test
   public void testRejectNonExistPath() {
+    // prepare the metadata service responses needed for metadata preparation
+    StoreVersionState storeVersionState = new StoreVersionState();
+    Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any());
+    InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
+        AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+    OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
+    offsetRecord.setOffsetLag(1000L);
+    Mockito.doReturn(offsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt());
+
     FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10");
     ch.writeInbound(request);
     FullHttpResponse response = ch.readOutbound();
     Assert.assertEquals(response.status().code(), 404);
+    Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0L);
   }
 
   @Test
   public void testFailOnAccessPath() throws IOException {
+    // prepare the metadata service responses needed for metadata preparation
+    StoreVersionState storeVersionState = new StoreVersionState();
+    Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any());
+    InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
+        AvroProtocolDefinition.PARTITION_STATE.getSerializer();
+    OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
+    offsetRecord.setOffsetLag(1000L);
+    Mockito.doReturn(offsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt());
+
     // create an empty snapshot dir
     Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10));
     Files.createDirectories(snapshotDir);
@@ -103,6 +132,7 @@ public void testFailOnAccessPath() throws IOException {
     ch.writeInbound(request);
     FullHttpResponse response = ch.readOutbound();
     Assert.assertEquals(response.status().code(), 500);
+    Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0L);
   }
 
   @Test
@@ -114,7 +144,7 @@ public void testIdleChannelClose() {
   }
 
   @Test
-  public void testTransferSingleFileAndSingleMetadata() throws IOException {
+  public void testTransferSingleFileAndSingleMetadataForBatchStore() throws IOException {
     // prepare response from metadata service
     StoreVersionState storeVersionState = new StoreVersionState();
     Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any());
@@ -163,6 +193,8 @@ public void testTransferSingleFileAndSingleMetadata() throws IOException {
     DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response;
     Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED);
     // end of STATUS response
+
+    Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0L);
   }
 
   @Test
@@ -239,8 +271,14 @@ public void testTransferMultipleFiles() throws IOException {
     DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response;
     Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED);
     // end of STATUS response
+
+    Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0L);
   }
 
+  /**
+   * Test that when the metadata cannot be fetched from storageMetadataService, an error is returned to the client.
+   * @throws IOException
+   */
   @Test
   public void testWhenMetadataCreateError() throws IOException {
     // prepare the file request
@@ -252,25 +290,11 @@ public void testWhenMetadataCreateError() throws IOException {
     ch.writeInbound(request);
-    // start of file1
+    // metadata lookup on the server side fails, so the server responds with an error
     Object response = ch.readOutbound();
     Assert.assertTrue(response instanceof DefaultHttpResponse);
-    DefaultHttpResponse httpResponse = (DefaultHttpResponse) response;
-    Assert.assertEquals(
-        httpResponse.headers().get(HttpHeaderNames.CONTENT_DISPOSITION),
-        "attachment; filename=\"file1\"");
-    Assert.assertEquals(httpResponse.headers().get(BLOB_TRANSFER_TYPE), BlobTransferType.FILE.toString());
-    // send the content in one chunk
-    response = ch.readOutbound();
-    Assert.assertTrue(response instanceof DefaultFileRegion);
-    // the last empty response for file1
-    response = ch.readOutbound();
-    Assert.assertTrue(response instanceof LastHttpContent);
-    // end of file1
+    Assert.assertEquals(((DefaultHttpResponse) response).status(), HttpResponseStatus.NOT_FOUND);
-    // metadata in server side has error
-    response = ch.readOutbound();
-    Assert.assertTrue(response instanceof DefaultHttpResponse);
-    Assert.assertEquals(((DefaultHttpResponse) response).status(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+    Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0L);
   }
 }
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java
index 0e85a51d2e9..974f7c57834 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java
@@ -932,17 +932,17 @@ public void testCreateSnapshot(boolean blobTransferEnabled) {
         storeConfig);
     try (MockedStatic<BlobSnapshotManager> mockedBlobSnapshotManager = Mockito.mockStatic(BlobSnapshotManager.class)) {
-      mockedBlobSnapshotManager.when(() -> BlobSnapshotManager.createSnapshotForBatch(Mockito.any(), Mockito.any()))
+      mockedBlobSnapshotManager.when(() -> BlobSnapshotManager.createSnapshot(Mockito.any(), Mockito.any()))
          .thenAnswer(invocation -> {
            return null;
          });
      storagePartition.createSnapshot();
      if (blobTransferEnabled) {
        mockedBlobSnapshotManager
-            .verify(() -> BlobSnapshotManager.createSnapshotForBatch(Mockito.any(), Mockito.any()), Mockito.times(1));
+            .verify(() -> BlobSnapshotManager.createSnapshot(Mockito.any(), Mockito.any()), Mockito.times(1));
      } else {
        mockedBlobSnapshotManager
-            .verify(() -> BlobSnapshotManager.createSnapshotForBatch(Mockito.any(), Mockito.any()), Mockito.never());
+            .verify(() -> BlobSnapshotManager.createSnapshot(Mockito.any(), Mockito.any()), Mockito.never());
      }
    }
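Each of the handler tests above ends by asserting that getConcurrentSnapshotUsers(...) is back to 0 once a request has been served. As a reading aid, here is a minimal sketch of how such a per-topic, per-partition user counter with an upper bound can be kept; the class name, map layout, and exception message are assumptions for illustration, not the BlobSnapshotManager internals.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import com.linkedin.venice.exceptions.VeniceException;

final class ConcurrentSnapshotUserGuard {
  private final int maxConcurrentUsers;
  // topicName -> partitionId -> number of hosts currently reading the snapshot
  private final Map<String, Map<Integer, AtomicLong>> concurrentSnapshotUsers = new ConcurrentHashMap<>();

  ConcurrentSnapshotUserGuard(int maxConcurrentUsers) {
    this.maxConcurrentUsers = maxConcurrentUsers;
  }

  void increaseConcurrentUserCount(String topicName, int partitionId) {
    AtomicLong counter = concurrentSnapshotUsers.computeIfAbsent(topicName, t -> new ConcurrentHashMap<>())
        .computeIfAbsent(partitionId, p -> new AtomicLong(0));
    long users = counter.incrementAndGet();
    if (users > maxConcurrentUsers) {
      // Roll back the increment and reject the request once the limit is exceeded.
      counter.decrementAndGet();
      throw new VeniceException(
          "Too many concurrent snapshot users for " + topicName + " partition " + partitionId);
    }
  }

  void decreaseConcurrentUserCount(String topicName, int partitionId) {
    AtomicLong counter = concurrentSnapshotUsers.getOrDefault(topicName, Map.of()).get(partitionId);
    if (counter != null && counter.get() > 0) {
      counter.decrementAndGet();
    }
  }

  long getConcurrentSnapshotUsers(String topicName, int partitionId) {
    AtomicLong counter = concurrentSnapshotUsers.getOrDefault(topicName, Map.of()).get(partitionId);
    return counter == null ? 0 : counter.get();
  }
}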
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index c5fe0392881..8b35fcdf18f 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -1761,6 +1761,10 @@ private ConfigKeys() {
       "davinci.push.status.scan.max.offline.instance.ratio";
   // this is a host-level config to decide whether bootstrap a blob transfer manager for the host
   public static final String BLOB_TRANSFER_MANAGER_ENABLED = "blob.transfer.manager.enabled";
+  public static final String BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN =
+      "blob.transfer.snapshot.retention.time.in.min";
+  public static final String BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER = "blob.transfer.max.concurrent.snapshot.user";
+
   // Port used by peer-to-peer transfer service. It should be used by both server and client
   public static final String DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT = "davinci.p2p.blob.transfer.server.port";
   // Ideally this config should NOT be used but for testing purpose on a single host, we need to separate the ports.
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java
index b5830d8bdeb..c6ac0c9956c 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java
@@ -5,6 +5,7 @@
 import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_KEY_SCHEMA;
 import static com.linkedin.venice.meta.StoreStatus.FULLLY_REPLICATED;
 import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps;
+import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
 
 import com.linkedin.venice.ConfigKeys;
@@ -14,6 +15,8 @@
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
 import com.linkedin.venice.integration.utils.VeniceServerWrapper;
 import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.meta.Store;
+import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.store.rocksdb.RocksDBUtils;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
@@ -31,6 +34,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.samza.system.SystemProducer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -40,6 +44,7 @@ public class BlobP2PTransferAmongServersTest {
   private static final Logger LOGGER = LogManager.getLogger(BlobP2PTransferAmongServersTest.class);
   private static int PARTITION_COUNT = 3;
+  private static final int STREAMING_RECORD_SIZE = 1024;
   private String path1;
   private String path2;
   private VeniceClusterWrapper cluster;
@@ -62,10 +67,10 @@ public void tearDown() {
   }
 
   @Test(singleThreaded = true)
-  public void testBlobP2PTransferAmongServers() throws Exception {
+  public void testBlobP2PTransferAmongServersForBatchStore() throws Exception {
     String storeName = "test-store";
     Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setBlobTransferEnabled(true);
-    setUpStore(cluster, storeName, paramsConsumer, properties -> {}, true);
+    setUpBatchStore(cluster, storeName, paramsConsumer, properties -> {}, true);
 
     VeniceServerWrapper server1 = cluster.getVeniceServers().get(0);
     VeniceServerWrapper server2 = cluster.getVeniceServers().get(1);
@@ -138,7 +143,7 @@ public void testBlobTransferThrowExceptionIfSnapshotNotExisted() throws Exception {
     String storeName = "test-store-snapshot-not-existed";
     Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setBlobTransferEnabled(true);
-    setUpStore(cluster, storeName, paramsConsumer, properties -> {}, true);
+    setUpBatchStore(cluster, storeName, paramsConsumer, properties -> {}, true);
 
     VeniceServerWrapper server1 = cluster.getVeniceServers().get(0);
@@ -242,7 +247,7 @@ public VeniceClusterWrapper initializeVeniceCluster() {
     return veniceClusterWrapper;
   }
 
-  private void setUpStore(
+  private void setUpBatchStore(
       VeniceClusterWrapper cluster,
       String storeName,
       Consumer<UpdateStoreQueryParams> paramsConsumer,
@@ -280,4 +285,96 @@ private static void runVPJ(Properties vpjProperties, int expectedVersionNumber,
     cluster.waitVersion(storeName, expectedVersionNumber);
     LOGGER.info("**TIME** VPJ" + expectedVersionNumber + " takes " + (System.currentTimeMillis() - vpjStart));
   }
+
+  @Test(singleThreaded = true)
+  public void testBlobP2PTransferAmongServersForHybridStore() throws Exception {
+    ControllerClient controllerClient = new ControllerClient(cluster.getClusterName(), cluster.getAllControllersURLs());
+    // prepare hybrid store.
+    String storeName = "test-store-hybrid";
+    long streamingRewindSeconds = 25L;
+    long streamingMessageLag = 2L;
+    controllerClient.createNewStore(storeName, "owner", STRING_SCHEMA.toString(), STRING_SCHEMA.toString());
+    controllerClient.updateStore(
+        storeName,
+        new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
+            .setHybridRewindSeconds(streamingRewindSeconds)
+            .setHybridOffsetLagThreshold(streamingMessageLag)
+            .setBlobTransferEnabled(true));
+
+    controllerClient.emptyPush(storeName, Utils.getUniqueString("empty-hybrid-push"), 1L);
+    VeniceServerWrapper server1 = cluster.getVeniceServers().get(0);
+    VeniceServerWrapper server2 = cluster.getVeniceServers().get(1);
+
+    // offset records should be the same on both servers after the empty push
+    for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
+      OffsetRecord offsetRecord1 =
+          server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
+      OffsetRecord offsetRecord2 =
+          server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
+      Assert.assertEquals(offsetRecord2.getLocalVersionTopicOffset(), offsetRecord1.getLocalVersionTopicOffset());
+    }
+
+    // cleanup and stop server 1
+    cluster.stopVeniceServer(server1.getPort());
+    FileUtils.deleteDirectory(
+        new File(RocksDBUtils.composePartitionDbDir(path1 + "/rocksdb", storeName + "_v1", METADATA_PARTITION_ID)));
+    for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
+      FileUtils.deleteDirectory(
+          new File(RocksDBUtils.composePartitionDbDir(path1 + "/rocksdb", storeName + "_v1", partitionId)));
+      // both partition db and snapshot should be deleted
+      Assert.assertFalse(
+          Files.exists(
+              Paths.get(RocksDBUtils.composePartitionDbDir(path1 + "/rocksdb", storeName + "_v1", partitionId))));
+      Assert.assertFalse(
+          Files.exists(Paths.get(RocksDBUtils.composeSnapshotDir(path1 + "/rocksdb", storeName + "_v1", partitionId))));
+    }
+
+    // send streaming records while server 1 is down, so only server 2 ingests them
+    SystemProducer veniceProducer = null;
+    for (int i = 1; i <= 10; i++) {
+      veniceProducer = IntegrationTestPushUtils.getSamzaProducer(cluster, storeName, Version.PushType.STREAM);
+      IntegrationTestPushUtils.sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE);
+    }
+    if (veniceProducer != null) {
+      veniceProducer.stop();
+    }
+
+    // restart server 1
+    TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
+      cluster.restartVeniceServer(server1.getPort());
+      Assert.assertTrue(server1.isRunning());
+    });
+
+    // wait until server 1 is fully replicated
+    cluster.getVeniceControllers().forEach(controller -> {
+      TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
+        Assert.assertEquals(
+            controller.getController()
+                .getVeniceControllerService()
+                .getVeniceHelixAdmin()
+                .getAllStoreStatuses(cluster.getClusterName())
+                .get(storeName),
+            FULLLY_REPLICATED.toString());
+      });
+    });
+
+    TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
+      for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
+        File file = new File(RocksDBUtils.composePartitionDbDir(path1 + "/rocksdb", storeName + "_v1", partitionId));
+        Boolean fileExisted = Files.exists(file.toPath());
+        Assert.assertTrue(fileExisted);
+      }
+    });
+
+    // server 1 and 2 offset records should be the same
+    TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
+      for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
+        OffsetRecord offsetServer1 =
+            server1.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
+        OffsetRecord offsetServer2 =
+            server2.getVeniceServer().getStorageMetadataService().getLastOffset(storeName + "_v1", partitionId);
+        Assert.assertEquals(offsetServer1.getLocalVersionTopicOffset(), offsetServer2.getLocalVersionTopicOffset());
+      }
+    });
+  }
 }
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
index 0314e0f1941..d3a76cf7914 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
@@ -453,7 +453,11 @@ private List createServices() {
           serverConfig.getDvcP2pBlobTransferClientPort(),
           serverConfig.getRocksDBPath(),
           customizedViewFuture,
-          storageMetadataService);
+          storageMetadataService,
+          metadataRepo,
+          storageService.getStorageEngineRepository(),
+          serverConfig.getMaxConcurrentSnapshotUser(),
+          serverConfig.getSnapshotRetentionTimeInMin());
     } else {
       blobTransferManager = null;
     }
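The wiring above passes getMaxConcurrentSnapshotUser() and getSnapshotRetentionTimeInMin() from the server config into the blob transfer manager. The sketch below shows, under stated assumptions, how the two new ConfigKeys could be read and how the retention window translates into a staleness check; the class name, the default values shown, and the isSnapshotStale helper are illustrative, not the shipped VeniceServerConfig code.

import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN;

import java.util.concurrent.TimeUnit;
import com.linkedin.venice.utils.VeniceProperties;

final class BlobTransferSnapshotSettingsSketch {
  private final int maxConcurrentSnapshotUser;
  private final long snapshotRetentionTimeInMillis;

  BlobTransferSnapshotSettingsSketch(VeniceProperties props) {
    // Defaults here are placeholders for illustration only.
    this.maxConcurrentSnapshotUser = props.getInt(BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER, 5);
    this.snapshotRetentionTimeInMillis =
        TimeUnit.MINUTES.toMillis(props.getInt(BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN, 30));
  }

  int getMaxConcurrentSnapshotUser() {
    return maxConcurrentSnapshotUser;
  }

  // A snapshot older than the retention window is considered stale and should be recreated
  // before serving another transfer request.
  boolean isSnapshotStale(long snapshotCreationTimeMs, long nowMs) {
    return nowMs - snapshotCreationTimeMs > snapshotRetentionTimeInMillis;
  }
}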