Skip to content

Commit

Permalink
Optimize paginated getChildren: return children names instead of Path…
Browse files Browse the repository at this point in the history
…WithStat
  • Loading branch information
huizhilu committed Nov 20, 2020
1 parent 4ee53c4 commit 60d5e51
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,18 @@
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.data.PathWithStat;

/**
* Iterator over children nodes of a given path.
* <p>
* Note: the final collection of children may not be strongly consistent with the server.
* If there are concurrent writes to the children during iteration, the final collection could
* miss some children or contain some duplicate children.
*
* @see ZooKeeper#getAllChildrenPaginated(String, boolean)
*/
class ChildrenBatchIterator implements RemoteIterator<String> {
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<String> childrenQueue;
private final PaginationNextPage nextPage;
private final LinkedList<PathWithStat> childrenQueue;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;

Expand All @@ -53,44 +47,55 @@ class ChildrenBatchIterator implements RemoteIterator<String> {
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();
this.nextPage = new PaginationNextPage();

batchGetChildren();
List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return !childrenQueue.isEmpty();
}

@Override
public String next() throws KeeperException, InterruptedException {
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1 && nextBatchMinZxid != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
&& nextBatchZxidOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset) {
batchGetChildren();
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
}

return childrenQueue.pop();
PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
}

/**
* Prepare minZxid and zxidOffset for the next batch request
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
*/
private void updateOffsetsForNextBatch() {
nextBatchMinZxid = nextPage.getMinCzxid();
nextBatchZxidOffset = nextPage.getMinCzxidOffset();
}
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

private void batchGetChildren() throws KeeperException, InterruptedException {
List<String> childrenBatch =
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
childrenQueue.addAll(childrenBatch);
updateOffsetsForNextBatch();
for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,16 @@ public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
*
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

/**
* Returns the next element in the iteration.
*
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException;
E next() throws InterruptedException, KeeperException, NoSuchElementException;
}
Loading

0 comments on commit 60d5e51

Please sign in to comment.