Skip to content

Commit 6b40565

Browse files
authored
Fix scan with limit behavior in DynamoDB adapter (#2291)
1 parent 1d34211 commit 6b40565

File tree

7 files changed

+243
-84
lines changed

7 files changed

+243
-84
lines changed

Diff for: core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,34 @@
1313
import java.util.Optional;
1414
import javax.annotation.Nonnull;
1515
import javax.annotation.Nullable;
16+
import javax.annotation.concurrent.NotThreadSafe;
1617
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
1718

19+
@NotThreadSafe
1820
public class QueryScanner implements Scanner {
1921

2022
private final PaginatedRequest request;
2123
private final ResultInterpreter resultInterpreter;
2224

2325
private Iterator<Map<String, AttributeValue>> itemsIterator;
26+
@Nullable private Integer remainingLimit;
2427
@Nullable private Map<String, AttributeValue> lastEvaluatedKey;
25-
private int totalResultCount;
2628

2729
private ScannerIterator scannerIterator;
2830

2931
@SuppressFBWarnings("EI_EXPOSE_REP2")
30-
public QueryScanner(PaginatedRequest request, ResultInterpreter resultInterpreter) {
32+
public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) {
3133
this.request = request;
32-
this.resultInterpreter = resultInterpreter;
3334

34-
handleResponse(request.execute());
35+
if (limit > 0) {
36+
remainingLimit = limit;
37+
handleResponse(request.execute(limit));
38+
} else {
39+
remainingLimit = null;
40+
handleResponse(request.execute());
41+
}
42+
43+
this.resultInterpreter = resultInterpreter;
3544
}
3645

3746
@Override
@@ -49,18 +58,23 @@ private boolean hasNext() {
4958
return true;
5059
}
5160
if (lastEvaluatedKey != null) {
52-
handleResponse(request.execute(lastEvaluatedKey));
61+
if (remainingLimit != null) {
62+
handleResponse(request.execute(lastEvaluatedKey, remainingLimit));
63+
} else {
64+
handleResponse(request.execute(lastEvaluatedKey));
65+
}
5366
return itemsIterator.hasNext();
5467
}
5568
return false;
5669
}
5770

5871
private void handleResponse(PaginatedRequestResponse response) {
5972
List<Map<String, AttributeValue>> items = response.items();
60-
totalResultCount += items.size();
73+
if (remainingLimit != null) {
74+
remainingLimit -= items.size();
75+
}
6176
itemsIterator = items.iterator();
62-
if ((request.limit() == null || totalResultCount < request.limit())
63-
&& response.hasLastEvaluatedKey()) {
77+
if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) {
6478
lastEvaluatedKey = response.lastEvaluatedKey();
6579
} else {
6680
lastEvaluatedKey = null;

Diff for: core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java

+10-14
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,16 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet
142142

143143
builder.expressionAttributeNames(expressionAttributeNames);
144144

145+
int limit = 0;
145146
if (selection instanceof Scan) {
146147
Scan scan = (Scan) selection;
147-
if (scan.getLimit() > 0) {
148-
builder.limit(scan.getLimit());
149-
}
148+
limit = scan.getLimit();
150149
}
150+
151151
com.scalar.db.storage.dynamo.request.QueryRequest request =
152152
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
153153
return new QueryScanner(
154-
request, new ResultInterpreter(selection.getProjections(), tableMetadata));
154+
request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata));
155155
}
156156

157157
private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
@@ -171,10 +171,6 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
171171
}
172172
}
173173

174-
if (scan.getLimit() > 0) {
175-
builder.limit(scan.getLimit());
176-
}
177-
178174
if (!scan.getProjections().isEmpty()) {
179175
Map<String, String> expressionAttributeNames = new HashMap<>();
180176
projectionExpression(builder, scan, expressionAttributeNames);
@@ -184,20 +180,17 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
184180
if (scan.getConsistency() != Consistency.EVENTUAL) {
185181
builder.consistentRead(true);
186182
}
183+
187184
com.scalar.db.storage.dynamo.request.QueryRequest queryRequest =
188185
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
189186
return new QueryScanner(
190-
queryRequest, new ResultInterpreter(scan.getProjections(), tableMetadata));
187+
queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata));
191188
}
192189

193190
private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
194191
DynamoOperation dynamoOperation = new DynamoOperation(scan, tableMetadata);
195192
ScanRequest.Builder builder = ScanRequest.builder().tableName(dynamoOperation.getTableName());
196193

