diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java index f7dddf49c30..83fad2f43e5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java @@ -8,7 +8,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.Map; -import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.rocksdb.Checkpoint; @@ -90,9 +90,10 @@ public void testSameSnapshotWhenConcurrentUsers() throws RocksDBException { @Test public void testMultipleThreads() throws RocksDBException { - final ExecutorService asyncExecutor = Executors.newFixedThreadPool(2); + final int numberOfThreads = 2; + final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads); + final CountDownLatch latch = new CountDownLatch(numberOfThreads); RocksDB mockRocksDB = mock(RocksDB.class); - Random rand = new Random(); Checkpoint mockCheckpoint = mock(Checkpoint.class); Store mockStore = mock(Store.class); when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore); @@ -103,16 +104,23 @@ public void testMultipleThreads() throws RocksDBException { doNothing().when(mockCheckpoint) .createCheckpoint( BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID)); - asyncExecutor.submit(() -> { - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - Utils.sleep(rand.nextInt(1000)); - blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID); - }); - asyncExecutor.submit(() -> { - blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); - Utils.sleep(rand.nextInt(1000)); - blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID); - }); + + for (int i = 0; i < numberOfThreads; i++) { + asyncExecutor.submit(() -> { + blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID); + blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID); + latch.countDown(); + }); + } + + try { + // Wait for all threads to finish + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted", e); + } + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(mockRocksDB, STORE_NAME, PARTITION_ID), 0); }