Skip to content

Commit 94b2183

Browse files
committed
CLIENT-2706 In scan/query with max_records set, count discarded records that would have exceeded max_records. This count is used to correctly determine if a node’s partitions should be retried on the next scan/query page.
1 parent 916acf7 commit 94b2183

File tree

4 files changed

+23
-10
lines changed

4 files changed

+23
-10
lines changed

src/include/aerospike/as_partition_tracker.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ typedef struct as_node_partitions_s {
4444
as_vector parts_partial;
4545
uint64_t record_count;
4646
uint64_t record_max;
47+
uint64_t disallowed_count;
4748
uint32_t parts_unavailable;
4849
} as_node_partitions;
4950

@@ -138,17 +139,29 @@ as_partition_tracker_set_last(
138139
}
139140

140141
static inline bool
141-
as_partition_tracker_reached_max_records_sync(as_partition_tracker* pt)
142+
as_partition_tracker_reached_max_records_sync(as_partition_tracker* pt, as_node_partitions* np)
142143
{
143144
// Sync scan/query runs in multiple threads, so atomics are required.
144-
return pt && pt->check_max && (as_aaf_uint64(&pt->record_count, 1) > pt->max_records);
145+
if (pt && pt->check_max && (as_aaf_uint64(&pt->record_count, 1) > pt->max_records)) {
146+
// Record was returned, but would exceed max_records.
147+
// Discard record and increment disallowed_count.
148+
np->disallowed_count++;
149+
return true;
150+
}
151+
return false;
145152
}
146153

147154
static inline bool
148-
as_partition_tracker_reached_max_records_async(as_partition_tracker* pt)
155+
as_partition_tracker_reached_max_records_async(as_partition_tracker* pt, as_node_partitions* np)
149156
{
150157
// Async scan/query runs in a single event loop thread, so atomics are not necessary.
151-
return pt && pt->check_max && (++pt->record_count > pt->max_records);
158+
if (pt && pt->check_max && (++pt->record_count > pt->max_records)) {
159+
// Record was returned, but would exceed max_records.
160+
// Discard record and increment disallowed_count.
161+
np->disallowed_count++;
162+
return true;
163+
}
164+
return false;
152165
}
153166

154167
static inline uint16_t

src/main/aerospike/aerospike_query.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ as_query_parse_record_async(
336336
return status;
337337
}
338338

339-
if (as_partition_tracker_reached_max_records_async(qe->pt)) {
339+
if (as_partition_tracker_reached_max_records_async(qe->pt, qc->np)) {
340340
as_record_destroy(&rec);
341341
return AEROSPIKE_OK;
342342
}
@@ -471,7 +471,7 @@ as_query_parse_record(uint8_t** pp, as_msg* msg, as_query_task* task, as_error*
471471
return status;
472472
}
473473

474-
if (as_partition_tracker_reached_max_records_sync(task->pt)) {
474+
if (as_partition_tracker_reached_max_records_sync(task->pt, task->np)) {
475475
as_record_destroy(&rec);
476476
return AEROSPIKE_OK;
477477
}

src/main/aerospike/aerospike_scan.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ as_scan_parse_record_async(
201201
return status;
202202
}
203203

204-
if (as_partition_tracker_reached_max_records_async(se->pt)) {
204+
if (as_partition_tracker_reached_max_records_async(se->pt, sc->np)) {
205205
as_record_destroy(&rec);
206206
return AEROSPIKE_OK;
207207
}
@@ -305,7 +305,7 @@ as_scan_parse_record(uint8_t** pp, as_msg* msg, as_scan_task* task, as_error* er
305305
return status;
306306
}
307307

308-
if (as_partition_tracker_reached_max_records_sync(task->pt)) {
308+
if (as_partition_tracker_reached_max_records_sync(task->pt, task->np)) {
309309
as_record_destroy(&rec);
310310
return AEROSPIKE_OK;
311311
}

src/main/aerospike/as_partition_tracker.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster,
446446
for (uint32_t i = 0; i < list->size; i++) {
447447
as_node_partitions* np = as_vector_get(list, i);
448448

449-
if (np->record_count >= np->record_max) {
449+
if (np->record_count + np->disallowed_count >= np->record_max) {
450450
mark_retry(pt, np);
451451
is_done = false;
452452
}
@@ -460,7 +460,7 @@ as_partition_tracker_is_complete(as_partition_tracker* pt, as_cluster* cluster,
460460
for (uint32_t i = 0; i < list->size; i++) {
461461
as_node_partitions* np = as_vector_get(list, i);
462462

463-
if (np->record_count > 0) {
463+
if (np->record_count + np->disallowed_count > 0) {
464464
mark_retry(pt, np);
465465
}
466466
}

0 commit comments

Comments
 (0)