From 5eaa25f498ada0d93cd3002403906ac324d21cdc Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Tue, 31 Oct 2017 19:43:18 -0700 Subject: [PATCH] ISSUE #666: Introduce registration client for bookkeeper client to do bookie discover Descriptions of the changes in this PR: while in BOOKKEEPER-628, It is trying to improve/generalize the bookie registration process. This PR follows the work of #663, which provide bookie side registration, and it is for client side work, it tries to introduce RegistrationClient for bookkeeper client to do bookie discover. This PR follows the work of #663, which provide bookie side registration, Author: Jia Zhai Reviewers: Sijie Guo This closes #667 from zhaijack/client_registration2, closes #666 --- .../benchmark/BenchThroughputLatency.java | 2 +- .../bookkeeper/benchmark/TestClient.java | 3 - .../apache/bookkeeper/bookie/BookieShell.java | 2 +- .../apache/bookkeeper/client/BookKeeper.java | 132 +++---- .../bookkeeper/client/BookKeeperAdmin.java | 85 ++--- .../bookkeeper/client/BookieInfoReader.java | 6 +- .../bookkeeper/client/BookieWatcher.java | 358 ++++-------------- .../client/impl/BookKeeperBuilderImpl.java | 13 +- .../bookkeeper/conf/ClientConfiguration.java | 28 ++ .../discover/RegistrationClient.java | 89 ++++- .../discover/ZKRegistrationClient.java | 346 +++++++++++++++++ .../http/BKHttpServiceProvider.java | 4 +- .../bookkeeper/http/ListBookiesService.java | 2 +- .../apache/bookkeeper/proto/BookieServer.java | 3 +- .../bookkeeper/replication/Auditor.java | 31 +- .../replication/ReplicationWorker.java | 12 +- .../server/service/BookieService.java | 4 +- .../bookkeeper/util/LocalBookKeeper.java | 5 +- .../org/apache/bookkeeper/auth/TestAuth.java | 26 +- ...rDiskSpaceWeightedLedgerPlacementTest.java | 3 +- .../bookkeeper/client/BookKeeperTest.java | 6 +- .../client/BookKeeperTestClient.java | 5 +- .../bookkeeper/client/TestBookieWatcher.java | 2 +- .../bookkeeper/client/TestReadTimeout.java | 26 +- .../bookkeeper/test/BookieFailureTest.java | 4 - 25 files changed, 688 insertions(+), 509 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java index 49febbefd4b..d9de4ac0429 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java @@ -82,7 +82,7 @@ static class Context { public BenchThroughputLatency(int ensemble, int writeQuorumSize, int ackQuorumSize, byte[] passwd, int numberOfLedgers, int sendLimit, ClientConfiguration conf) - throws KeeperException, IOException, InterruptedException { + throws BKException, IOException, InterruptedException { this.sem = new Semaphore(conf.getThrottleValue()); bk = new BookKeeper(conf); this.counter = new AtomicLong(0); diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java index 3c11c520afe..6c1ee0501b9 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java @@ -52,7 +52,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,8 +191,6 @@ public void run() { executor.shutdown(); } catch (ExecutionException ee) { LOG.error("Exception in worker", ee); - } catch (KeeperException ke) { - LOG.error("Error accessing zookeeper", ke); } catch (BKException e) { LOG.error("Error accessing bookkeeper", e); } catch (IOException ioe) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 481c6477cc0..2d261453e04 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -1129,7 +1129,7 @@ public int runCmd(CommandLine cmdLine) throws Exception { bookies.addAll(availableBookies); } else if (cmdLine.hasOption("ro")) { Collection roBookies = bka - .getReadOnlyBookiesAsync(); + .getReadOnlyBookies(); bookies.addAll(roBookies); } for (BookieSocketAddress b : bookies) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 8716c3a2ffb..8e148129e33 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -40,8 +40,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; -import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; +import org.apache.bookkeeper.client.BKException.ZKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback; import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback; @@ -51,7 +52,10 @@ import org.apache.bookkeeper.client.api.CreateBuilder; import org.apache.bookkeeper.client.api.DeleteBuilder; import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.discover.RegistrationClient; +import org.apache.bookkeeper.discover.ZKRegistrationClient; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeatureProvider; @@ -64,13 +68,11 @@ import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang.SystemUtils; import org.apache.zookeeper.KeeperException; @@ -94,7 +96,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class); - final ZooKeeper zk; + final RegistrationClient regClient; final EventLoopGroup eventLoopGroup; // The stats logger for this client. @@ -115,9 +117,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { // whether the event loop group is one we created, or is owned by whoever // instantiated us boolean ownEventLoopGroup = false; - // whether the zk handle is one we created, or is owned by whoever - // instantiated us - boolean ownZKHandle = false; final BookieClient bookieClient; final BookieWatcher bookieWatcher; @@ -287,7 +286,7 @@ public Builder featureProvider(FeatureProvider featureProvider) { return this; } - public BookKeeper build() throws IOException, InterruptedException, KeeperException { + public BookKeeper build() throws IOException, InterruptedException, BKException { Preconditions.checkNotNull(statsLogger, "No stats logger provided"); return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider); } @@ -305,13 +304,13 @@ public static Builder forConfig(final ClientConfiguration conf) { * A list of one of more servers on which zookeeper is running. The * client assumes that the running bookies have been registered with * zookeeper under the path - * {@link BookieWatcher#bookieRegistrationPath} + * {@link AbstractConfiguration#getZkAvailableBookiesPath()} * @throws IOException * @throws InterruptedException * @throws KeeperException */ public BookKeeper(String servers) throws IOException, InterruptedException, - KeeperException { + BKException { this(new ClientConfiguration().setZkServers(servers)); } @@ -327,13 +326,17 @@ public BookKeeper(String servers) throws IOException, InterruptedException, * @throws KeeperException */ public BookKeeper(final ClientConfiguration conf) - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, BKException { this(conf, null, null, NullStatsLogger.INSTANCE, null, null, null); } - private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException { + private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException, IOException { Preconditions.checkNotNull(zk, "No zookeeper instance provided"); + if (!zk.getState().isConnected()) { + LOG.error("Unconnected zookeeper handle passed to bookkeeper"); + throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS)); + } return zk; } @@ -358,8 +361,7 @@ private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGro * @throws KeeperException */ public BookKeeper(ClientConfiguration conf, ZooKeeper zk) - throws IOException, InterruptedException, KeeperException { - + throws IOException, InterruptedException, BKException { this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null); } @@ -381,7 +383,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk) * @throws KeeperException if the passed zk handle is not connected */ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup) - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, BKException { this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE, null, null, null); } @@ -396,28 +398,49 @@ private BookKeeper(ClientConfiguration conf, DNSToSwitchMapping dnsResolver, HashedWheelTimer requestTimer, FeatureProvider featureProvider) - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, BKException { this.conf = conf; this.delayEnsembleChange = conf.getDelayEnsembleChange(); this.reorderReadSequence = conf.isReorderReadSequenceEnabled(); - // initialize zookeeper client - if (zkc == null) { - this.zk = ZooKeeperClient.newBuilder() - .connectString(conf.getZkServers()) - .sessionTimeoutMs(conf.getZkTimeout()) - .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(), - conf.getZkTimeout(), 0)) - .statsLogger(statsLogger) - .build(); - this.ownZKHandle = true; + // initialize resources + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat( + "BookKeeperClientScheduler-%d"); + this.scheduler = Executors + .newSingleThreadScheduledExecutor(tfb.build()); + this.mainWorkerPool = OrderedSafeExecutor.newBuilder() + .name("BookKeeperClientWorker") + .numThreads(conf.getNumWorkerThreads()) + .statsLogger(statsLogger) + .traceTaskExecution(conf.getEnableTaskExecutionStats()) + .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros()) + .build(); + + // initialize stats logger + this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE); + initOpLoggers(this.statsLogger); + + // initialize feature provider + if (null == featureProvider) { + this.featureProvider = SettableFeatureProvider.DISABLE_ALL; } else { - if (!zkc.getState().isConnected()) { - LOG.error("Unconnected zookeeper handle passed to bookkeeper"); - throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); - } - this.zk = zkc; - this.ownZKHandle = false; + this.featureProvider = featureProvider; + } + this.disableEnsembleChangeFeature = + this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); + + // initialize registration client + try { + Class regClientCls = conf.getRegistrationClientClass(); + this.regClient = ReflectionUtils.newInstance(regClientCls); + this.regClient.initialize( + conf, + scheduler, + statsLogger, + java.util.Optional.ofNullable(zkc)); + } catch (ConfigurationException ce) { + LOG.error("Failed to initialize registration client", ce); + throw new IOException("Failed to initialize registration client", ce); } // initialize event loop group @@ -440,25 +463,6 @@ private BookKeeper(ClientConfiguration conf, this.ownTimer = false; } - if (null == featureProvider) { - this.featureProvider = SettableFeatureProvider.DISABLE_ALL; - } else { - this.featureProvider = featureProvider; - } - - // get features - this.disableEnsembleChangeFeature = this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); - - // initialize scheduler - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat( - "BookKeeperClientScheduler-%d"); - this.scheduler = Executors - .newSingleThreadScheduledExecutor(tfb.build()); - - // initialize stats logger - this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE); - initOpLoggers(this.statsLogger); - // initialize the ensemble placement this.placementPolicy = initializeEnsemblePlacementPolicy(conf, dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger); @@ -482,18 +486,11 @@ private BookKeeper(ClientConfiguration conf, } else { this.readLACSpeculativeRequestPolicy = Optional.absent(); } - // initialize main worker pool - this.mainWorkerPool = OrderedSafeExecutor.newBuilder() - .name("BookKeeperClientWorker") - .numThreads(conf.getNumWorkerThreads()) - .statsLogger(statsLogger) - .traceTaskExecution(conf.getEnableTaskExecutionStats()) - .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros()) - .build(); + // initialize bookie client this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, statsLogger); - this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this); + this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient); if (conf.getDiskWeightBasedPlacementEnabled()) { LOG.info("Weighted ledger placement enabled"); ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder() @@ -510,7 +507,12 @@ private BookKeeper(ClientConfiguration conf, } // initialize ledger manager - this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk); + try { + this.ledgerManagerFactory = + LedgerManagerFactory.newLedgerManagerFactory(conf, ((ZKRegistrationClient) regClient).getZk()); + } catch (KeeperException ke) { + throw new ZKException(); + } this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()); this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator(); this.explicitLacInterval = conf.getExplictLacInterval(); @@ -638,7 +640,7 @@ public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.Dige } ZooKeeper getZkHandle() { - return zk; + return ((ZKRegistrationClient) regClient).getZk(); } protected ClientConfiguration getConf() { @@ -1371,9 +1373,7 @@ public void close() throws BKException, InterruptedException { if (ownEventLoopGroup) { eventLoopGroup.shutdownGracefully(); } - if (ownZKHandle) { - zk.close(); - } + this.regClient.close(); } private final void initOpLoggers(StatsLogger stats) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index d2e5db3f376..aae36b872bd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -22,6 +22,7 @@ import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -32,17 +33,16 @@ import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -54,8 +54,9 @@ import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; +import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; @@ -68,15 +69,15 @@ import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.IOUtils; +import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; @@ -91,13 +92,6 @@ public class BookKeeperAdmin implements AutoCloseable { private final static Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class); private final static Logger VERBOSE = LoggerFactory.getLogger("verbose"); - // ZK client instance - private ZooKeeper zk; - private final boolean ownsZK; - - // ZK ledgers related String constants - private final String bookiesPath; - // BookKeeper client instance private BookKeeper bkc; private final boolean ownsBK; @@ -138,7 +132,7 @@ public class BookKeeperAdmin implements AutoCloseable { * Throws this exception if there is an error instantiating the * BookKeeper client. */ - public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, KeeperException { + public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, BKException { this(new ClientConfiguration().setZkServers(zkServers)); } @@ -160,18 +154,9 @@ public BookKeeperAdmin(String zkServers) throws IOException, InterruptedExceptio * Throws this exception if there is an error instantiating the * BookKeeper client. */ - public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException { - // Create the ZooKeeper client instance - zk = ZooKeeperClient.newBuilder() - .connectString(conf.getZkServers()) - .sessionTimeoutMs(conf.getZkTimeout()) - .build(); - ownsZK = true; - - // Create the bookie path - bookiesPath = conf.getZkAvailableBookiesPath(); + public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, BKException { // Create the BookKeeper client instance - bkc = new BookKeeper(conf, zk); + bkc = new BookKeeper(conf); ownsBK = true; this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE); this.mFactory = bkc.ledgerManagerFactory; @@ -189,9 +174,6 @@ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, Interrupted public BookKeeperAdmin(final BookKeeper bkc, StatsLogger statsLogger) { this.bkc = bkc; ownsBK = false; - this.zk = bkc.zk; - ownsZK = false; - this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath(); this.lfr = new LedgerFragmentReplicator(bkc, statsLogger); this.mFactory = bkc.ledgerManagerFactory; } @@ -212,18 +194,6 @@ public void close() throws InterruptedException, BKException { if (ownsBK) { bkc.close(); } - if (ownsZK) { - zk.close(); - } - } - - /** - * Get {@link org.apache.zookeeper.ZooKeeper} used by bookkeeper admin client. - * - * @return zookeeper client used by bookkeeper admin client - */ - public ZooKeeper getZooKeeper() { - return zk; } /** @@ -242,17 +212,8 @@ public Collection getAvailableBookies() * @return a collection of bookie addresses * @throws BKException if there are issues trying to read the list. */ - public Collection getReadOnlyBookiesSync() throws BKException { - return bkc.bookieWatcher.getReadOnlyBookiesSync(); - } - - /** - * Get a list of readonly bookies asynchronously (may be slightly out of date). - * - * @return a collection of bookie addresses - */ - public Collection getReadOnlyBookiesAsync() { - return bkc.bookieWatcher.getReadOnlyBookiesAsync(); + public Collection getReadOnlyBookies() throws BKException { + return bkc.bookieWatcher.getReadOnlyBookies(); } /** @@ -262,9 +223,9 @@ public Collection getReadOnlyBookiesAsync() { * * @param listener the listener to notify */ - public void notifyBookiesChanged(final BookiesListener listener) + public void watchWritableBookiesChanged(final RegistrationListener listener) throws BKException { - bkc.bookieWatcher.notifyBookiesChanged(listener); + bkc.regClient.watchWritableBookies(listener); } /** @@ -274,9 +235,9 @@ public void notifyBookiesChanged(final BookiesListener listener) * * @param listener the listener to notify */ - public void notifyReadOnlyBookiesChanged(final BookiesListener listener) + public void watchReadOnlyBookiesChanged(final RegistrationListener listener) throws BKException { - bkc.bookieWatcher.notifyReadOnlyBookiesChanged(listener); + bkc.regClient.watchReadOnlyBookies(listener); } /** @@ -1066,6 +1027,15 @@ public static boolean format(ClientConfiguration conf, zkAcls, CreateMode.PERSISTENT); } + // create readonly bookies node if not exists + if (null == zkc.exists(conf.getZkAvailableBookiesPath() + "/" + READONLY, false)) { + zkc.create( + conf.getZkAvailableBookiesPath() + "/" + READONLY, + new byte[0], + zkAcls, + CreateMode.PERSISTENT); + } + // If old data was there then confirm with admin. if (ledgerRootExists) { boolean confirm = false; @@ -1274,7 +1244,8 @@ public void triggerAudit() throw new UnavailableException("Autorecovery is disabled. So giving up!"); } - BookieSocketAddress auditorId = AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), zk); + BookieSocketAddress auditorId = + AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), bkc.getZkHandle()); if (auditorId == null) { LOG.error("No auditor elected, though Autorecovery is enabled. So giving up."); throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up."); @@ -1308,7 +1279,7 @@ public void triggerAudit() public void decommissionBookie(BookieSocketAddress bookieAddress) throws CompatibilityException, UnavailableException, KeeperException, InterruptedException, IOException, BKAuditException, TimeoutException, BKException { - if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookiesAsync().contains(bookieAddress)) { + if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookies().contains(bookieAddress)) { LOG.error("Bookie: {} is not shutdown yet", bookieAddress); throw BKException.create(BKException.Code.IllegalOpException); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java index 70fe2bd9923..2ea5bef6895 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java @@ -29,14 +29,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.collections4.CollectionUtils; import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,6 +233,7 @@ public boolean completeUnlessQueued() { } public void start() { + this.bk.regClient.watchWritableBookies(bookies -> availableBookiesChanged(bookies.getValue())); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -402,7 +402,7 @@ Map getBookieInfo() throws BKException, Interru Collection bookies; bookies = bk.bookieWatcher.getBookies(); - bookies.addAll(bk.bookieWatcher.getReadOnlyBookiesAsync()); + bookies.addAll(bk.bookieWatcher.getReadOnlyBookies()); totalSent.set(bookies.size()); for (BookieSocketAddress b : bookies) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index b11271f23c9..2bba1d602d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -17,43 +17,26 @@ */ package org.apache.bookkeeper.client; -import static org.apache.bookkeeper.util.SafeRunnable.safeRun; - -import java.io.IOException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException.BKInterruptedException; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.client.BKException.MetaStoreException; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.util.BookKeeperConstants; -import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.zookeeper.AsyncCallback.ChildrenCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -import java.util.Map; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.data.ACL; /** * This class is responsible for maintaining a consistent view of what bookies @@ -62,157 +45,73 @@ * replacement * */ -class BookieWatcher implements Watcher, ChildrenCallback { - static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class); - - public static int ZK_CONNECT_BACKOFF_SEC = 1; - private static final Set EMPTY_SET = new HashSet(); - - // Bookie registration path in ZK - private final String bookieRegistrationPath; +@Slf4j +class BookieWatcher { + + private static final Function EXCEPTION_FUNC = cause -> { + if (cause instanceof BKException) { + log.error("Failed to get bookie list : ", cause); + return (BKException) cause; + } else if (cause instanceof InterruptedException) { + log.error("Interrupted reading bookie list : ", cause); + return new BKInterruptedException(); + } else { + return new MetaStoreException(); + } + }; - final BookKeeper bk; - final ScheduledExecutorService scheduler; - final EnsemblePlacementPolicy placementPolicy; + private final ClientConfiguration conf; + private final RegistrationClient registrationClient; + private final EnsemblePlacementPolicy placementPolicy; // Bookies that will not be preferred to be chosen in a new ensemble final Cache quarantinedBookies; - SafeRunnable reReadTask = new SafeRunnable() { - @Override - public void safeRun() { - readBookies(); - } - }; - private ReadOnlyBookieWatcher readOnlyBookieWatcher; + private volatile Set writableBookies = Collections.emptySet(); + private volatile Set readOnlyBookies = Collections.emptySet(); public BookieWatcher(ClientConfiguration conf, - ScheduledExecutorService scheduler, EnsemblePlacementPolicy placementPolicy, - BookKeeper bk) throws KeeperException, InterruptedException { - this.bk = bk; - // ZK bookie registration path - this.bookieRegistrationPath = conf.getZkAvailableBookiesPath(); - this.scheduler = scheduler; + RegistrationClient registrationClient) { + this.conf = conf; this.placementPolicy = placementPolicy; - readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk); + this.registrationClient = registrationClient; this.quarantinedBookies = CacheBuilder.newBuilder() .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification bookie) { - logger.info("Bookie {} is no longer quarantined", bookie.getKey()); + log.info("Bookie {} is no longer quarantined", bookie.getKey()); } }).build(); } - void notifyBookiesChanged(final BookiesListener listener) throws BKException { + public Set getBookies() throws BKException { try { - bk.getZkHandle().getChildren(this.bookieRegistrationPath, - new Watcher() { - public void process(WatchedEvent event) { - // listen children changed event from ZooKeeper - if (event.getType() == EventType.NodeChildrenChanged) { - listener.availableBookiesChanged(); - } - } - }); - } catch (KeeperException ke) { - logger.error("Error registering watcher with zookeeper", ke); - throw new BKException.ZKException(); - } catch (InterruptedException ie) { + return FutureUtils.result(registrationClient.getWritableBookies(), EXCEPTION_FUNC).getValue(); + } catch (BKInterruptedException ie) { Thread.currentThread().interrupt(); - logger.error("Interrupted registering watcher with zookeeper", ie); - throw new BKException.BKInterruptedException(); + throw ie; } } - void notifyReadOnlyBookiesChanged(final BookiesListener listener) throws BKException { - readOnlyBookieWatcher.notifyBookiesChanged(listener); - } - - public Collection getReadOnlyBookiesSync() throws BKException { - try { - String znode = this.bookieRegistrationPath + "/" + BookKeeperConstants.READONLY; - List children = bk.getZkHandle().getChildren(znode, false); - return convertToBookieAddresses(children); - } catch (KeeperException ke) { - logger.error("Failed to get read only bookie list : ", ke); - throw new BKException.ZKException(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - logger.error("Interrupted reading read only bookie list", ie); - throw new BKException.BKInterruptedException(); - } - } - - public Collection getBookies() throws BKException { + public Set getReadOnlyBookies() throws BKException { try { - List children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false); - children.remove(BookKeeperConstants.READONLY); - return convertToBookieAddresses(children); - } catch (KeeperException ke) { - logger.error("Failed to get bookie list : ", ke); - throw new BKException.ZKException(); - } catch (InterruptedException ie) { + return FutureUtils.result(registrationClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue(); + } catch (BKInterruptedException ie) { Thread.currentThread().interrupt(); - logger.error("Interrupted reading bookie list", ie); - throw new BKException.BKInterruptedException(); + throw ie; } } - Collection getReadOnlyBookiesAsync() { - return new HashSet(readOnlyBookieWatcher.getReadOnlyBookies()); - } - - public void readBookies() { - readBookies(this); - } - - public void readBookies(ChildrenCallback callback) { - bk.getZkHandle().getChildren(this.bookieRegistrationPath, this, callback, null); - } - - @Override - public void process(WatchedEvent event) { - readBookies(); - } - - @Override - public void processResult(int rc, String path, Object ctx, List children) { - - if (rc != KeeperException.Code.OK.intValue()) { - //logger.error("Error while reading bookies", KeeperException.create(Code.get(rc), path)); - // try the read after a second again - try { - scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS); - } catch (RejectedExecutionException ree) { - logger.warn("Failed to schedule reading bookies task : ", ree); - } - return; - } - - // Just exclude the 'readonly' znode to exclude r-o bookies from - // available nodes list. - children.remove(BookKeeperConstants.READONLY); - - HashSet newBookieAddrs = convertToBookieAddresses(children); - + // this callback is already not executed in zookeeper thread + private synchronized void processWritableBookiesChanged(Set newBookieAddrs) { // Update watcher outside ZK callback thread, to avoid deadlock in case some other // component is trying to do a blocking ZK operation - bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> { - synchronized (BookieWatcher.this) { - Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies(); - placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies); - if (bk.conf.getDiskWeightBasedPlacementEnabled()) { - // start collecting bookieInfo for the newly joined bookies, if any - bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs); - } - } - })); - + this.writableBookies = newBookieAddrs; + placementPolicy.onClusterChanged(newBookieAddrs, readOnlyBookies); // we don't need to close clients here, because: // a. the dead bookies will be removed from topology, which will not be used in new ensemble. // b. the read sequence will be reordered based on znode availability, so most of the reads @@ -229,45 +128,27 @@ public void processResult(int rc, String path, Object ctx, List children // } } - private static HashSet convertToBookieAddresses(List children) { - // Read the bookie addresses into a set for efficient lookup - HashSet newBookieAddrs = new HashSet(); - for (String bookieAddrString : children) { - BookieSocketAddress bookieAddr; - try { - bookieAddr = new BookieSocketAddress(bookieAddrString); - } catch (IOException e) { - logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie"); - continue; - } - newBookieAddrs.add(bookieAddr); - } - return newBookieAddrs; + private synchronized void processReadOnlyBookiesChanged(Set readOnlyBookies) { + this.readOnlyBookies = readOnlyBookies; + placementPolicy.onClusterChanged(writableBookies, readOnlyBookies); } /** * Blocks until bookies are read from zookeeper, used in the {@link BookKeeper} constructor. - * @throws InterruptedException - * @throws KeeperException + * + * @throws BKException when failed to read bookies */ - public void readBookiesBlocking() throws InterruptedException, KeeperException { - // Read readonly bookies first - readOnlyBookieWatcher.readROBookiesBlocking(); - - final LinkedBlockingQueue queue = new LinkedBlockingQueue(); - readBookies(new ChildrenCallback() { - public void processResult(int rc, String path, Object ctx, List children) { - bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> { - BookieWatcher.this.processResult(rc, path, ctx, children); - queue.add(rc); - })); - } - }); - int rc = queue.take(); + public void readBookiesBlocking() throws BKException { + this.registrationClient.watchReadOnlyBookies(bookies -> processReadOnlyBookiesChanged(bookies.getValue())); + this.registrationClient.watchWritableBookies(bookies -> processWritableBookiesChanged(bookies.getValue())); - if (rc != KeeperException.Code.OK.intValue()) { - throw KeeperException.create(Code.get(rc)); + try { + readOnlyBookies = getReadOnlyBookies(); + } catch (Exception e) { + log.error("Failed getReadOnlyBookies: ", e); } + + writableBookies = getBookies(); } /** @@ -289,10 +170,11 @@ public ArrayList newEnsemble(int ensembleSize, int writeQuo writeQuorumSize, ackQuorumSize, customMetadata, new HashSet( quarantinedBookies.asMap().keySet())); } catch (BKNotEnoughBookiesException e) { - if (logger.isDebugEnabled()) { - logger.debug("Not enough healthy bookies available, using quarantined bookies"); + if (log.isDebugEnabled()) { + log.debug("Not enough healthy bookies available, using quarantined bookies"); } - return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, EMPTY_SET); + return placementPolicy.newEnsemble( + ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, Collections.emptySet()); } } @@ -318,8 +200,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, existingAndQuarantinedBookies, addr, excludeBookies); } catch (BKNotEnoughBookiesException e) { - if (logger.isDebugEnabled()) { - logger.debug("Not enough healthy bookies available, using quarantined bookies"); + if (log.isDebugEnabled()) { + log.debug("Not enough healthy bookies available, using quarantined bookies"); } return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet(existingBookies), addr, excludeBookies); @@ -334,112 +216,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, public void quarantineBookie(BookieSocketAddress bookie) { if (quarantinedBookies.getIfPresent(bookie) == null) { quarantinedBookies.put(bookie, Boolean.TRUE); - logger.warn("Bookie {} has been quarantined because of read/write errors.", bookie); + log.warn("Bookie {} has been quarantined because of read/write errors.", bookie); } } - /** - * Watcher implementation to watch the readonly bookies under - * <available>/readonly - */ - private static class ReadOnlyBookieWatcher implements Watcher, ChildrenCallback { - - private final static Logger LOG = LoggerFactory.getLogger(ReadOnlyBookieWatcher.class); - private volatile HashSet readOnlyBookies = new HashSet(); - private BookKeeper bk; - private String readOnlyBookieRegPath; - - public ReadOnlyBookieWatcher(ClientConfiguration conf, BookKeeper bk) throws KeeperException, - InterruptedException { - this.bk = bk; - readOnlyBookieRegPath = conf.getZkAvailableBookiesPath() + "/" - + BookKeeperConstants.READONLY; - if (null == bk.getZkHandle().exists(readOnlyBookieRegPath, false)) { - try { - List zkAcls = ZkUtils.getACLs(conf); - bk.getZkHandle().create(readOnlyBookieRegPath, new byte[0], zkAcls, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // this node is just now created by someone. - } - } - } - - @Override - public void process(WatchedEvent event) { - readROBookies(); - } - - // read the readonly bookies in blocking fashion. Used only for first - // time. - void readROBookiesBlocking() throws InterruptedException, KeeperException { - - final LinkedBlockingQueue queue = new LinkedBlockingQueue(); - readROBookies(new ChildrenCallback() { - public void processResult(int rc, String path, Object ctx, List children) { - try { - ReadOnlyBookieWatcher.this.processResult(rc, path, ctx, children); - queue.put(rc); - } catch (InterruptedException e) { - logger.error("Interruped when trying to read readonly bookies in a blocking fashion"); - throw new RuntimeException(e); - } - } - }); - int rc = queue.take(); - - if (rc != KeeperException.Code.OK.intValue()) { - throw KeeperException.create(Code.get(rc)); - } - } - - void notifyBookiesChanged(final BookiesListener listener) throws BKException { - try { - List children = bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, new Watcher() { - public void process(WatchedEvent event) { - // listen children changed event from ZooKeeper - if (event.getType() == EventType.NodeChildrenChanged) { - listener.availableBookiesChanged(); - } - } - }); - - // Update the list of read-only bookies - HashSet newReadOnlyBookies = convertToBookieAddresses(children); - readOnlyBookies = newReadOnlyBookies; - } catch (KeeperException ke) { - logger.error("Error registering watcher with zookeeper", ke); - throw new BKException.ZKException(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - logger.error("Interrupted registering watcher with zookeeper", ie); - throw new BKException.BKInterruptedException(); - } - } - - // Read children and register watcher for readonly bookies path - void readROBookies(ChildrenCallback callback) { - bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, this, callback, null); - } - - void readROBookies() { - readROBookies(this); - } - - @Override - public void processResult(int rc, String path, Object ctx, List children) { - if (rc != Code.OK.intValue()) { - LOG.error("Not able to read readonly bookies : ", KeeperException.create(Code.get(rc))); - return; - } - - HashSet newReadOnlyBookies = convertToBookieAddresses(children); - readOnlyBookies = newReadOnlyBookies; - } - - // returns the readonly bookies - public HashSet getReadOnlyBookies() { - return readOnlyBookies; - } - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java index d9223a4318c..7133c08ec89 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java @@ -20,19 +20,16 @@ */ package org.apache.bookkeeper.client.impl; -import com.google.common.base.Preconditions; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import java.io.IOException; -import org.apache.bookkeeper.client.BKException.ZKException; +import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.BookKeeper; import org.apache.bookkeeper.client.api.BookKeeperBuilder; -import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; /** @@ -86,13 +83,7 @@ public BookKeeperBuilder featureProvider(FeatureProvider featureProvider) { @Override public BookKeeper build() throws InterruptedException, BKException, IOException { - try { - return builder.build(); - } catch (KeeperException err) { - ZKException zkErr = new ZKException(); - zkErr.initCause(err); - throw zkErr; - } + return builder.build(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 5a2175346f4..8725a6ce6e7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -29,6 +29,8 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; +import org.apache.bookkeeper.discover.RegistrationClient; +import org.apache.bookkeeper.discover.ZKRegistrationClient; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.commons.configuration.ConfigurationException; @@ -147,6 +149,9 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String TLS_TRUSTSTORE = "clientTrustStore"; protected final static String TLS_TRUSTSTORE_PASSWORD_PATH = "clientTrustStorePasswordPath"; + // Registration Client + protected final static String REGISTRATION_CLIENT_CLASS = "registrationClientClass"; + /** * Construct a default client-side configuration */ @@ -1620,4 +1625,27 @@ public ClientConfiguration setNettyUsePooledBuffers(boolean enabled) { setProperty(NETTY_USE_POOLED_BUFFERS, enabled); return this; } + + /** + * Set registration manager class + * + * @param regClientClass + * ClientClass + */ + public void setRegistrationClientClass( + Class regClientClass) { + setProperty(REGISTRATION_CLIENT_CLASS, regClientClass); + } + + /** + * Get Registration Client Class. + * + * @return registration manager class. + */ + public Class getRegistrationClientClass() + throws ConfigurationException { + return ReflectionUtils.getClass(this, REGISTRATION_CLIENT_CLASS, + ZKRegistrationClient.class, RegistrationClient.class, + defaultLoader); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java index 7f6160adb63..068f1ccd8aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java @@ -18,19 +18,98 @@ package org.apache.bookkeeper.discover; -import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.zookeeper.ZooKeeper; + /** * A registration client, which the bookkeeper client will use to interact with registration service. */ -public interface RegistrationClient { +@LimitedPrivate +@Evolving +public interface RegistrationClient extends AutoCloseable { /** - * Get the list of available bookie identifiers. + * Listener to receive changes from the registration service. + */ + interface RegistrationListener { + + void onBookiesChanged(Versioned> bookies); + + } + + /** + * Initialize the registration client with provided resources. + * + *

