Skip to content

Commit 4ee53c4

Browse files
committed
Optimize paginated getChildren
1 parent 58e65ad commit 4ee53c4

File tree

13 files changed

+522
-209
lines changed

13 files changed

+522
-209
lines changed

zookeeper-jute/src/main/resources/zookeeper.jute

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ module org.apache.zookeeper.proto {
164164
class GetChildrenPaginatedRequest {
165165
ustring path;
166166
int maxReturned;
167-
long minCzxId;
168-
long czxIdOffset;
167+
long minCzxid;
168+
int czxidOffset;
169169
boolean watch;
170170
}
171171
class CheckVersionRequest {
@@ -240,8 +240,10 @@ module org.apache.zookeeper.proto {
240240
org.apache.zookeeper.data.Stat stat;
241241
}
242242
class GetChildrenPaginatedResponse {
243-
vector<org.apache.zookeeper.data.PathWithStat> children;
243+
vector<ustring> children;
244244
org.apache.zookeeper.data.Stat stat;
245+
long nextPageCzxid;
246+
int nextPageCzxidOffset;
245247
}
246248
class GetACLResponse {
247249
vector<org.apache.zookeeper.data.ACL> acl;

zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,24 @@
2121
import java.util.LinkedList;
2222
import java.util.List;
2323
import java.util.NoSuchElementException;
24-
import org.apache.zookeeper.data.PathWithStat;
2524

2625
/**
2726
* Iterator over children nodes of a given path.
27+
* <p>
28+
* Note: the final collection of children may not be strongly consistent with the server.
29+
* If there are concurrent writes to the children during iteration, the final collection could
30+
* miss some children or contain some duplicate children.
31+
*
32+
* @see ZooKeeper#getAllChildrenPaginated(String, boolean)
2833
*/
29-
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
34+
class ChildrenBatchIterator implements RemoteIterator<String> {
3035

3136
private final ZooKeeper zooKeeper;
3237
private final String path;
3338
private final Watcher watcher;
3439
private final int batchSize;
35-
private final LinkedList<PathWithStat> childrenQueue;
40+
private final LinkedList<String> childrenQueue;
41+
private final PaginationNextPage nextPage;
3642
private long nextBatchMinZxid;
3743
private int nextBatchZxidOffset;
3844

@@ -47,55 +53,44 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
4753
this.nextBatchMinZxid = minZxid;
4854

4955
this.childrenQueue = new LinkedList<>();
56+
this.nextPage = new PaginationNextPage();
5057

51-
List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
52-
childrenQueue.addAll(firstChildrenBatch);
53-
54-
updateOffsetsForNextBatch(firstChildrenBatch);
58+
batchGetChildren();
5559
}
5660

5761
@Override
5862
public boolean hasNext() {
59-
6063
// next() never leaves childrenQueue empty unless we have iterated over all children
6164
return !childrenQueue.isEmpty();
6265
}
6366

6467
@Override
65-
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {
66-
68+
public String next() throws KeeperException, InterruptedException {
6769
if (!hasNext()) {
6870
throw new NoSuchElementException("No more children");
6971
}
7072

7173
// If we're down to the last element, backfill before returning it
72-
if (childrenQueue.size() == 1) {
73-
74-
List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
75-
childrenQueue.addAll(childrenBatch);
76-
77-
updateOffsetsForNextBatch(childrenBatch);
74+
if (childrenQueue.size() == 1 && nextBatchMinZxid != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
75+
&& nextBatchZxidOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset) {
76+
batchGetChildren();
7877
}
7978

80-
PathWithStat returnChildren = childrenQueue.pop();
81-
82-
return returnChildren;
79+
return childrenQueue.pop();
8380
}
8481

8582
/**
86-
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
83+
* Prepare minZxid and zxidOffset for the next batch request
8784
*/
88-
private void updateOffsetsForNextBatch(List<PathWithStat> children) {
89-
90-
for (PathWithStat child : children) {
91-
long childZxid = child.getStat().getCzxid();
85+
private void updateOffsetsForNextBatch() {
86+
nextBatchMinZxid = nextPage.getMinCzxid();
87+
nextBatchZxidOffset = nextPage.getMinCzxidOffset();
88+
}
9289

93-
if (nextBatchMinZxid == childZxid) {
94-
++nextBatchZxidOffset;
95-
} else {
96-
nextBatchZxidOffset = 1;
97-
nextBatchMinZxid = childZxid;
98-
}
99-
}
90+
private void batchGetChildren() throws KeeperException, InterruptedException {
91+
List<String> childrenBatch =
92+
zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset, nextPage);
93+
childrenQueue.addAll(childrenBatch);
94+
updateOffsetsForNextBatch();
10095
}
10196
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zookeeper;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* Represents the info used to fetch the next page of data for pagination.
24+
*/
25+
@InterfaceAudience.Public
26+
public class PaginationNextPage {
27+
private long minCzxid;
28+
private int minCzxidOffset;
29+
30+
public PaginationNextPage() {
31+
}
32+
33+
public PaginationNextPage(long minCzxid, int minCzxidOffset) {
34+
this.minCzxid = minCzxid;
35+
this.minCzxidOffset = minCzxidOffset;
36+
}
37+
38+
public long getMinCzxid() {
39+
return minCzxid;
40+
}
41+
42+
public void setMinCzxid(long minCzxid) {
43+
this.minCzxid = minCzxid;
44+
}
45+
46+
public int getMinCzxidOffset() {
47+
return minCzxidOffset;
48+
}
49+
50+
public void setMinCzxidOffset(int minCzxidOffset) {
51+
this.minCzxidOffset = minCzxidOffset;
52+
}
53+
54+
@Override
55+
public String toString() {
56+
return "PaginationNextPage{" +
57+
"minCzxid=" + minCzxid +
58+
", minCzxidOffset=" + minCzxidOffset +
59+
'}';
60+
}
61+
}

zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@ public interface RemoteIterator<E> {
2727

2828
/**
2929
* Returns true if the iterator has more elements.
30+
*
3031
* @return true if the iterator has more elements, false otherwise.
3132
*/
3233
boolean hasNext();
3334

3435
/**
3536
* Returns the next element in the iteration.
37+
*
3638
* @return the next element in the iteration.
3739
* @throws InterruptedException if the thread is interrupted
3840
* @throws KeeperException if an error is encountered server-side
3941
* @throws NoSuchElementException if the iteration has no more elements
4042
*/
41-
E next() throws InterruptedException, KeeperException, NoSuchElementException;
43+
E next() throws InterruptedException, KeeperException;
4244
}

zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ public interface AddWatchModes {
161161
int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
162162
}
163163

164+
@InterfaceAudience.Public
165+
public interface GetChildrenPaginated {
166+
long lastPageMinCzxid = -1L;
167+
int lastPageMinCzxidOffset = -1;
168+
}
169+
164170
public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};
165171

166172
}

zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.apache.zookeeper.client.ZooKeeperSaslClient;
5454
import org.apache.zookeeper.common.PathUtils;
5555
import org.apache.zookeeper.data.ACL;
56-
import org.apache.zookeeper.data.PathWithStat;
5756
import org.apache.zookeeper.data.Stat;
5857
import org.apache.zookeeper.proto.AddWatchRequest;
5958
import org.apache.zookeeper.proto.CheckWatchesRequest;
@@ -2769,11 +2768,15 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
27692768
* @param maxReturned
27702769
* - the maximum number of children returned
27712770
* @param minCzxId
2772-
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
2773-
* @param czxIdOffset
2774-
* - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages
2771+
* - only return children whose creation zxid is equal to or greater than {@code minCzxId}
2772+
* @param czxidOffset
2773+
* - how many children with zxid == minCzxId to skip server-side, as they were returned in previous pages
2774+
* @param nextPage
2775+
* - if not null, {@link PaginationNextPage} info returned from server will be copied to {@code nextPage}.
2776+
* The info can be used for fetching the next page of remaining children, or checking whether the
2777+
* returned page is the last one
27752778
* @return
2776-
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
2779+
* a list of children nodes, up to {@code maxReturned}
27772780
* @throws KeeperException
27782781
* if the server signals an error with a non-zero error code.
27792782
* @throws IllegalArgumentException
@@ -2788,10 +2791,14 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
27882791
*
27892792
* @since 3.6.2
27902793
*/
2791-
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
2794+
public List<String> getChildren(final String path,
2795+
Watcher watcher,
2796+
int maxReturned,
2797+
long minCzxId,
2798+
int czxidOffset,
2799+
final PaginationNextPage nextPage)
27922800
throws KeeperException, InterruptedException {
2793-
final String clientPath = path;
2794-
PathUtils.validatePath(clientPath);
2801+
PathUtils.validatePath(path);
27952802

27962803
if (maxReturned <= 0) {
27972804
throw new IllegalArgumentException("Cannot return less than 1 children");
@@ -2800,26 +2807,100 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
28002807
// the watch contains the un-chroot path
28012808
WatchRegistration wcb = null;
28022809
if (watcher != null) {
2803-
wcb = new ChildWatchRegistration(watcher, clientPath);
2810+
wcb = new ChildWatchRegistration(watcher, path);
28042811
}
28052812

2806-
final String serverPath = prependChroot(clientPath);
2813+
final String serverPath = prependChroot(path);
28072814

28082815
RequestHeader h = new RequestHeader();
28092816
h.setType(ZooDefs.OpCode.getChildrenPaginated);
28102817
GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest();
28112818
request.setPath(serverPath);
28122819
request.setWatch(watcher != null);
28132820
request.setMaxReturned(maxReturned);
2814-
request.setMinCzxId(minCzxId);
2815-
request.setCzxIdOffset(czxIdOffset);
2816-
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
2817-
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
2818-
if (r.getErr() != 0) {
2819-
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
2820-
clientPath);
2821+
2822+
Set<String> children = null;
2823+
GetChildrenPaginatedResponse response;
2824+
boolean isFirstPage = true;
2825+
boolean needNextPage = true;
2826+
2827+
while (needNextPage) {
2828+
request.setMinCzxid(minCzxId);
2829+
// If not the first page, always start from czxidOffset 0, to avoid the case:
2830+
// if a child with the same czxid is returned in the previous page, and then deleted
2831+
// on the server, the starting offset for the next page should be shifted smaller accordingly.
2832+
// If the next page still started from czxidOffset, any children that were not in the previous page
2833+
// but whose offset is now less than czxidOffset would be missed.
2834+
// HashSet is used to de-dup the duplicate children.
2835+
request.setCzxidOffset(isFirstPage ? czxidOffset : 0);
2836+
response = new GetChildrenPaginatedResponse();
2837+
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
2838+
if (r.getErr() != 0) {
2839+
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
2840+
path);
2841+
}
2842+
minCzxId = response.getNextPageCzxid();
2843+
czxidOffset = response.getNextPageCzxidOffset();
2844+
needNextPage = needNextPage(maxReturned, minCzxId, czxidOffset);
2845+
2846+
if (isFirstPage) {
2847+
// If all children are returned in the first page,
2848+
// no need to use hash set to de-dup children
2849+
if (!needNextPage) {
2850+
updateNextPage(nextPage, minCzxId, czxidOffset);
2851+
return response.getChildren();
2852+
}
2853+
children = new HashSet<>();
2854+
isFirstPage = false;
2855+
}
2856+
2857+
children.addAll(response.getChildren());
2858+
}
2859+
2860+
updateNextPage(nextPage, minCzxId, czxidOffset);
2861+
2862+
return new ArrayList<>(children);
2863+
}
2864+
2865+
/**
2866+
* Returns a list of all the children given the path.
2867+
* <p>
2868+
* The difference between this API and {@link #getChildren(String, boolean)} is:
2869+
* when there are lots of children and the network buffer exceeds {@code jute.maxbuffer},
2870+
* this API will fetch the children using pagination and be able to return all children;
2871+
* while {@link #getChildren(String, boolean)} will fail.
2872+
* <p>
2873+
* The final list of children returned is NOT strongly consistent with the server's data:
2874+
* the list might contain some deleted children if some children are deleted before the last page is fetched.
2875+
* <p>
2876+
* If the watch is true and the call is successful (no exception is thrown),
2877+
* a watch will be left on the node with the given path. The watch will be
2878+
* triggered by a successful operation that deletes the node of the given
2879+
* path or creates/deletes a child under the node.
2880+
* <p>
2881+
*
2882+
* @param path the path of the parent node
2883+
* @param watch whether or not to leave a watch on the given node
2884+
* @return a list of all children of the given path
2885+
* @throws KeeperException if the server signals an error with a non-zero error code.
2886+
* @throws InterruptedException if the server transaction is interrupted.
2887+
*/
2888+
public List<String> getAllChildrenPaginated(String path, boolean watch)
2889+
throws KeeperException, InterruptedException {
2890+
return getChildren(path, watch ? watchManager.defaultWatcher : null, Integer.MAX_VALUE, 0L, 0, null);
2891+
}
2892+
2893+
private boolean needNextPage(int maxReturned, long minCzxId, int czxIdOffset) {
2894+
return maxReturned == Integer.MAX_VALUE
2895+
&& minCzxId != ZooDefs.GetChildrenPaginated.lastPageMinCzxid
2896+
&& czxIdOffset != ZooDefs.GetChildrenPaginated.lastPageMinCzxidOffset;
2897+
}
2898+
2899+
private void updateNextPage(PaginationNextPage nextPage, long minCzxId, int czxIdOffset) {
2900+
if (nextPage != null) {
2901+
nextPage.setMinCzxid(minCzxId);
2902+
nextPage.setMinCzxidOffset(czxIdOffset);
28212903
}
2822-
return response.getChildren();
28232904
}
28242905

28252906
/**
@@ -2853,7 +2934,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
28532934
*
28542935
* @since 3.6.2
28552936
*/
2856-
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
2937+
public RemoteIterator<String> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
28572938
throws KeeperException, InterruptedException {
28582939
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
28592940
}

0 commit comments

Comments
 (0)