197-
if (scan.getLimit() > 0) {
198-
builder.limit(scan.getLimit());
199-
}
200-
201194
if (!scan.getProjections().isEmpty()) {
202195
Map<String, String> expressionAttributeNames = new HashMap<>();
203196
projectionExpression(builder, scan, expressionAttributeNames);
@@ -207,10 +200,13 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
207200
if (scan.getConsistency() != Consistency.EVENTUAL) {
208201
builder.consistentRead(true);
209202
}
203+
210204
com.scalar.db.storage.dynamo.request.ScanRequest requestWrapper =
211205
new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build());
212206
return new QueryScanner(
213-
requestWrapper, new ResultInterpreter(scan.getProjections(), tableMetadata));
207+
requestWrapper,
208+
scan.getLimit(),
209+
new ResultInterpreter(scan.getProjections(), tableMetadata));
214210
}
215211

216212
private void projectionExpression(

Diff for: core/src/main/java/com/scalar/db/storage/dynamo/request/PaginatedRequest.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,34 @@ public interface PaginatedRequest {
88
/**
99
* Execute the request
1010
*
11-
* @return the request response
11+
* @return the response
1212
*/
1313
PaginatedRequestResponse execute();
1414

15+
/**
16+
* Execute the request with limit
17+
*
18+
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
19+
* items)
20+
* @return the response
21+
*/
22+
PaginatedRequestResponse execute(int limit);
23+
1524
/**
1625
* Execute the request that will be evaluated starting from the given start key
1726
*
18-
* @param exclusiveStartKey The primary key of the first item that this operation will evaluate.
19-
* @return the request response
27+
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
28+
* @return the response
2029
*/
2130
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey);
2231

2332
/**
24-
* Returns the request limit
33+
* Execute the request that will be evaluated starting from the given start key with limit
2534
*
26-
* @return the request limit
35+
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
36+
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
37+
* items)
38+
* @return the response
2739
*/
28-
Integer limit();
40+
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey, int limit);
2941
}

Diff for: core/src/main/java/com/scalar/db/storage/dynamo/request/QueryRequest.java

+26-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,21 @@ public QueryRequest(
1919
this.dynamoRequest = dynamoRequest;
2020
}
2121

22+
@Override
23+
public PaginatedRequestResponse execute() {
24+
QueryResponse response = client.query(dynamoRequest);
25+
26+
return new PaginatedRequestResponse(
27+
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
28+
}
29+
30+
@Override
31+
public PaginatedRequestResponse execute(int limit) {
32+
QueryRequest request =
33+
new QueryRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
34+
return request.execute();
35+
}
36+
2237
@Override
2338
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
2439
QueryRequest request =
@@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
2944
}
3045