The existence of zkSupplier is for backward compatability. * - * @return a future represents the list of available bookies + * @param conf client configuration + * @param statsLogger stats logger + * @param zkOptional a supplier to supply zookeeper client. + * @return */ - CompletableFuture> getAvailableBookies(); + RegistrationClient initialize(ClientConfiguration conf, + ScheduledExecutorService scheduler, + StatsLogger statsLogger, + Optional zkOptional) + throws BKException; + @Override + void close(); + + /** + * Get the list of writable bookie identifiers. + * + * @return a future represents the list of writable bookies. + */ + CompletableFuture>> getWritableBookies(); + + /** + * Get the list of readonly bookie identifiers. + * + * @return a future represents the list of readonly bookies. + */ + CompletableFuture>> getReadOnlyBookies(); + + /** + * Watch the changes of bookies. + * + *

The topology changes of bookies will be propagated to the provided listener. + * + * @param listener listener to receive the topology changes of bookies. + */ + void watchWritableBookies(RegistrationListener listener); + + /** + * Unwatch the changes of bookies. + * + * @param listener listener to receive the topology changes of bookies. + */ + void unwatchWritableBookies(RegistrationListener listener); + + /** + * Watch the changes of bookies. + * + *

The topology changes of bookies will be propagated to the provided listener. + * + * @param listener listener to receive the topology changes of bookies. + */ + void watchReadOnlyBookies(RegistrationListener listener); + + /** + * Unwatch the changes of bookies. + * + * @param listener listener to receive the topology changes of bookies. + */ + void unwatchReadOnlyBookies(RegistrationListener listener); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java new file mode 100644 index 00000000000..e214521a71c --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.discover; + +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; + +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKInterruptedException; +import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BKException.ZKException; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.util.SafeRunnable; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Version.Occurred; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; + +/** + * ZooKeeper based {@link RegistrationClient}. + */ +@Slf4j +public class ZKRegistrationClient implements RegistrationClient { + + private static final int ZK_CONNECT_BACKOFF_MS = 200; + + private class WatchTask + implements SafeRunnable, + Watcher, + BiConsumer>, Throwable>, + AutoCloseable { + + private final String regPath; + private final Set listeners; + private boolean closed = false; + private Set bookies = null; + private Version version = Version.NEW; + + WatchTask(String regPath) { + this.regPath = regPath; + this.listeners = new CopyOnWriteArraySet<>(); + } + + public int getNumListeners() { + return listeners.size(); + } + + public boolean addListener(RegistrationListener listener) { + if (listeners.add(listener)) { + if (null != bookies) { + listener.onBookiesChanged(new Versioned<>(bookies, version)); + } + } + return true; + } + + public boolean removeListener(RegistrationListener listener) { + return listeners.remove(listener); + } + + void watch() { + scheduleWatchTask(0L); + } + + private void scheduleWatchTask(long delayMs) { + try { + scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ree) { + log.warn("Failed to schedule watch bookies task", ree); + } + } + + @Override + public void safeRun() { + if (isClosed()) { + return; + } + + getChildren(regPath, this) + .whenCompleteAsync(this, scheduler); + } + + @Override + public void accept(Versioned> bookieSet, Throwable throwable) { + if (throwable != null) { + scheduleWatchTask(ZK_CONNECT_BACKOFF_MS); + return; + } + + if (this.version.compare(bookieSet.getVersion()) == Occurred.BEFORE + || this.version.compare(bookieSet.getVersion()) == Occurred.CONCURRENTLY) { + this.version = bookieSet.getVersion(); + this.bookies = bookieSet.getValue(); + + for (RegistrationListener listener : listeners) { + listener.onBookiesChanged(bookieSet); + } + } + } + + @Override + public void process(WatchedEvent event) { + if (EventType.None == event.getType()) { + if (KeeperState.Expired == event.getState()) { + scheduleWatchTask(ZK_CONNECT_BACKOFF_MS); + } + return; + } + + // re-read the bookie list + scheduleWatchTask(0L); + } + + synchronized boolean isClosed() { + return closed; + } + + @Override + public synchronized void close() { + if (!closed) { + return; + } + closed = true; + } + } + + private ClientConfiguration conf; + private ZooKeeper zk = null; + // whether the zk handle is one we created, or is owned by whoever + // instantiated us + private boolean ownZKHandle = false; + private ScheduledExecutorService scheduler; + private WatchTask watchWritableBookiesTask = null; + private WatchTask watchReadOnlyBookiesTask = null; + + // registration paths + private String bookieRegistrationPath; + private String bookieReadonlyRegistrationPath; + + @Override + public RegistrationClient initialize(ClientConfiguration conf, + ScheduledExecutorService scheduler, + StatsLogger statsLogger, + Optional zkOptional) + throws BKException { + this.conf = conf; + this.scheduler = scheduler; + + this.bookieRegistrationPath = conf.getZkAvailableBookiesPath(); + this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; + + // construct the zookeeper + if (zkOptional.isPresent()) { + this.zk = zkOptional.get(); + this.ownZKHandle = false; + } else { + try { + this.zk = ZooKeeperClient.newBuilder() + .connectString(conf.getZkServers()) + .sessionTimeoutMs(conf.getZkTimeout()) + .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(), + conf.getZkTimeout(), 0)) + .statsLogger(statsLogger) + .build(); + + if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { + try { + List zkAcls = ZkUtils.getACLs(conf); + zk.create(bookieReadonlyRegistrationPath, + new byte[0], + zkAcls, + CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + // this node is just now created by someone. + } + } + } catch (IOException | KeeperException e) { + log.error("Failed to create zookeeper client to {}", conf.getZkServers(), e); + ZKException zke = new ZKException(); + zke.fillInStackTrace(); + throw zke; + } catch (InterruptedException e) { + throw new BKInterruptedException(); + } + this.ownZKHandle = true; + } + + return this; + } + + @Override + public void close() { + if (ownZKHandle && null != zk) { + try { + zk.close(); + } catch (InterruptedException e) { + log.warn("Interrupted on closing zookeeper client", e); + } + } + } + + public ZooKeeper getZk() { + return zk; + } + + @Override + public CompletableFuture>> getWritableBookies() { + return getChildren(bookieRegistrationPath, null); + } + + @Override + public CompletableFuture>> getReadOnlyBookies() { + return getChildren(bookieReadonlyRegistrationPath, null); + } + + private CompletableFuture>> getChildren(String regPath, Watcher watcher) { + CompletableFuture>> future = FutureUtils.createFuture(); + zk.getChildren(regPath, watcher, (rc, path, ctx, children, stat) -> { + if (Code.OK != rc) { + ZKException zke = new ZKException(); + zke.fillInStackTrace(); + future.completeExceptionally(zke); + return; + } + + Version version = new LongVersion(stat.getVersion()); + Set bookies = convertToBookieAddresses(children); + future.complete(new Versioned<>(bookies, version)); + }, null); + return future; + } + + + @Override + public synchronized void watchWritableBookies(RegistrationListener listener) { + if (null == watchWritableBookiesTask) { + watchWritableBookiesTask = new WatchTask(bookieRegistrationPath); + } + + watchWritableBookiesTask.addListener(listener); + if (watchWritableBookiesTask.getNumListeners() == 1) { + watchWritableBookiesTask.watch(); + } + } + + @Override + public synchronized void unwatchWritableBookies(RegistrationListener listener) { + if (null == watchWritableBookiesTask) { + return; + } + + watchWritableBookiesTask.removeListener(listener); + if (watchWritableBookiesTask.getNumListeners() == 0) { + watchWritableBookiesTask.close(); + watchWritableBookiesTask = null; + } + } + + @Override + public synchronized void watchReadOnlyBookies(RegistrationListener listener) { + if (null == watchReadOnlyBookiesTask) { + watchReadOnlyBookiesTask = new WatchTask(bookieReadonlyRegistrationPath); + } + + watchReadOnlyBookiesTask.addListener(listener); + if (watchReadOnlyBookiesTask.getNumListeners() == 1) { + watchReadOnlyBookiesTask.watch(); + } + } + + @Override + public synchronized void unwatchReadOnlyBookies(RegistrationListener listener) { + if (null == watchReadOnlyBookiesTask) { + return; + } + + watchReadOnlyBookiesTask.removeListener(listener); + if (watchReadOnlyBookiesTask.getNumListeners() == 0) { + watchReadOnlyBookiesTask.close(); + watchReadOnlyBookiesTask = null; + } + } + + private static HashSet convertToBookieAddresses(List children) { + // Read the bookie addresses into a set for efficient lookup + HashSet newBookieAddrs = Sets.newHashSet(); + for (String bookieAddrString : children) { + if (READONLY.equals(bookieAddrString)) { + continue; + } + + BookieSocketAddress bookieAddr; + try { + bookieAddr = new BookieSocketAddress(bookieAddrString); + } catch (IOException e) { + log.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie"); + continue; + } + newBookieAddrs.add(bookieAddr); + } + return newBookieAddrs; + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java index b31b5e72cff..38eb007c48e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/BKHttpServiceProvider.java @@ -59,7 +59,7 @@ public class BKHttpServiceProvider implements HttpServiceProvider { private BKHttpServiceProvider(BookieServer bookieServer, AutoRecoveryMain autoRecovery, ServerConfiguration serverConf) - throws IOException, KeeperException, InterruptedException { + throws IOException, KeeperException, InterruptedException, BKException { this.bookieServer = bookieServer; this.autoRecovery = autoRecovery; this.serverConf = serverConf; @@ -129,7 +129,7 @@ public Builder setServerConfiguration(ServerConfiguration conf) { } public BKHttpServiceProvider build() - throws IOException, KeeperException, InterruptedException { + throws IOException, KeeperException, InterruptedException, BKException { return new BKHttpServiceProvider( bookieServer, autoRecovery, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java index 5523f6a88da..f7e5fe427fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ListBookiesService.java @@ -70,7 +70,7 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception { params.get("print_hostnames").equals("true"); if (readOnly) { - bookies.addAll(bka.getReadOnlyBookiesAsync()); + bookies.addAll(bka.getReadOnlyBookies()); } else { bookies.addAll(bka.getAvailableBookies()); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index 00c41bcc9fb..253a8715945 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.ExitCode; import org.apache.bookkeeper.bookie.ReadOnlyBookie; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.processor.RequestProcessor; @@ -101,7 +102,7 @@ protected Bookie newBookie(ServerConfiguration conf) new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE)); } - public void start() throws IOException, UnavailableException, InterruptedException, KeeperException { + public void start() throws IOException, UnavailableException, InterruptedException, BKException { this.bookie.start(); // fail fast, when bookie startup is not successful if (!this.bookie.isRunning()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 4f84d03a289..7a10e0b52d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -40,7 +40,6 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAdmin; -import org.apache.bookkeeper.client.BookiesListener; import org.apache.bookkeeper.client.LedgerChecker; import org.apache.bookkeeper.client.LedgerFragment; import org.apache.bookkeeper.client.LedgerHandle; @@ -73,7 +72,7 @@ * re-replication activities by keeping all the corresponding ledgers of the * failed bookie as underreplicated znode in zk. */ -public class Auditor implements BookiesListener { +public class Auditor { private static final Logger LOG = LoggerFactory.getLogger(Auditor.class); private final ServerConfiguration conf; private BookKeeper bkc; @@ -157,12 +156,9 @@ private void initialize(ServerConfiguration conf, ZooKeeper zkc) } catch (CompatibilityException ce) { throw new UnavailableException( "CompatibilityException while initializing Auditor", ce); - } catch (IOException ioe) { + } catch (IOException | BKException | KeeperException ioe) { throw new UnavailableException( - "IOException while initializing Auditor", ioe); - } catch (KeeperException ke) { - throw new UnavailableException( - "KeeperException while initializing Auditor", ke); + "Exception while initializing Auditor", ioe); } catch (InterruptedException ie) { throw new UnavailableException( "Interrupted while initializing Auditor", ie); @@ -375,7 +371,7 @@ public void run() { LOG.info("Periodic checking disabled"); } try { - notifyBookieChanges(); + watchBookieChanges(); knownBookies = getAvailableBookies(); } catch (BKException bke) { LOG.error("Couldn't get bookie list, exiting", bke); @@ -427,7 +423,7 @@ private void waitIfLedgerReplicationDisabled() throws UnavailableException, private List getAvailableBookies() throws BKException { // Get the available bookies Collection availableBkAddresses = admin.getAvailableBookies(); - Collection readOnlyBkAddresses = admin.getReadOnlyBookiesSync(); + Collection readOnlyBkAddresses = admin.getReadOnlyBookies(); availableBkAddresses.addAll(readOnlyBkAddresses); List availableBookies = new ArrayList(); @@ -437,9 +433,9 @@ private List getAvailableBookies() throws BKException { return availableBookies; } - private void notifyBookieChanges() throws BKException { - admin.notifyBookiesChanged(this); - admin.notifyReadOnlyBookiesChanged(this); + private void watchBookieChanges() throws BKException { + admin.watchWritableBookiesChanged(bookies -> submitAuditTask()); + admin.watchReadOnlyBookiesChanged(bookies -> submitAuditTask()); } /** @@ -702,17 +698,6 @@ public void processResult(int rc, String s, Object obj) { } } - @Override - public void availableBookiesChanged() { - // since a watch is triggered, we need to watch again on the bookies - try { - notifyBookieChanges(); - } catch (BKException bke) { - LOG.error("Exception while registering for a bookie change notification", bke); - } - submitAuditTask(); - } - /** * Shutdown the auditor */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 61205f764ff..cd79afd723a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -131,10 +131,14 @@ public ReplicationWorker(final ZooKeeper zkc, .newLedgerManagerFactory(this.conf, this.zkc); this.underreplicationManager = mFactory .newLedgerUnderreplicationManager(); - this.bkc = BookKeeper.forConfig(new ClientConfiguration(conf)) - .zk(zkc) - .statsLogger(statsLogger.scope(BK_CLIENT_SCOPE)) - .build(); + try { + this.bkc = BookKeeper.forConfig(new ClientConfiguration(conf)) + .zk(zkc) + .statsLogger(statsLogger.scope(BK_CLIENT_SCOPE)) + .build(); + } catch (BKException e) { + throw new IOException("Failed to instantiate replication worker", e); + } this.admin = new BookKeeperAdmin(bkc, statsLogger); this.ledgerChecker = new LedgerChecker(bkc); this.workerThread = new BookieThread(this, "ReplicationWorker"); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java index 7b43122f5d8..ff9e9dcfb27 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java @@ -19,12 +19,12 @@ package org.apache.bookkeeper.server.service; import java.io.IOException; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.server.component.ServerLifecycleComponent; import org.apache.bookkeeper.server.conf.BookieConfiguration; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.KeeperException; /** * A {@link ServerLifecycleComponent} that starts the core bookie server. @@ -50,7 +50,7 @@ public BookieServer getServer() { protected void doStart() { try { this.server.start(); - } catch (IOException | UnavailableException | InterruptedException | KeeperException e) { + } catch (IOException | UnavailableException | InterruptedException | BKException e) { throw new RuntimeException("Failed to start bookie server", e); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java index f03767ca77b..02f779ee3ff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShimFactory; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; @@ -132,7 +133,7 @@ private static void cleanupDirectories(List dirs) throws IOException { private List runBookies(ServerConfiguration baseConf, String dirSuffix) throws IOException, KeeperException, InterruptedException, BookieException, - UnavailableException, CompatibilityException, SecurityException { + UnavailableException, CompatibilityException, SecurityException, BKException { List tempDirs = new ArrayList(); try { runBookies(baseConf, tempDirs, dirSuffix); @@ -161,7 +162,7 @@ private List runBookies(ServerConfiguration baseConf, String dirSuffix) @SuppressWarnings("deprecation") private void runBookies(ServerConfiguration baseConf, List tempDirs, String dirSuffix) throws IOException, KeeperException, InterruptedException, BookieException, UnavailableException, - CompatibilityException, SecurityException { + CompatibilityException, SecurityException, BKException { LOG.info("Starting Bookie(s)"); // Create Bookie Servers (B1, B2, B3) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java index d36e29dd732..6799d3e00b7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java @@ -20,31 +20,33 @@ */ package org.apache.bookkeeper.auth; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.Arrays; import java.util.Enumeration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieConnectionPeer; import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.proto.ClientConnectionPeer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; - -import org.junit.Test; -import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; -import org.apache.bookkeeper.proto.ClientConnectionPeer; -import org.apache.bookkeeper.proto.BookieConnectionPeer; - public class TestAuth extends BookKeeperClusterTestCase { static final Logger LOG = LoggerFactory.getLogger(TestAuth.class); @@ -113,12 +115,12 @@ private int entryCount(long ledgerId, ServerConfiguration bookieConf, public void testSingleMessageAuth() throws Exception { ServerConfiguration bookieConf = newServerConfiguration(); bookieConf.setBookieAuthProviderFactoryClass( - AlwaysSucceedBookieAuthProviderFactory.class.getName()); - + AlwaysSucceedBookieAuthProviderFactory.class.getName()); + ClientConfiguration clientConf = newClientConfiguration(); clientConf.setClientAuthProviderFactoryClass( - SendUntilCompleteClientAuthProviderFactory.class.getName()); - + SendUntilCompleteClientAuthProviderFactory.class.getName()); + startAndStoreBookie(bookieConf); AtomicLong ledgerId = new AtomicLong(-1); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java index 7206b236862..8ae4caf9003 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java @@ -35,7 +35,6 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.test.annotations.FlakyTest; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +52,7 @@ public BookKeeperDiskSpaceWeightedLedgerPlacementTest() { } class BookKeeperCheckInfoReader extends BookKeeper { - BookKeeperCheckInfoReader(ClientConfiguration conf) throws KeeperException, IOException, InterruptedException { + BookKeeperCheckInfoReader(ClientConfiguration conf) throws BKException, IOException, InterruptedException { super(conf); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index c0ce2424aba..3527575a0d8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.client; import io.netty.util.IllegalReferenceCountException; +import java.io.IOException; import java.util.Collections; import java.util.Enumeration; import java.util.concurrent.CountDownLatch; @@ -33,8 +34,8 @@ import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -88,8 +89,9 @@ public void testConstructionNotConnectedExplicitZk() throws Exception { try { BookKeeper bkc = new BookKeeper(conf, zk); fail("Shouldn't be able to construct with unconnected zk"); - } catch (KeeperException.ConnectionLossException cle) { + } catch (IOException cle) { // correct behaviour + assertTrue(cle.getCause() instanceof ConnectionLossException); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java index 5401fef8574..a42a50d4b56 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java @@ -22,7 +22,6 @@ */ import java.io.IOException; - import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.proto.BookieClient; import org.apache.zookeeper.KeeperException; @@ -34,7 +33,7 @@ */ public class BookKeeperTestClient extends BookKeeper { public BookKeeperTestClient(ClientConfiguration conf) - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, BKException { super(conf); } @@ -57,7 +56,7 @@ public BookieClient getBookieClient() { * @throws KeeperException */ public void readBookiesBlocking() - throws InterruptedException, KeeperException { + throws InterruptedException, BKException { bookieWatcher.readBookiesBlocking(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java index f1534606ea5..b0cb8794518 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java @@ -112,7 +112,7 @@ private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boole } // make zookeeper session expired - expireZooKeeperSession(bkc.zk, timeout); + expireZooKeeperSession(bkc.getZkHandle(), timeout); TimeUnit.MILLISECONDS.sleep(3 * timeout); // start four new bookies diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java index 9fe7bed8b47..272435e9a58 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -54,36 +53,37 @@ public TestReadTimeout() { public void testReadTimeout() throws Exception { final AtomicBoolean completed = new AtomicBoolean(false); - LedgerHandle writelh = bkc.createLedger(3,3,digestType, "testPasswd".getBytes()); + LedgerHandle writelh = bkc.createLedger(3, 3, digestType, "testPasswd".getBytes()); String tmp = "Foobar"; - + final int numEntries = 10; for (int i = 0; i < numEntries; i++) { writelh.addEntry(tmp.getBytes()); } - + Set beforeSet = new HashSet(); beforeSet.addAll(writelh.getLedgerMetadata().getEnsemble(numEntries)); final BookieSocketAddress bookieToSleep = writelh.getLedgerMetadata().getEnsemble(numEntries).get(0); - int sleeptime = baseClientConf.getReadTimeout()*3; + int sleeptime = baseClientConf.getReadTimeout() * 3; CountDownLatch latch = sleepBookie(bookieToSleep, sleeptime); latch.await(); - writelh.asyncAddEntry(tmp.getBytes(), - new AddCallback() { - public void addComplete(int rc, LedgerHandle lh, - long entryId, Object ctx) { - completed.set(true); - } - }, null); - Thread.sleep((baseClientConf.getReadTimeout()*3)*1000); + writelh.asyncAddEntry(tmp.getBytes(), + new AddCallback() { + public void addComplete(int rc, LedgerHandle lh, + long entryId, Object ctx) { + completed.set(true); + } + }, null); + Thread.sleep((baseClientConf.getReadTimeout() * 3) * 1000); Assert.assertTrue("Write request did not finish", completed.get()); Set afterSet = new HashSet(); afterSet.addAll(writelh.getLedgerMetadata().getEnsemble(numEntries + 1)); beforeSet.removeAll(afterSet); Assert.assertTrue("Bookie set should not match", beforeSet.size() != 0); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java index 42f8ef518b2..b522fb28295 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java @@ -35,7 +35,6 @@ import org.apache.bookkeeper.proto.BookieServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.zookeeper.KeeperException; import org.junit.Before; import org.junit.Test; @@ -230,9 +229,6 @@ void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException { LOG.info("Verified that entries are ok, and now closing ledger"); lh.close(); - } catch (KeeperException e) { - LOG.error("Caught KeeperException", e); - fail(e.toString()); } catch (BKException e) { LOG.error("Caught BKException", e); fail(e.toString());