Skip to content

Commit

Permalink
[test] Fixed unit test related to multi threading (#1054)
Browse files Browse the repository at this point in the history
* [server][davinci] Created interface for Blob dicovery and added a server implementation

* Fixed inconsistent unit test

* Fixed inconsistent unit test
  • Loading branch information
sebas-inf authored Jun 28, 2024
1 parent 7c582e6 commit 1894f95
Showing 1 changed file with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.rocksdb.Checkpoint;
Expand Down Expand Up @@ -90,9 +90,10 @@ public void testSameSnapshotWhenConcurrentUsers() throws RocksDBException {

@Test
public void testMultipleThreads() throws RocksDBException {
final ExecutorService asyncExecutor = Executors.newFixedThreadPool(2);
final int numberOfThreads = 2;
final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
RocksDB mockRocksDB = mock(RocksDB.class);
Random rand = new Random();
Checkpoint mockCheckpoint = mock(Checkpoint.class);
Store mockStore = mock(Store.class);
when(readOnlyStoreRepository.getStore(STORE_NAME)).thenReturn(mockStore);
Expand All @@ -103,16 +104,23 @@ public void testMultipleThreads() throws RocksDBException {
doNothing().when(mockCheckpoint)
.createCheckpoint(
BASE_PATH + "/" + STORE_NAME + "/" + RocksDBUtils.getPartitionDbName(STORE_NAME, PARTITION_ID));
asyncExecutor.submit(() -> {
blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID);
Utils.sleep(rand.nextInt(1000));
blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID);
});
asyncExecutor.submit(() -> {
blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID);
Utils.sleep(rand.nextInt(1000));
blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID);
});

for (int i = 0; i < numberOfThreads; i++) {
asyncExecutor.submit(() -> {
blobSnapshotManager.maybeUpdateHybridSnapshot(mockRocksDB, STORE_NAME, PARTITION_ID);
blobSnapshotManager.decreaseConcurrentUserCount(STORE_NAME, PARTITION_ID);
latch.countDown();
});
}

try {
// Wait for all threads to finish
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Test interrupted", e);
}

Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(mockRocksDB, STORE_NAME, PARTITION_ID), 0);
}

Expand Down

0 comments on commit 1894f95

Please sign in to comment.