diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index b242a0d8663..b22564e01f7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -41,6 +41,9 @@
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.TestUtils;
import org.junit.Before;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Tags;
+import org.junit.jupiter.api.Test;
/**
* Test BookieStorage with a threshold.
@@ -141,6 +144,8 @@ public float checkDir(File dir) throws DiskErrorException, DiskOutOfSpaceExcepti
}
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1562")
+ @Tag("flaky")
+ @Test
public void testStorageThresholdCompaction() throws Exception {
stopAllBookies();
ServerConfiguration conf = newServerConfiguration();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
index 4b92df528fe..a316bb0b380 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
@@ -22,12 +22,12 @@
import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.assertEntryEquals;
import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.makeEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -41,7 +41,6 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -72,20 +71,19 @@
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
import org.apache.commons.io.FileUtils;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.FixMethodOrder;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for EntryLog.
*/
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class DefaultEntryLogTest {
private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogTest.class);
@@ -104,7 +102,7 @@ File createTempDir(String prefix, String suffix) throws IOException {
private LedgerDirsManager dirsMgr;
private DefaultEntryLogger entryLogger;
- @Before
+ @BeforeAll
public void setUp() throws Exception {
this.rootDir = createTempDir("bkTest", ".dir");
this.curDir = BookieImpl.getCurrentDirectory(rootDir);
@@ -119,7 +117,7 @@ public void setUp() throws Exception {
this.entryLogger = new DefaultEntryLogger(conf, dirsMgr);
}
- @After
+ @AfterAll
public void tearDown() throws Exception {
if (null != this.entryLogger) {
entryLogger.close();
@@ -332,28 +330,6 @@ public void testMissingLogId() throws Exception {
}
}
- /**
- * Test that EntryLogger Should fail with FNFE, if entry logger directories does not exist.
- */
- @Ignore // no longer valid as LedgerDirsManager creates the directory as needed
- public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist()
- throws Exception {
- File tmpDir = createTempDir("bkTest", ".dir");
- DefaultEntryLogger entryLogger = null;
- try {
- entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir },
- new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
- fail("Expecting FileNotFoundException");
- } catch (FileNotFoundException e) {
- assertEquals("Entry log directory '" + tmpDir + "/current' does not exist", e
- .getLocalizedMessage());
- } finally {
- if (entryLogger != null) {
- entryLogger.close();
- }
- }
- }
-
/**
* Test to verify the DiskFull during addEntry.
*/
@@ -384,9 +360,9 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
.getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
ledgerStorage.addEntry(generateEntry(3, 1));
// Verify written entries
- Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
- Assert.assertTrue(0 == generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1)));
- Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1)));
+ assertEquals(0, generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
+ assertEquals(0, generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1)));
+ assertEquals(0, generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1)));
}
/**
@@ -707,6 +683,8 @@ public Boolean call() throws IOException, BookieException {
* using InterleavedLedgerStorage.
*/
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516")
+ @Tag("flaky")
+ @Test
public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), false);
}
@@ -716,6 +694,8 @@ public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws E
* using InterleavedLedgerStorage with EntryLogPerLedger enabled.
*/
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516")
+ @Tag("flaky")
+ @Test
public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorageWithELPLEnabled() throws Exception {
testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), true);
}
@@ -725,6 +705,8 @@ public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorageWithELPLEna
* using SortedLedgerStorage.
*/
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516")
+ @Tag("flaky")
+ @Test
public void testConcurrentWriteAndReadCallsOfSortedLedgerStorage() throws Exception {
testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), false);
}
@@ -734,6 +716,8 @@ public void testConcurrentWriteAndReadCallsOfSortedLedgerStorage() throws Except
* using SortedLedgerStorage with EntryLogPerLedger enabled.
*/
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/1516")
+ @Tag("flaky")
+ @Test
public void testConcurrentWriteAndReadCallsOfSortedLedgerStorageWithELPLEnabled() throws Exception {
testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), true);
}
@@ -1014,10 +998,8 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception
/*
* since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels
*/
- Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
- entryLogManager.getCopyOfCurrentLogs().size());
- Assert.assertEquals("Number of Rotated Logs ", numOfLedgers,
- entryLogManager.getRotatedLogChannels().size());
+ assertEquals(numOfLedgers, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs ");
+ assertEquals(numOfLedgers, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs ");
for (long i = 0; i < numOfLedgers; i++) {
entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
@@ -1028,25 +1010,22 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception
* again since new entryLogs are set for all the ledgers, previous entrylogs would be added to
* rotatedLogChannels
*/
- Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
- entryLogManager.getCopyOfCurrentLogs().size());
- Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers,
- entryLogManager.getRotatedLogChannels().size());
+ assertEquals(numOfLedgers, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs ");
+ assertEquals(2 * numOfLedgers, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs ");
for (BufferedLogChannel logChannel : entryLogManager.getRotatedLogChannels()) {
entryLogManager.getRotatedLogChannels().remove(logChannel);
}
- Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+ assertEquals(0, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs ");
// entrylogid is sequential
for (long i = 0; i < numOfLedgers; i++) {
- assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i,
- entryLogManager.getCurrentLogForLedger(i).getLogId());
+ assertEquals(2 * numOfLedgers + i, entryLogManager.getCurrentLogForLedger(i).getLogId(),
+ "EntryLogId for Ledger " + i);
}
for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) {
- assertTrue("EntryLog with logId: " + i + " should be present",
- entryLogManager.getCurrentLogIfPresent(i) != null);
+ assertNotNull(entryLogManager.getCurrentLogIfPresent(i), "EntryLog with logId: " + i + " should be present");
}
}
@@ -1076,7 +1055,6 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer
CountDownLatch latchToWait = new CountDownLatch(1);
AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0);
AtomicBoolean irptExceptionHappened = new AtomicBoolean(false);
- Random rand = new Random();
for (int i = 0; i < numOfLedgers * numOfThreadsPerLedger; i++) {
long ledgerId = i % numOfLedgers;
@@ -1094,7 +1072,7 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer
});
}
- assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get());
+ assertEquals(0, numberOfThreadsAcquiredLock.get(), "Number Of Threads acquired Lock");
latchToStart.countDown();
Thread.sleep(1000);
/*
@@ -1107,13 +1085,14 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer
* After acquiring the lock there must be waiting on 'latchToWait' latch
*/
int currentNumberOfThreadsAcquiredLock = numberOfThreadsAcquiredLock.get();
- assertTrue("Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock,
- (currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers));
+ assertTrue((currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers),
+ "Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock);
+
latchToWait.countDown();
Thread.sleep(2000);
- assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger,
- numberOfThreadsAcquiredLock.get());
- }
+
+ assertEquals(numOfLedgers * numOfThreadsPerLedger, numberOfThreadsAcquiredLock.get(),
+ "Number Of Threads acquired Lock"); }
/*
* test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the
@@ -1144,7 +1123,7 @@ public void testEntryLogManagerExpiryRemoval() throws Exception {
entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, logChannel);
BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
- assertEquals("LogChannel for ledger " + ledgerId + " should match", logChannel, currentLogForLedger);
+ assertEquals(logChannel, currentLogForLedger, "LogChannel for ledger " + ledgerId + " should match");
Thread.sleep(evictionPeriod * 1000 + 100);
entryLogManager.doEntryLogMapCleanup();
@@ -1154,15 +1133,14 @@ public void testEntryLogManagerExpiryRemoval() throws Exception {
* ledger should not be available anymore
*/
currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
- assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
- Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
- Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size());
- Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
- entryLogManager.getRotatedLogChannels().contains(logChannel));
-
- Assert.assertTrue("since mapentry must have been evicted, it should be null",
- (entryLogManager.getCacheAsMap().get(ledgerId) == null)
- || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null));
+ assertNull(currentLogForLedger, "LogChannel for ledger " + ledgerId + " should be null");
+ assertEquals(0, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs ");
+ assertEquals(1, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs ");
+ assertTrue(entryLogManager.getRotatedLogChannels().contains(logChannel), "CopyOfRotatedLogChannels should contain the created LogChannel");
+
+ assertTrue((entryLogManager.getCacheAsMap().get(ledgerId) == null)
+ || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null),
+ "since map entry must have been evicted, it should be null");
}
/*
@@ -1233,8 +1211,8 @@ public void testLongLedgerIdsWithEntryLogPerLedger() throws Exception {
long readEntryId = buf.readLong();
byte[] readData = new byte[buf.readableBytes()];
buf.readBytes(readData);
- assertEquals("LedgerId ", ledgerId, readLedgerId);
- assertEquals("EntryId ", entryId, readEntryId);
+ assertEquals(ledgerId, readLedgerId, "LedgerId ");
+ assertEquals(entryId, readEntryId, "EntryId ");
assertEquals("Entry Data ", expectedValue, new String(readData));
}
}
@@ -1347,9 +1325,9 @@ public void run() {
* eviction period time, so it should not be evicted.
*/
BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
- assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger);
- Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
- Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getRotatedLogChannels().size());
+ assertEquals(newLogChannel, currentLogForLedger, "LogChannel for ledger " + ledgerId);
+ assertEquals(1, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs ");
+ assertEquals(0, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs ");
}
/**
@@ -1410,23 +1388,21 @@ public void run() {
t.start();
Thread.sleep(evictionPeriod * 1000 + 100);
entryLogManager.doEntryLogMapCleanup();
- Assert.assertFalse("Exception occurred in thread, which is not expected", exceptionOccurred.get());
+ assertFalse(exceptionOccurred.get(), "Exception occurred in thread, which is not expected");
/*
* since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that
* ledger should not be available anymore
*/
BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
- assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
+ assertNull(currentLogForLedger, "LogChannel for ledger " + ledgerId + " should be null");
// expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId'
- Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
- Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size());
- Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
- entryLogManager.getRotatedLogChannels().contains(newLogChannel));
-
- Assert.assertTrue("since mapentry must have been evicted, it should be null",
- (entryLogManager.getCacheAsMap().get(ledgerId) == null)
- || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null));
+ assertEquals(1, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs ");
+ assertEquals(1, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs ");
+ assertTrue(entryLogManager.getRotatedLogChannels().contains(newLogChannel), "CopyOfRotatedLogChannels should contain the created LogChannel");
+
+ assertTrue((entryLogManager.getCacheAsMap().get(ledgerId) == null)
+ || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null), "since mapentry must have been evicted, it should be null");
}
/*
@@ -1580,8 +1556,8 @@ public void testReadAddCallsOfMultipleEntryLogs() throws Exception {
long entryId = buf.readLong();
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
- assertEquals("LedgerId ", i, ledgerId);
- assertEquals("EntryId ", j, entryId);
+ assertEquals(i, ledgerId, "LedgerId ");
+ assertEquals(j, entryId, "EntryId ");
assertEquals("Entry Data ", expectedValue, new String(data));
}
}
@@ -1601,8 +1577,8 @@ public void testReadAddCallsOfMultipleEntryLogs() throws Exception {
long entryId = buf.readLong();
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
- assertEquals("LedgerId ", i, ledgerId);
- assertEquals("EntryId ", j, entryId);
+ assertEquals(i, ledgerId, "LedgerId ");
+ assertEquals(j, entryId, "EntryId ");
assertEquals("Entry Data ", expectedValue, new String(data));
}
}
@@ -1735,8 +1711,7 @@ public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception {
DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
entryLogger.getEntryLogManager();
- Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
- entryLogManager.getClass());
+ assertEquals(EntryLogManagerForEntryLogPerLedger.class, entryLogManager.getClass(), "EntryLogManager class type");
entryLogger.addEntry(0L, generateEntry(0, 1));
entryLogger.addEntry(1L, generateEntry(1, 1));
@@ -1917,8 +1892,8 @@ public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled,
long entryId = buf.readLong();
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
- assertEquals("LedgerId ", i, ledgerId);
- assertEquals("EntryId ", j, entryId);
+ assertEquals(i, ledgerId, "LedgerId ");
+ assertEquals(j, entryId, "EntryId ");
assertEquals("Entry Data ", expectedValue, new String(data));
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
index 91612ec5c79..303dff8100b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
@@ -38,6 +38,8 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,6 +136,8 @@ private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(
* Test to show that weight based selection honors the disk weight of bookies.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
+ @Tag("flaky")
+ @Test
public void testDiskSpaceWeightedBookieSelection() throws Exception {
long freeDiskSpace = 1000000L;
int multiple = 3;
@@ -182,6 +186,8 @@ public void testDiskSpaceWeightedBookieSelection() throws Exception {
* when the bookies's weight changes.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
+ @Tag("flaky")
+ @Test
public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception {
long freeDiskSpace = 1000000L;
int multiple = 3;
@@ -269,6 +275,8 @@ public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exc
* when bookies go away permanently.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
+ @Tag("flaky")
+ @Test
public void testDiskSpaceWeightedBookieSelectionWithBookiesDying() throws Exception {
long freeDiskSpace = 1000000L;
int multiple = 3;
@@ -347,6 +355,8 @@ public void testDiskSpaceWeightedBookieSelectionWithBookiesDying() throws Except
* when bookies are added.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
+ @Tag("flaky")
+ @Test
public void testDiskSpaceWeightedBookieSelectionWithBookiesBeingAdded() throws Exception {
long freeDiskSpace = 1000000L;
int multiple = 3;
@@ -419,6 +429,8 @@ public void testDiskSpaceWeightedBookieSelectionWithBookiesBeingAdded() throws E
* the periodic bookieInfo read is working and causes the new weights to be taken into account.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
+ @Tag("flaky")
+ @Test
public void testDiskSpaceWeightedBookieSelectionWithPeriodicBookieInfoUpdate() throws Exception {
long freeDiskSpace = 1000000L;
int multiple = 3;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
index fbe6c921eb8..34ec08f7445 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
@@ -37,6 +37,7 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.PortManager;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
@@ -128,6 +129,8 @@ will fail (even if retry it many times).
@FlakyTest(value = "https://github.com/apache/bookkeeper/issues/4142")
@SuppressWarnings("deprecation")
@EnabledForJreRange(max = JRE.JAVA_17)
+ @Tag("flaky")
+ @Test
public void testBookieServerZKSessionExpireBehaviour() throws Exception {
// 6000 is minimum due to default tick time
System.setProperty("zookeeper.request.timeout", "0");
diff --git a/pom.xml b/pom.xml
index b2fd58d706a..ca7c91edc51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -872,6 +872,9 @@
org.apache.maven.plugins
maven-surefire-plugin
${maven-surefire-plugin.version}
+
+ flaky
+
org.apache.maven.plugins
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index cbd135de894..d1170701e14 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -61,6 +61,12 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +78,9 @@
public class TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
+ @Rule
+ public final TestName runtime = new TestName();
+
@Rule
public Timeout globalTimeout = Timeout.seconds(120);
@@ -105,7 +114,20 @@ public class TestDistributedLogBase {
protected static int zkPort;
protected static final List TMP_DIRS = new ArrayList();
+ protected String testName;
+
+ @Before
+ public void setTestNameJunit4() {
+ testName = runtime.getMethodName();
+ }
+
+ @BeforeEach
+ void setTestNameJunit5(TestInfo testInfo) {
+ testName = testInfo.getDisplayName();
+ }
+
@BeforeClass
+ @BeforeAll
public static void setupCluster() throws Exception {
setupCluster(numBookies);
}
@@ -134,6 +156,7 @@ public void uncaughtException(Thread t, Throwable e) {
}
@AfterClass
+ @AfterAll
public static void teardownCluster() throws Exception {
bkutil.teardown();
zks.stop();
@@ -143,6 +166,7 @@ public static void teardownCluster() throws Exception {
}
@Before
+ @BeforeEach
public void setup() throws Exception {
try {
zkc = LocalDLMEmulator.connectZooKeeper("127.0.0.1", zkPort);
@@ -153,6 +177,7 @@ public void setup() throws Exception {
}
@After
+ @AfterEach
public void teardown() throws Exception {
if (null != zkc) {
zkc.close();
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
index e4c9a77ce0f..fbf6801cd4d 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
@@ -44,6 +44,8 @@
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Before;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,8 @@ public void teardown() throws Exception {
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-44")
+ @Tag("flaky")
+ @Test
@SuppressWarnings("deprecation")
public void testChangeSequenceNumber() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 02cd7feccb0..40831e628ea 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -18,11 +18,13 @@
package org.apache.distributedlog.bk;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
+import java.time.Duration;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
@@ -53,17 +55,18 @@
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TestLedgerAllocator.
*/
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestLedgerAllocator extends TestDistributedLogBase {
private static final Logger logger = LoggerFactory.getLogger(TestLedgerAllocator.class);
@@ -81,9 +84,6 @@ public void onAbort(Throwable t) {
}
};
- @Rule
- public TestName runtime = new TestName();
-
private ZooKeeperClient zkc;
private BookKeeperClient bkc;
private DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -92,7 +92,7 @@ private URI createURI(String path) {
return URI.create("distributedlog://" + zkServers + path);
}
- @Before
+ @BeforeAll
public void setup() throws Exception {
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createURI("/"))
@@ -102,7 +102,7 @@ public void setup() throws Exception {
.dlConfig(dlConf).ledgersPath(ledgersPath).zkc(zkc).build();
}
- @After
+ @AfterAll
public void teardown() throws Exception {
bkc.close();
zkc.close();
@@ -133,6 +133,8 @@ private SimpleLedgerAllocator createAllocator(String allocationPath,
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-43")
+ @Tag("flaky")
+ @org.junit.jupiter.api.Test
public void testAllocation() throws Exception {
String allocationPath = "/allocation1";
SimpleLedgerAllocator allocator = createAllocator(allocationPath);
@@ -164,155 +166,167 @@ public void testAllocation() throws Exception {
Utils.close(allocator);
}
- @Test(timeout = 60000)
+ @Test
public void testBadVersionOnTwoAllocators() throws Exception {
- String allocationPath = "/allocation-bad-version";
- zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- Stat stat = new Stat();
- byte[] data = zkc.get().getData(allocationPath, false, stat);
- Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
-
- SimpleLedgerAllocator allocator1 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- SimpleLedgerAllocator allocator2 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator1.allocate();
- // wait until allocated
- ZKTransaction txn1 = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
- allocator2.allocate();
- ZKTransaction txn2 = newTxn();
- try {
- Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
- fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
- } catch (ZKException ke) {
- assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
- }
- Utils.ioResult(txn1.execute());
- Utils.close(allocator1);
- Utils.close(allocator2);
-
- long eid = lh.addEntry("hello world".getBytes());
- lh.close();
- LedgerHandle readLh = bkc.get().openLedger(lh.getId(),
- BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- Enumeration entries = readLh.readEntries(eid, eid);
- int i = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- assertEquals("hello world", new String(entry.getEntry(), UTF_8));
- ++i;
- }
- assertEquals(1, i);
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation-bad-version";
+ zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Stat stat = new Stat();
+ byte[] data = zkc.get().getData(allocationPath, false, stat);
+ Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+
+ SimpleLedgerAllocator allocator1 =
+ new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf),
+ zkc, bkc);
+ SimpleLedgerAllocator allocator2 =
+ new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf),
+ zkc, bkc);
+ allocator1.allocate();
+ // wait until allocated
+ ZKTransaction txn1 = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+ allocator2.allocate();
+ ZKTransaction txn2 = newTxn();
+ try {
+ Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
+ fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
+ } catch (ZKException ke) {
+ assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
+ }
+ Utils.ioResult(txn1.execute());
+ Utils.close(allocator1);
+ Utils.close(allocator2);
+
+ long eid = lh.addEntry("hello world".getBytes());
+ lh.close();
+ LedgerHandle readLh = bkc.get().openLedger(lh.getId(),
+ BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ Enumeration entries = readLh.readEntries(eid, eid);
+ int i = 0;
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals("hello world", new String(entry.getEntry(), UTF_8));
+ ++i;
+ }
+ assertEquals(1, i);
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testAllocatorWithoutEnoughBookies() throws Exception {
- String allocationPath = "/allocator-without-enough-bookies";
-
- DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
- confLocal.addConfiguration(conf);
- confLocal.setEnsembleSize(numBookies * 2);
- confLocal.setWriteQuorumSize(numBookies * 2);
-
- SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal);
- allocator1.allocate();
- ZKTransaction txn1 = newTxn();
-
- try {
- Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
- fail("Should fail allocating ledger if there aren't enough bookies");
- } catch (AllocationException ioe) {
- // expected
- assertEquals(Phase.ERROR, ioe.getPhase());
- }
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals(0, data.length);
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocator-without-enough-bookies";
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(conf);
+ confLocal.setEnsembleSize(numBookies * 2);
+ confLocal.setWriteQuorumSize(numBookies * 2);
+
+ SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal);
+ allocator1.allocate();
+ ZKTransaction txn1 = newTxn();
+
+ try {
+ Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+ fail("Should fail allocating ledger if there aren't enough bookies");
+ } catch (AllocationException ioe) {
+ // expected
+ assertEquals(Phase.ERROR, ioe.getPhase());
+ }
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals(0, data.length);
+ });
}
- @Test(timeout = 60000)
- public void testSuccessAllocatorShouldDeleteUnusedledger() throws Exception {
- String allocationPath = "/allocation-delete-unused-ledger";
- zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- Stat stat = new Stat();
- byte[] data = zkc.get().getData(allocationPath, false, stat);
-
- Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
-
- SimpleLedgerAllocator allocator1 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator1.allocate();
- // wait until allocated
- ZKTransaction txn1 = newTxn();
- LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
-
- // Second allocator kicks in
- stat = new Stat();
- data = zkc.get().getData(allocationPath, false, stat);
- allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
- SimpleLedgerAllocator allocator2 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator2.allocate();
- // wait until allocated
- ZKTransaction txn2 = newTxn();
- LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
-
- // should fail to commit txn1 as version is changed by second allocator
- try {
- Utils.ioResult(txn1.execute());
- fail("Should fail commit obtaining ledger handle from first allocator"
- + " as allocator is modified by second allocator.");
- } catch (ZKException ke) {
- // as expected
- }
- Utils.ioResult(txn2.execute());
- Utils.close(allocator1);
- Utils.close(allocator2);
-
- // ledger handle should be deleted
- try {
- lh1.close();
- fail("LedgerHandle allocated by allocator1 should be deleted.");
- } catch (BKException bke) {
- // as expected
- }
- try {
- bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- fail("LedgerHandle allocated by allocator1 should be deleted.");
- } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) {
- // as expected
- }
- long eid = lh2.addEntry("hello world".getBytes());
- lh2.close();
- LedgerHandle readLh = bkc.get().openLedger(lh2.getId(),
- BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- Enumeration entries = readLh.readEntries(eid, eid);
- int i = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- assertEquals("hello world", new String(entry.getEntry(), UTF_8));
- ++i;
- }
- assertEquals(1, i);
+ @Test
+ public void testSuccessAllocatorShouldDeleteUnusedLedger() throws Exception {
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation-delete-unused-ledger";
+ zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Stat stat = new Stat();
+ byte[] data = zkc.get().getData(allocationPath, false, stat);
+
+ Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+
+ SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(allocationPath, allocationData,
+ newQuorumConfigProvider(dlConf), zkc, bkc);
+ allocator1.allocate();
+ // wait until allocated
+ ZKTransaction txn1 = newTxn();
+ LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+
+ // Second allocator kicks in
+ stat = new Stat();
+ data = zkc.get().getData(allocationPath, false, stat);
+ allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+ SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(allocationPath, allocationData,
+ newQuorumConfigProvider(dlConf), zkc, bkc);
+ allocator2.allocate();
+ // wait until allocated
+ ZKTransaction txn2 = newTxn();
+ LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
+
+ // should fail to commit txn1 as version is changed by second allocator
+ try {
+ Utils.ioResult(txn1.execute());
+ fail("Should fail commit obtaining ledger handle from first allocator"
+ + " as allocator is modified by second allocator.");
+ } catch (ZKException ke) {
+ // as expected
+ }
+ Utils.ioResult(txn2.execute());
+ Utils.close(allocator1);
+ Utils.close(allocator2);
+
+ // ledger handle should be deleted
+ try {
+ lh1.close();
+ fail("LedgerHandle allocated by allocator1 should be deleted.");
+ } catch (BKException bke) {
+ // as expected
+ }
+ try {
+ bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ fail("LedgerHandle allocated by allocator1 should be deleted.");
+ } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) {
+ // as expected
+ }
+ long eid = lh2.addEntry("hello world".getBytes());
+ lh2.close();
+ LedgerHandle readLh = bkc.get().openLedger(lh2.getId(),
+ BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ Enumeration entries = readLh.readEntries(eid, eid);
+ int i = 0;
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals("hello world", new String(entry.getEntry(), UTF_8));
+ ++i;
+ }
+ assertEquals(1, i);
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testCloseAllocatorDuringObtaining() throws Exception {
- String allocationPath = "/allocation2";
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn = newTxn();
- // close during obtaining ledger.
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Utils.close(allocator);
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
- // the ledger is not deleted
- bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
- dlConf.getBKDigestPW().getBytes(UTF_8));
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation2";
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ // close during obtaining ledger.
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.close(allocator);
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
+ // the ledger is not deleted
+ bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
+ dlConf.getBKDigestPW().getBytes(UTF_8));
+ });
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-26")
+ @Tag("flaky")
+ @Test
public void testCloseAllocatorAfterConfirm() throws Exception {
String allocationPath = "/allocation2";
SimpleLedgerAllocator allocator = createAllocator(allocationPath);
@@ -329,84 +343,93 @@ public void testCloseAllocatorAfterConfirm() throws Exception {
dlConf.getBKDigestPW().getBytes(UTF_8));
}
- @Test(timeout = 60000)
+ @Test
public void testCloseAllocatorAfterAbort() throws Exception {
- String allocationPath = "/allocation3";
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn = newTxn();
- // close during obtaining ledger.
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
- try {
- Utils.ioResult(txn.execute());
- fail("Should fail the transaction when setting unexisted path");
- } catch (ZKException ke) {
- // expected
- }
- Utils.close(allocator);
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
- // the ledger is not deleted.
- bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
- dlConf.getBKDigestPW().getBytes(UTF_8));
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation3";
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ // close during obtaining ledger.
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
+ try {
+ Utils.ioResult(txn.execute());
+ fail("Should fail the transaction when setting unexisted path");
+ } catch (ZKException ke) {
+ // expected
+ }
+ Utils.close(allocator);
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
+ // the ledger is not deleted.
+ bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
+ dlConf.getBKDigestPW().getBytes(UTF_8));
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testConcurrentAllocation() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn1 = newTxn();
- CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
- ZKTransaction txn2 = newTxn();
- CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
- assertTrue(obtainFuture2.isDone());
- assertTrue(obtainFuture2.isCompletedExceptionally());
- try {
- Utils.ioResult(obtainFuture2);
- fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
- } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
- // expected
- }
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn1 = newTxn();
+ CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
+ ZKTransaction txn2 = newTxn();
+ CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
+ assertTrue(obtainFuture2.isDone());
+ assertTrue(obtainFuture2.isCompletedExceptionally());
+ try {
+ Utils.ioResult(obtainFuture2);
+ fail("Should fail the concurrent obtain since there is "
+ + "already a transaction obtaining the ledger handle");
+ } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
+ // expected
+ }
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testObtainMultipleLedgers() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- int numLedgers = 10;
- Set allocatedLedgers = new HashSet();
- for (int i = 0; i < numLedgers; i++) {
- allocator.allocate();
- ZKTransaction txn = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Utils.ioResult(txn.execute());
- allocatedLedgers.add(lh);
- }
- assertEquals(numLedgers, allocatedLedgers.size());
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ int numLedgers = 10;
+ Set allocatedLedgers = new HashSet();
+ for (int i = 0; i < numLedgers; i++) {
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
+ allocatedLedgers.add(lh);
+ }
+ assertEquals(numLedgers, allocatedLedgers.size());
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testAllocationWithMetadata() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
-
- String application = "testApplicationMetadata";
- String component = "testComponentMetadata";
- String custom = "customMetadata";
- LedgerMetadata ledgerMetadata = new LedgerMetadata();
- ledgerMetadata.setApplication(application);
- ledgerMetadata.setComponent(component);
- ledgerMetadata.addCustomMetadata("custom", custom);
-
- SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata);
- allocator.allocate();
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+
+ String application = "testApplicationMetadata";
+ String component = "testComponentMetadata";
+ String custom = "customMetadata";
+ LedgerMetadata ledgerMetadata = new LedgerMetadata();
+ ledgerMetadata.setApplication(application);
+ ledgerMetadata.setComponent(component);
+ ledgerMetadata.addCustomMetadata("custom", custom);
+
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata);
+ allocator.allocate();
- ZKTransaction txn = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Map customMeta = lh.getCustomMetadata();
- assertEquals(application, new String(customMeta.get("application"), UTF_8));
- assertEquals(component, new String(customMeta.get("component"), UTF_8));
- assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ ZKTransaction txn = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Map customMeta = lh.getCustomMetadata();
+ assertEquals(application, new String(customMeta.get("application"), UTF_8));
+ assertEquals(component, new String(customMeta.get("component"), UTF_8));
+ assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ });
}
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 1ebab1e5457..2b11b3a5d80 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -17,16 +17,19 @@
*/
package org.apache.distributedlog.feature;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.time.Duration;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.common.config.PropertiesWriter;
-import org.junit.Test;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
/**
* Test case for dynamic configuration based feature provider.
@@ -45,38 +48,42 @@ private void ensureConfigReloaded() throws InterruptedException {
Thread.sleep(1);
}
- @Test(timeout = 60000)
+ @Test
public void testLoadFeaturesFromBase() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty("feature_1", "10000");
- writer.setProperty("feature_2", "5000");
- writer.save();
-
- DistributedLogConfiguration conf = new DistributedLogConfiguration()
- .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE)
- .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath());
- DynamicConfigurationFeatureProvider provider =
- new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
- provider.start();
- ensureConfigReloaded();
-
- Feature feature1 = provider.getFeature("feature_1");
- assertTrue(feature1.isAvailable());
- assertEquals(10000, feature1.availability());
- Feature feature2 = provider.getFeature("feature_2");
- assertTrue(feature2.isAvailable());
- assertEquals(5000, feature2.availability());
- Feature feature3 = provider.getFeature("feature_3");
- assertFalse(feature3.isAvailable());
- assertEquals(0, feature3.availability());
- Feature feature4 = provider.getFeature("unknown_feature");
- assertFalse(feature4.isAvailable());
- assertEquals(0, feature4.availability());
-
- provider.stop();
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ PropertiesWriter writer = new PropertiesWriter();
+ writer.setProperty("feature_1", "10000");
+ writer.setProperty("feature_2", "5000");
+ writer.save();
+
+ DistributedLogConfiguration conf = new DistributedLogConfiguration()
+ .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE)
+ .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath());
+ DynamicConfigurationFeatureProvider provider =
+ new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
+ provider.start();
+ ensureConfigReloaded();
+
+ Feature feature1 = provider.getFeature("feature_1");
+ assertTrue(feature1.isAvailable());
+ assertEquals(10000, feature1.availability());
+ Feature feature2 = provider.getFeature("feature_2");
+ assertTrue(feature2.isAvailable());
+ assertEquals(5000, feature2.availability());
+ Feature feature3 = provider.getFeature("feature_3");
+ assertFalse(feature3.isAvailable());
+ assertEquals(0, feature3.availability());
+ Feature feature4 = provider.getFeature("unknown_feature");
+ assertFalse(feature4.isAvailable());
+ assertEquals(0, feature4.availability());
+
+ provider.stop();
+ });
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-40")
+ @Tag("flaky")
+ @Test
public void testLoadFeaturesFromOverlay() throws Exception {
PropertiesWriter writer = new PropertiesWriter();
writer.setProperty("feature_1", "10000");
@@ -116,7 +123,7 @@ public void testLoadFeaturesFromOverlay() throws Exception {
provider.stop();
}
- @Test(timeout = 60000)
+ @Test
public void testReloadFeaturesFromOverlay() throws Exception {
PropertiesWriter writer = new PropertiesWriter();
writer.setProperty("feature_1", "10000");
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index 630c13a5233..3839991ac14 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -49,6 +49,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.rules.TestName;
/**
@@ -91,6 +93,8 @@ private static ByteBuf getValue(int i) {
}
@FlakyTest("https://github.com/apache/bookkeeper/issues/1440")
+ @Tag("flaky")
+ @Test
public void testTableSimpleAPI() throws Exception {
// Create a namespace
NamespaceConfiguration nsConf = NamespaceConfiguration.newBuilder()