3146
@Override
32-
public PaginatedRequestResponse execute() {
33-
QueryResponse response = client.query(dynamoRequest);
34-
35-
return new PaginatedRequestResponse(
36-
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
37-
}
38-
39-
@Override
40-
public Integer limit() {
41-
return dynamoRequest.limit();
47+
public PaginatedRequestResponse execute(
48+
Map<String, AttributeValue> exclusiveStartKey, int limit) {
49+
QueryRequest request =
50+
new QueryRequest(
51+
this.client,
52+
this.dynamoRequest
53+
.toBuilder()
54+
.exclusiveStartKey(exclusiveStartKey)
55+
.limit(limit)
56+
.build());
57+
return request.execute();
4258
}
4359
}

Diff for: core/src/main/java/com/scalar/db/storage/dynamo/request/ScanRequest.java

+26-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,21 @@ public ScanRequest(
1919
this.dynamoRequest = dynamoRequest;
2020
}
2121

22+
@Override
23+
public PaginatedRequestResponse execute() {
24+
ScanResponse response = client.scan(dynamoRequest);
25+
26+
return new PaginatedRequestResponse(
27+
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
28+
}
29+
30+
@Override
31+
public PaginatedRequestResponse execute(int limit) {
32+
ScanRequest request =
33+
new ScanRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
34+
return request.execute();
35+
}
36+
2237
@Override
2338
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
2439
ScanRequest request =
@@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
2944
}
3045

3146
@Override
32-
public PaginatedRequestResponse execute() {
33-
ScanResponse response = client.scan(dynamoRequest);
34-
35-
return new PaginatedRequestResponse(
36-
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
37-
}
38-
39-
@Override
40-
public Integer limit() {
41-
return dynamoRequest.limit();
47+
public PaginatedRequestResponse execute(
48+
Map<String, AttributeValue> exclusiveStartKey, int limit) {
49+
ScanRequest request =
50+
new ScanRequest(
51+
this.client,
52+
this.dynamoRequest
53+
.toBuilder()
54+
.exclusiveStartKey(exclusiveStartKey)
55+
.limit(limit)
56+
.build());
57+
return request.execute();
4258
}
4359
}

Diff for: core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java

+16-21
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,10 @@ public void one_ShouldReturnResult() {
4040
Map<String, AttributeValue> item = Collections.emptyMap();
4141
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
4242
when(request.execute()).thenReturn(response);
43-
when(request.limit()).thenReturn(null);
4443
when(response.items()).thenReturn(items);
4544
when(resultInterpreter.interpret(item)).thenReturn(result);
4645

47-
QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
46+
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);
4847

4948
// Act
5049
Optional<Result> actual1 = queryScanner.one();
@@ -71,11 +70,10 @@ public void all_ShouldReturnResults() {
7170
Map<String, AttributeValue> item = Collections.emptyMap();
7271
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
7372
when(request.execute()).thenReturn(response);
74-
when(request.limit()).thenReturn(null);
7573
when(response.items()).thenReturn(items);
7674
when(resultInterpreter.interpret(item)).thenReturn(result);
7775

78-
QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
76+
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);
7977

8078
// Act
8179
List<Result> results1 = queryScanner.all();
@@ -100,9 +98,8 @@ public void iterator_ShouldReturnResults() {
10098
when(response.items()).thenReturn(items);
10199
when(resultInterpreter.interpret(item)).thenReturn(result);
102100
when(request.execute()).thenReturn(response);
103-
when(request.limit()).thenReturn(null);
104101

105-
QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
102+
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);
106103

107104
// Act
108105
Iterator<Result> iterator = queryScanner.iterator();
@@ -134,9 +131,8 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
134131
when(resultInterpreter.interpret(item)).thenReturn(result);
135132
when(request.execute()).thenReturn(response);
136133
when(request.execute(lastEvaluatedKey)).thenReturn(response);
137-
when(request.limit()).thenReturn(null);
138134

139-
QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
135+
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);
140136

141137
// Act
142138
Optional<Result> actual1 = queryScanner.one();
@@ -164,26 +160,27 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
164160
@Test
165161
public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResults() {
166162
// Arrange
163+
int limit = 3;
164+
167165
Map<String, AttributeValue> item = Collections.emptyMap();
168-
List<Map<String, AttributeValue>> items = Arrays.asList(item, item);
166+
List<Map<String, AttributeValue>> items1 = Arrays.asList(item, item);
167+
List<Map<String, AttributeValue>> items2 = Collections.singletonList(item);
169168
Map<String, AttributeValue> lastEvaluatedKey = Collections.emptyMap();
170169

171-
when(request.limit()).thenReturn(4);
172-
when(response.items()).thenReturn(items);
170+
when(response.items()).thenReturn(items1).thenReturn(items2);
173171
when(response.hasLastEvaluatedKey()).thenReturn(true);
174172
when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey);
175-
when(request.execute()).thenReturn(response);
176-
when(request.execute(lastEvaluatedKey)).thenReturn(response);
173+
when(request.execute(limit)).thenReturn(response);
174+
when(request.execute(lastEvaluatedKey, limit - items1.size())).thenReturn(response);
177175
when(resultInterpreter.interpret(item)).thenReturn(result);
178176

179-
QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
177+
QueryScanner queryScanner = new QueryScanner(request, limit, resultInterpreter);
180178

181179
// Act
182180
Optional<Result> actual1 = queryScanner.one();
183181
Optional<Result> actual2 = queryScanner.one();
184182
Optional<Result> actual3 = queryScanner.one();
185183
Optional<Result> actual4 = queryScanner.one();
186-
Optional<Result> actual5 = queryScanner.one();
187184

188185
// Assert
189186
assertThat(actual1).isPresent();
@@ -192,12 +189,10 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu
192189
assertThat(actual2.get()).isEqualTo(result);
193190
assertThat(actual3).isPresent();
194191
assertThat(actual3.get()).isEqualTo(result);
195-
assertThat(actual4).isPresent();
196-
assertThat(actual4.get()).isEqualTo(result);
197-
assertThat(actual5).isNotPresent();
192+
assertThat(actual4).isNotPresent();
198193

199-
verify(resultInterpreter, times(4)).interpret(item);
200-
verify(request).execute(lastEvaluatedKey);
201-
verify(request).execute();
194+
verify(resultInterpreter, times(limit)).interpret(item);
195+
verify(request).execute(limit);
196+
verify(request).execute(lastEvaluatedKey, limit - items1.size());
202197
}
203198
}

0 commit comments

Comments
 (0)