Skip to content

Commit

Permalink
ISSUE apache#666: Introduce registration client for bookkeeper client…
Browse files Browse the repository at this point in the history
… to do bookie discover

Descriptions of the changes in this PR:
while in BOOKKEEPER-628, It is trying to improve/generalize the bookie registration process.

This PR follows the work of apache#663, which provide bookie side registration, and it is for client side work, it tries to introduce RegistrationClient for bookkeeper client to do bookie discover.

This PR follows the work of apache#663, which provide bookie side registration,

Author: Jia Zhai <[email protected]>

Reviewers: Sijie Guo <[email protected]>

This closes apache#667 from zhaijack/client_registration2, closes apache#666
  • Loading branch information
jiazhai authored and Ivan Kelly committed Nov 2, 2017
1 parent 34f1b05 commit 5eaa25f
Show file tree
Hide file tree
Showing 25 changed files with 688 additions and 509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static class Context {

public BenchThroughputLatency(int ensemble, int writeQuorumSize, int ackQuorumSize, byte[] passwd,
int numberOfLedgers, int sendLimit, ClientConfiguration conf)
throws KeeperException, IOException, InterruptedException {
throws BKException, IOException, InterruptedException {
this.sem = new Semaphore(conf.getThrottleValue());
bk = new BookKeeper(conf);
this.counter = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -192,8 +191,6 @@ public void run() {
executor.shutdown();
} catch (ExecutionException ee) {
LOG.error("Exception in worker", ee);
} catch (KeeperException ke) {
LOG.error("Error accessing zookeeper", ke);
} catch (BKException e) {
LOG.error("Error accessing bookkeeper", e);
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ public int runCmd(CommandLine cmdLine) throws Exception {
bookies.addAll(availableBookies);
} else if (cmdLine.hasOption("ro")) {
Collection<BookieSocketAddress> roBookies = bka
.getReadOnlyBookiesAsync();
.getReadOnlyBookies();
bookies.addAll(roBookies);
}
for (BookieSocketAddress b : bookies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException.ZKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
Expand All @@ -51,7 +52,10 @@
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.discover.ZKRegistrationClient;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
Expand All @@ -64,13 +68,11 @@
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.SystemUtils;
import org.apache.zookeeper.KeeperException;
Expand All @@ -94,7 +96,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {

static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);

final ZooKeeper zk;
final RegistrationClient regClient;
final EventLoopGroup eventLoopGroup;

// The stats logger for this client.
Expand All @@ -115,9 +117,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
// whether the zk handle is one we created, or is owned by whoever
// instantiated us
boolean ownZKHandle = false;

final BookieClient bookieClient;
final BookieWatcher bookieWatcher;
Expand Down Expand Up @@ -287,7 +286,7 @@ public Builder featureProvider(FeatureProvider featureProvider) {
return this;
}

public BookKeeper build() throws IOException, InterruptedException, KeeperException {
public BookKeeper build() throws IOException, InterruptedException, BKException {
Preconditions.checkNotNull(statsLogger, "No stats logger provided");
return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider);
}
Expand All @@ -305,13 +304,13 @@ public static Builder forConfig(final ClientConfiguration conf) {
* A list of one of more servers on which zookeeper is running. The
* client assumes that the running bookies have been registered with
* zookeeper under the path
* {@link BookieWatcher#bookieRegistrationPath}
* {@link AbstractConfiguration#getZkAvailableBookiesPath()}
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
public BookKeeper(String servers) throws IOException, InterruptedException,
KeeperException {
BKException {
this(new ClientConfiguration().setZkServers(servers));
}

Expand All @@ -327,13 +326,17 @@ public BookKeeper(String servers) throws IOException, InterruptedException,
* @throws KeeperException
*/
public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, BKException {
this(conf, null, null, NullStatsLogger.INSTANCE,
null, null, null);
}

private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException {
private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException, IOException {
Preconditions.checkNotNull(zk, "No zookeeper instance provided");
if (!zk.getState().isConnected()) {
LOG.error("Unconnected zookeeper handle passed to bookkeeper");
throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
}
return zk;
}

Expand All @@ -358,8 +361,7 @@ private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGro
* @throws KeeperException
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
throws IOException, InterruptedException, KeeperException {

throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null);
}

Expand All @@ -381,7 +383,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
* @throws KeeperException if the passed zk handle is not connected
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
null, null, null);
}
Expand All @@ -396,28 +398,49 @@ private BookKeeper(ClientConfiguration conf,
DNSToSwitchMapping dnsResolver,
HashedWheelTimer requestTimer,
FeatureProvider featureProvider)
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, BKException {
this.conf = conf;
this.delayEnsembleChange = conf.getDelayEnsembleChange();
this.reorderReadSequence = conf.isReorderReadSequenceEnabled();

// initialize zookeeper client
if (zkc == null) {
this.zk = ZooKeeperClient.newBuilder()
.connectString(conf.getZkServers())
.sessionTimeoutMs(conf.getZkTimeout())
.operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
conf.getZkTimeout(), 0))
.statsLogger(statsLogger)
.build();
this.ownZKHandle = true;
// initialize resources
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
"BookKeeperClientScheduler-%d");
this.scheduler = Executors
.newSingleThreadScheduledExecutor(tfb.build());
this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
.statsLogger(statsLogger)
.traceTaskExecution(conf.getEnableTaskExecutionStats())
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.build();

// initialize stats logger
this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
initOpLoggers(this.statsLogger);

// initialize feature provider
if (null == featureProvider) {
this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
} else {
if (!zkc.getState().isConnected()) {
LOG.error("Unconnected zookeeper handle passed to bookkeeper");
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
this.zk = zkc;
this.ownZKHandle = false;
this.featureProvider = featureProvider;
}
this.disableEnsembleChangeFeature =
this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());

// initialize registration client
try {
Class<? extends RegistrationClient> regClientCls = conf.getRegistrationClientClass();
this.regClient = ReflectionUtils.newInstance(regClientCls);
this.regClient.initialize(
conf,
scheduler,
statsLogger,
java.util.Optional.ofNullable(zkc));
} catch (ConfigurationException ce) {
LOG.error("Failed to initialize registration client", ce);
throw new IOException("Failed to initialize registration client", ce);
}

// initialize event loop group
Expand All @@ -440,25 +463,6 @@ private BookKeeper(ClientConfiguration conf,
this.ownTimer = false;
}

if (null == featureProvider) {
this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
} else {
this.featureProvider = featureProvider;
}

// get features
this.disableEnsembleChangeFeature = this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());

// initialize scheduler
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
"BookKeeperClientScheduler-%d");
this.scheduler = Executors
.newSingleThreadScheduledExecutor(tfb.build());

// initialize stats logger
this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
initOpLoggers(this.statsLogger);

// initialize the ensemble placement
this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
Expand All @@ -482,18 +486,11 @@ private BookKeeper(ClientConfiguration conf,
} else {
this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
}
// initialize main worker pool
this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
.statsLogger(statsLogger)
.traceTaskExecution(conf.getEnableTaskExecutionStats())
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.build();


// initialize bookie client
this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, statsLogger);
this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this);
this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient);
if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
Expand All @@ -510,7 +507,12 @@ private BookKeeper(ClientConfiguration conf,
}

// initialize ledger manager
this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
try {
this.ledgerManagerFactory =
LedgerManagerFactory.newLedgerManagerFactory(conf, ((ZKRegistrationClient) regClient).getZk());
} catch (KeeperException ke) {
throw new ZKException();
}
this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
this.explicitLacInterval = conf.getExplictLacInterval();
Expand Down Expand Up @@ -638,7 +640,7 @@ public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.Dige
}

ZooKeeper getZkHandle() {
return zk;
return ((ZKRegistrationClient) regClient).getZk();
}

protected ClientConfiguration getConf() {
Expand Down Expand Up @@ -1371,9 +1373,7 @@ public void close() throws BKException, InterruptedException {
if (ownEventLoopGroup) {
eventLoopGroup.shutdownGracefully();
}
if (ownZKHandle) {
zk.close();
}
this.regClient.close();
}

private final void initOpLoggers(StatsLogger stats) {
Expand Down
Loading

0 comments on commit 5eaa25f

Please sign in to comment.