Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize paginated getChildren #8

Merged
merged 7 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ module org.apache.zookeeper.proto {
class GetChildrenPaginatedRequest {
ustring path;
int maxReturned;
long minCzxId;
long czxIdOffset;
long minCzxid;
int czxidOffset;
huizhilu marked this conversation as resolved.
Show resolved Hide resolved
boolean watch;
}
class CheckVersionRequest {
Expand Down Expand Up @@ -240,8 +240,10 @@ module org.apache.zookeeper.proto {
org.apache.zookeeper.data.Stat stat;
}
class GetChildrenPaginatedResponse {
vector<org.apache.zookeeper.data.PathWithStat> children;
vector<ustring> children;
org.apache.zookeeper.data.Stat stat;
long nextPageCzxid;
int nextPageCzxidOffset;
huizhilu marked this conversation as resolved.
Show resolved Hide resolved
}
class GetACLResponse {
vector<org.apache.zookeeper.data.ACL> acl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.data.PathWithStat;

/**
* Iterator over children nodes of a given path.
* <p>
* Note: the final collection of children may not be strongly consistent with the server.
* If there are concurrent writes to the children during iteration, the final collection could
* miss some children or contain some duplicate children.
*
* @see ZooKeeper#getAllChildrenPaginated(String, boolean)
*/
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
class ChildrenBatchIterator implements RemoteIterator<String> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private final LinkedList<String> childrenQueue;
private final PaginationNextPage nextPage;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;

Expand All @@ -47,55 +53,43 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();
this.nextPage = new PaginationNextPage();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
batchGetChildren();
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return !childrenQueue.isEmpty();
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

public String next() throws KeeperException, InterruptedException {
if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
if (childrenQueue.size() == 1 && nextBatchMinZxid != ZooDefs.GetChildrenPaginated.lastPageMinCzxid) {
huizhilu marked this conversation as resolved.
Show resolved Hide resolved
batchGetChildren();
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
return childrenQueue.pop();
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
* Prepare minZxid and zxidOffset for the next batch request
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();
private void updateOffsetsForNextBatch() {
nextBatchMinZxid = nextPage.getMinCzxid();
nextBatchZxidOffset = nextPage.getMinCzxidOffset();
huizhilu marked this conversation as resolved.
Show resolved Hide resolved
}

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
private void batchGetChildren() throws KeeperException, InterruptedException {
List<String> childrenBatch =
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, null, nextPage);
childrenQueue.addAll(childrenBatch);
updateOffsetsForNextBatch();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Represents the info used to fetch the next page of data for pagination.
*/
@InterfaceAudience.Public
public class PaginationNextPage {
private long minCzxid;
private int minCzxidOffset;
huizhilu marked this conversation as resolved.
Show resolved Hide resolved

public long getMinCzxid() {
return minCzxid;
}

public void setMinCzxid(long minCzxid) {
huizhilu marked this conversation as resolved.
Show resolved Hide resolved
this.minCzxid = minCzxid;
}

public int getMinCzxidOffset() {
return minCzxidOffset;
}

public void setMinCzxidOffset(int minCzxidOffset) {
this.minCzxidOffset = minCzxidOffset;
}

@Override
public String toString() {
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
return "PaginationNextPage{"
+ "minCzxid=" + minCzxid
+ ", minCzxidOffset=" + minCzxidOffset
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
*
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

/**
* Returns the next element in the iteration.
*
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException, NoSuchElementException;
E next() throws InterruptedException, KeeperException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ public interface AddWatchModes {
int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
}

@InterfaceAudience.Public
public interface GetChildrenPaginated {
// Represents the current fetched page is the final page, no more children left to fetch.
long lastPageMinCzxid = -1L;
int lastPageCzxidOffset = -1;
}

public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};

}
Loading