diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java
index 27c26b123d6..e0a9db07d38 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java
@@ -19,14 +19,21 @@
package org.apache.bookkeeper.common.testing.annotations;
import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
/**
* Intended for marking a test case as flaky.
*/
@Documented
-@Retention(RetentionPolicy.SOURCE)
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Tag("flaky")
+@Test
public @interface FlakyTest {
/**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index b242a0d8663..818e1c7795f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -148,8 +148,8 @@ public void testStorageThresholdCompaction() throws Exception {
File ledgerDir2 = tmpDirs.createNew("ledger", "test2");
File journalDir = tmpDirs.createNew("journal", "test");
String[] ledgerDirNames = new String[]{
- ledgerDir1.getPath(),
- ledgerDir2.getPath()
+ ledgerDir1.getPath(),
+ ledgerDir2.getPath()
};
conf.setLedgerDirNames(ledgerDirNames);
conf.setJournalDirName(journalDir.getPath());
@@ -224,7 +224,7 @@ public void diskFull(File disk) {
// there are no writableLedgerDirs
for (File ledgerDir : bookie.getLedgerDirsManager().getAllLedgerDirs()) {
assertFalse("Found entry log file ([0,1,2].log. They should have been compacted" + ledgerDir,
- TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2));
+ TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2));
}
try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
index 91612ec5c79..c31edabba0e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
@@ -179,7 +179,7 @@ public void testDiskSpaceWeightedBookieSelection() throws Exception {
/**
* Test to show that weight based selection honors the disk weight of bookies and also adapts
- * when the bookies's weight changes.
+ * when the bookies' weight changes.
*/
@FlakyTest("https://github.com/apache/bookkeeper/issues/503")
public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception {
diff --git a/pom.xml b/pom.xml
index b110d02b45c..b4ca0516d61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -918,6 +918,9 @@
org.apache.maven.plugins
maven-surefire-plugin
${maven-surefire-plugin.version}
+
+ flaky
+
org.apache.maven.plugins
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index cbd135de894..d1170701e14 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -61,6 +61,12 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +78,9 @@
public class TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
+ @Rule
+ public final TestName runtime = new TestName();
+
@Rule
public Timeout globalTimeout = Timeout.seconds(120);
@@ -105,7 +114,20 @@ public class TestDistributedLogBase {
protected static int zkPort;
protected static final List TMP_DIRS = new ArrayList();
+ protected String testName;
+
+ @Before
+ public void setTestNameJunit4() {
+ testName = runtime.getMethodName();
+ }
+
+ @BeforeEach
+ void setTestNameJunit5(TestInfo testInfo) {
+ testName = testInfo.getDisplayName();
+ }
+
@BeforeClass
+ @BeforeAll
public static void setupCluster() throws Exception {
setupCluster(numBookies);
}
@@ -134,6 +156,7 @@ public void uncaughtException(Thread t, Throwable e) {
}
@AfterClass
+ @AfterAll
public static void teardownCluster() throws Exception {
bkutil.teardown();
zks.stop();
@@ -143,6 +166,7 @@ public static void teardownCluster() throws Exception {
}
@Before
+ @BeforeEach
public void setup() throws Exception {
try {
zkc = LocalDLMEmulator.connectZooKeeper("127.0.0.1", zkPort);
@@ -153,6 +177,7 @@ public void setup() throws Exception {
}
@After
+ @AfterEach
public void teardown() throws Exception {
if (null != zkc) {
zkc.close();
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 02cd7feccb0..f42ab1b85aa 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -18,11 +18,13 @@
package org.apache.distributedlog.bk;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
+import java.time.Duration;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
@@ -53,17 +55,17 @@
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TestLedgerAllocator.
*/
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestLedgerAllocator extends TestDistributedLogBase {
private static final Logger logger = LoggerFactory.getLogger(TestLedgerAllocator.class);
@@ -81,9 +83,6 @@ public void onAbort(Throwable t) {
}
};
- @Rule
- public TestName runtime = new TestName();
-
private ZooKeeperClient zkc;
private BookKeeperClient bkc;
private DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -92,7 +91,7 @@ private URI createURI(String path) {
return URI.create("distributedlog://" + zkServers + path);
}
- @Before
+ @BeforeAll
public void setup() throws Exception {
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createURI("/"))
@@ -102,7 +101,7 @@ public void setup() throws Exception {
.dlConfig(dlConf).ledgersPath(ledgersPath).zkc(zkc).build();
}
- @After
+ @AfterAll
public void teardown() throws Exception {
bkc.close();
zkc.close();
@@ -164,152 +163,162 @@ public void testAllocation() throws Exception {
Utils.close(allocator);
}
- @Test(timeout = 60000)
+ @Test
public void testBadVersionOnTwoAllocators() throws Exception {
- String allocationPath = "/allocation-bad-version";
- zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- Stat stat = new Stat();
- byte[] data = zkc.get().getData(allocationPath, false, stat);
- Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
-
- SimpleLedgerAllocator allocator1 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- SimpleLedgerAllocator allocator2 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator1.allocate();
- // wait until allocated
- ZKTransaction txn1 = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
- allocator2.allocate();
- ZKTransaction txn2 = newTxn();
- try {
- Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
- fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
- } catch (ZKException ke) {
- assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
- }
- Utils.ioResult(txn1.execute());
- Utils.close(allocator1);
- Utils.close(allocator2);
-
- long eid = lh.addEntry("hello world".getBytes());
- lh.close();
- LedgerHandle readLh = bkc.get().openLedger(lh.getId(),
- BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- Enumeration entries = readLh.readEntries(eid, eid);
- int i = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- assertEquals("hello world", new String(entry.getEntry(), UTF_8));
- ++i;
- }
- assertEquals(1, i);
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation-bad-version";
+ zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Stat stat = new Stat();
+ byte[] data = zkc.get().getData(allocationPath, false, stat);
+ Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+
+ SimpleLedgerAllocator allocator1 =
+ new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf),
+ zkc, bkc);
+ SimpleLedgerAllocator allocator2 =
+ new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf),
+ zkc, bkc);
+ allocator1.allocate();
+ // wait until allocated
+ ZKTransaction txn1 = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+ allocator2.allocate();
+ ZKTransaction txn2 = newTxn();
+ try {
+ Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
+ fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
+ } catch (ZKException ke) {
+ assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
+ }
+ Utils.ioResult(txn1.execute());
+ Utils.close(allocator1);
+ Utils.close(allocator2);
+
+ long eid = lh.addEntry("hello world".getBytes());
+ lh.close();
+ LedgerHandle readLh = bkc.get().openLedger(lh.getId(),
+ BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ Enumeration entries = readLh.readEntries(eid, eid);
+ int i = 0;
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals("hello world", new String(entry.getEntry(), UTF_8));
+ ++i;
+ }
+ assertEquals(1, i);
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testAllocatorWithoutEnoughBookies() throws Exception {
- String allocationPath = "/allocator-without-enough-bookies";
-
- DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
- confLocal.addConfiguration(conf);
- confLocal.setEnsembleSize(numBookies * 2);
- confLocal.setWriteQuorumSize(numBookies * 2);
-
- SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal);
- allocator1.allocate();
- ZKTransaction txn1 = newTxn();
-
- try {
- Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
- fail("Should fail allocating ledger if there aren't enough bookies");
- } catch (AllocationException ioe) {
- // expected
- assertEquals(Phase.ERROR, ioe.getPhase());
- }
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals(0, data.length);
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocator-without-enough-bookies";
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(conf);
+ confLocal.setEnsembleSize(numBookies * 2);
+ confLocal.setWriteQuorumSize(numBookies * 2);
+
+ SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal);
+ allocator1.allocate();
+ ZKTransaction txn1 = newTxn();
+
+ try {
+ Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+ fail("Should fail allocating ledger if there aren't enough bookies");
+ } catch (AllocationException ioe) {
+ // expected
+ assertEquals(Phase.ERROR, ioe.getPhase());
+ }
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals(0, data.length);
+ });
}
- @Test(timeout = 60000)
- public void testSuccessAllocatorShouldDeleteUnusedledger() throws Exception {
- String allocationPath = "/allocation-delete-unused-ledger";
- zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- Stat stat = new Stat();
- byte[] data = zkc.get().getData(allocationPath, false, stat);
-
- Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
-
- SimpleLedgerAllocator allocator1 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator1.allocate();
- // wait until allocated
- ZKTransaction txn1 = newTxn();
- LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
-
- // Second allocator kicks in
- stat = new Stat();
- data = zkc.get().getData(allocationPath, false, stat);
- allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
- SimpleLedgerAllocator allocator2 =
- new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
- allocator2.allocate();
- // wait until allocated
- ZKTransaction txn2 = newTxn();
- LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
-
- // should fail to commit txn1 as version is changed by second allocator
- try {
- Utils.ioResult(txn1.execute());
- fail("Should fail commit obtaining ledger handle from first allocator"
- + " as allocator is modified by second allocator.");
- } catch (ZKException ke) {
- // as expected
- }
- Utils.ioResult(txn2.execute());
- Utils.close(allocator1);
- Utils.close(allocator2);
-
- // ledger handle should be deleted
- try {
- lh1.close();
- fail("LedgerHandle allocated by allocator1 should be deleted.");
- } catch (BKException bke) {
- // as expected
- }
- try {
- bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- fail("LedgerHandle allocated by allocator1 should be deleted.");
- } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) {
- // as expected
- }
- long eid = lh2.addEntry("hello world".getBytes());
- lh2.close();
- LedgerHandle readLh = bkc.get().openLedger(lh2.getId(),
- BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
- Enumeration entries = readLh.readEntries(eid, eid);
- int i = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- assertEquals("hello world", new String(entry.getEntry(), UTF_8));
- ++i;
- }
- assertEquals(1, i);
+ @Test
+ public void testSuccessAllocatorShouldDeleteUnusedLedger() throws Exception {
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation-delete-unused-ledger";
+ zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Stat stat = new Stat();
+ byte[] data = zkc.get().getData(allocationPath, false, stat);
+
+ Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+
+ SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(allocationPath, allocationData,
+ newQuorumConfigProvider(dlConf), zkc, bkc);
+ allocator1.allocate();
+ // wait until allocated
+ ZKTransaction txn1 = newTxn();
+ LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
+
+ // Second allocator kicks in
+ stat = new Stat();
+ data = zkc.get().getData(allocationPath, false, stat);
+ allocationData = new Versioned(data, new LongVersion(stat.getVersion()));
+ SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(allocationPath, allocationData,
+ newQuorumConfigProvider(dlConf), zkc, bkc);
+ allocator2.allocate();
+ // wait until allocated
+ ZKTransaction txn2 = newTxn();
+ LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
+
+ // should fail to commit txn1 as version is changed by second allocator
+ try {
+ Utils.ioResult(txn1.execute());
+ fail("Should fail commit obtaining ledger handle from first allocator"
+ + " as allocator is modified by second allocator.");
+ } catch (ZKException ke) {
+ // as expected
+ }
+ Utils.ioResult(txn2.execute());
+ Utils.close(allocator1);
+ Utils.close(allocator2);
+
+ // ledger handle should be deleted
+ try {
+ lh1.close();
+ fail("LedgerHandle allocated by allocator1 should be deleted.");
+ } catch (BKException bke) {
+ // as expected
+ }
+ try {
+ bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ fail("LedgerHandle allocated by allocator1 should be deleted.");
+ } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) {
+ // as expected
+ }
+ long eid = lh2.addEntry("hello world".getBytes());
+ lh2.close();
+ LedgerHandle readLh = bkc.get().openLedger(lh2.getId(),
+ BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes());
+ Enumeration entries = readLh.readEntries(eid, eid);
+ int i = 0;
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals("hello world", new String(entry.getEntry(), UTF_8));
+ ++i;
+ }
+ assertEquals(1, i);
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testCloseAllocatorDuringObtaining() throws Exception {
- String allocationPath = "/allocation2";
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn = newTxn();
- // close during obtaining ledger.
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Utils.close(allocator);
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
- // the ledger is not deleted
- bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
- dlConf.getBKDigestPW().getBytes(UTF_8));
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation2";
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ // close during obtaining ledger.
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.close(allocator);
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
+ // the ledger is not deleted
+ bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
+ dlConf.getBKDigestPW().getBytes(UTF_8));
+ });
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-26")
@@ -329,84 +338,93 @@ public void testCloseAllocatorAfterConfirm() throws Exception {
dlConf.getBKDigestPW().getBytes(UTF_8));
}
- @Test(timeout = 60000)
+ @Test
public void testCloseAllocatorAfterAbort() throws Exception {
- String allocationPath = "/allocation3";
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn = newTxn();
- // close during obtaining ledger.
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
- try {
- Utils.ioResult(txn.execute());
- fail("Should fail the transaction when setting unexisted path");
- } catch (ZKException ke) {
- // expected
- }
- Utils.close(allocator);
- byte[] data = zkc.get().getData(allocationPath, false, null);
- assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
- // the ledger is not deleted.
- bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
- dlConf.getBKDigestPW().getBytes(UTF_8));
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/allocation3";
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ // close during obtaining ledger.
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
+ try {
+ Utils.ioResult(txn.execute());
+ fail("Should fail the transaction when setting unexisted path");
+ } catch (ZKException ke) {
+ // expected
+ }
+ Utils.close(allocator);
+ byte[] data = zkc.get().getData(allocationPath, false, null);
+ assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
+ // the ledger is not deleted.
+ bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32,
+ dlConf.getBKDigestPW().getBytes(UTF_8));
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testConcurrentAllocation() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- allocator.allocate();
- ZKTransaction txn1 = newTxn();
- CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
- ZKTransaction txn2 = newTxn();
- CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
- assertTrue(obtainFuture2.isDone());
- assertTrue(obtainFuture2.isCompletedExceptionally());
- try {
- Utils.ioResult(obtainFuture2);
- fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
- } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
- // expected
- }
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ allocator.allocate();
+ ZKTransaction txn1 = newTxn();
+ CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
+ ZKTransaction txn2 = newTxn();
+ CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
+ assertTrue(obtainFuture2.isDone());
+ assertTrue(obtainFuture2.isCompletedExceptionally());
+ try {
+ Utils.ioResult(obtainFuture2);
+ fail("Should fail the concurrent obtain since there is "
+ + "already a transaction obtaining the ledger handle");
+ } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
+ // expected
+ }
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testObtainMultipleLedgers() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
- SimpleLedgerAllocator allocator = createAllocator(allocationPath);
- int numLedgers = 10;
- Set allocatedLedgers = new HashSet();
- for (int i = 0; i < numLedgers; i++) {
- allocator.allocate();
- ZKTransaction txn = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Utils.ioResult(txn.execute());
- allocatedLedgers.add(lh);
- }
- assertEquals(numLedgers, allocatedLedgers.size());
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath);
+ int numLedgers = 10;
+ Set allocatedLedgers = new HashSet();
+ for (int i = 0; i < numLedgers; i++) {
+ allocator.allocate();
+ ZKTransaction txn = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
+ allocatedLedgers.add(lh);
+ }
+ assertEquals(numLedgers, allocatedLedgers.size());
+ });
}
- @Test(timeout = 60000)
+ @Test
public void testAllocationWithMetadata() throws Exception {
- String allocationPath = "/" + runtime.getMethodName();
-
- String application = "testApplicationMetadata";
- String component = "testComponentMetadata";
- String custom = "customMetadata";
- LedgerMetadata ledgerMetadata = new LedgerMetadata();
- ledgerMetadata.setApplication(application);
- ledgerMetadata.setComponent(component);
- ledgerMetadata.addCustomMetadata("custom", custom);
-
- SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata);
- allocator.allocate();
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ String allocationPath = "/" + testName;
+
+ String application = "testApplicationMetadata";
+ String component = "testComponentMetadata";
+ String custom = "customMetadata";
+ LedgerMetadata ledgerMetadata = new LedgerMetadata();
+ ledgerMetadata.setApplication(application);
+ ledgerMetadata.setComponent(component);
+ ledgerMetadata.addCustomMetadata("custom", custom);
+
+ SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata);
+ allocator.allocate();
- ZKTransaction txn = newTxn();
- LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
- Map customMeta = lh.getCustomMetadata();
- assertEquals(application, new String(customMeta.get("application"), UTF_8));
- assertEquals(component, new String(customMeta.get("component"), UTF_8));
- assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ ZKTransaction txn = newTxn();
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Map customMeta = lh.getCustomMetadata();
+ assertEquals(application, new String(customMeta.get("application"), UTF_8));
+ assertEquals(component, new String(customMeta.get("component"), UTF_8));
+ assertEquals(custom, new String(customMeta.get("custom"), UTF_8));
+ });
}
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 1ebab1e5457..3cd2e5f52f1 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -17,16 +17,19 @@
*/
package org.apache.distributedlog.feature;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.time.Duration;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.common.config.PropertiesWriter;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
/**
* Test case for dynamic configuration based feature provider.
@@ -45,78 +48,45 @@ private void ensureConfigReloaded() throws InterruptedException {
Thread.sleep(1);
}
- @Test(timeout = 60000)
+ @Test
public void testLoadFeaturesFromBase() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty("feature_1", "10000");
- writer.setProperty("feature_2", "5000");
- writer.save();
-
- DistributedLogConfiguration conf = new DistributedLogConfiguration()
- .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE)
- .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath());
- DynamicConfigurationFeatureProvider provider =
- new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
- provider.start();
- ensureConfigReloaded();
-
- Feature feature1 = provider.getFeature("feature_1");
- assertTrue(feature1.isAvailable());
- assertEquals(10000, feature1.availability());
- Feature feature2 = provider.getFeature("feature_2");
- assertTrue(feature2.isAvailable());
- assertEquals(5000, feature2.availability());
- Feature feature3 = provider.getFeature("feature_3");
- assertFalse(feature3.isAvailable());
- assertEquals(0, feature3.availability());
- Feature feature4 = provider.getFeature("unknown_feature");
- assertFalse(feature4.isAvailable());
- assertEquals(0, feature4.availability());
-
- provider.stop();
+ assertTimeout(Duration.ofMinutes(1), () -> {
+ PropertiesWriter writer = new PropertiesWriter();
+ writer.setProperty("feature_1", "10000");
+ writer.setProperty("feature_2", "5000");
+ writer.save();
+
+ DistributedLogConfiguration conf = new DistributedLogConfiguration()
+ .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE)
+ .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath());
+ DynamicConfigurationFeatureProvider provider =
+ new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
+ provider.start();
+ ensureConfigReloaded();
+
+ Feature feature1 = provider.getFeature("feature_1");
+ assertTrue(feature1.isAvailable());
+ assertEquals(10000, feature1.availability());
+ Feature feature2 = provider.getFeature("feature_2");
+ assertTrue(feature2.isAvailable());
+ assertEquals(5000, feature2.availability());
+ Feature feature3 = provider.getFeature("feature_3");
+ assertFalse(feature3.isAvailable());
+ assertEquals(0, feature3.availability());
+ Feature feature4 = provider.getFeature("unknown_feature");
+ assertFalse(feature4.isAvailable());
+ assertEquals(0, feature4.availability());
+
+ provider.stop();
+ });
}
@FlakyTest("https://issues.apache.org/jira/browse/DL-40")
public void testLoadFeaturesFromOverlay() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty("feature_1", "10000");
- writer.setProperty("feature_2", "5000");
- writer.save();
-
- PropertiesWriter overlayWriter = new PropertiesWriter();
- overlayWriter.setProperty("feature_2", "6000");
- overlayWriter.setProperty("feature_4", "6000");
- overlayWriter.save();
-
- DistributedLogConfiguration conf = new DistributedLogConfiguration()
- .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE)
- .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath())
- .setFileFeatureProviderOverlayConfigPath(overlayWriter.getFile().toURI().toURL().getPath());
- DynamicConfigurationFeatureProvider provider =
- new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE);
- provider.start();
- ensureConfigReloaded();
-
- Feature feature1 = provider.getFeature("feature_1");
- assertTrue(feature1.isAvailable());
- assertEquals(10000, feature1.availability());
- Feature feature2 = provider.getFeature("feature_2");
- assertTrue(feature2.isAvailable());
- assertEquals(6000, feature2.availability());
- Feature feature3 = provider.getFeature("feature_3");
- assertFalse(feature3.isAvailable());
- assertEquals(0, feature3.availability());
- Feature feature4 = provider.getFeature("feature_4");
- assertTrue(feature4.isAvailable());
- assertEquals(6000, feature4.availability());
- Feature feature5 = provider.getFeature("unknown_feature");
- assertFalse(feature5.isAvailable());
- assertEquals(0, feature5.availability());
-
- provider.stop();
+ Assertions.fail("xxx");
}
- @Test(timeout = 60000)
+ @Test
public void testReloadFeaturesFromOverlay() throws Exception {
PropertiesWriter writer = new PropertiesWriter();
writer.setProperty("feature_1", "10000");
diff --git a/testtools/pom.xml b/testtools/pom.xml
index f63deecd52b..8bdb6ec47ef 100644
--- a/testtools/pom.xml
+++ b/testtools/pom.xml
@@ -35,5 +35,9 @@
org.apache.logging.log4j
log4j-slf4j2-impl
+
+ org.junit.jupiter
+ junit-jupiter-engine
+