Skip to content

Commit d4ffa8a

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent 58cba6e commit d4ffa8a

File tree

11 files changed

+428
-31
lines changed

11 files changed

+428
-31
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,13 @@ public void streamAll() {
357357
assertThatExceptionOfType(IllegalArgumentException.class)
358358
.isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001)));
359359
}
360-
360+
361361
/**
 * Returns the {@link ReadTableParams} obtained by calling {@code build()} on the builder from
 * {@code buildReadTableParamsNonLegacy()}, without any further customization.
 *
 * @param <ID> entity id type the params are parameterized with
 */
private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTableParamsNonLegacy() {
362362
return RepositoryTest.<ID>buildReadTableParamsNonLegacy().build();
363363
}
364364

365365
private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
366-
return ReadTableParams.<ID>builder().useNewSpliterator(true);
366+
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
367367
}
368368

369369
@Test

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

+41-6
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import tech.ydb.core.Result;
1212
import tech.ydb.core.Status;
1313
import tech.ydb.core.StatusCode;
14+
import tech.ydb.core.grpc.GrpcReadStream;
1415
import tech.ydb.proto.ValueProtos;
1516
import tech.ydb.table.Session;
1617
import tech.ydb.table.query.DataQueryResult;
1718
import tech.ydb.table.query.Params;
19+
import tech.ydb.table.query.ReadTablePart;
1820
import tech.ydb.table.result.ResultSetReader;
1921
import tech.ydb.table.settings.BulkUpsertSettings;
2022
import tech.ydb.table.settings.CommitTxSettings;
@@ -54,14 +56,23 @@
5456
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
5557
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
5658
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
59+
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
60+
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
61+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
62+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
63+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
64+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
65+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
5766
import tech.ydb.yoj.repository.ydb.statement.Statement;
5867
import tech.ydb.yoj.repository.ydb.table.YdbTable;
5968
import tech.ydb.yoj.util.lang.Interrupts;
6069

6170
import java.time.Duration;
6271
import java.util.ArrayList;
72+
import java.util.Iterator;
6373
import java.util.List;
6474
import java.util.Map;
75+
import java.util.concurrent.CompletableFuture;
6576
import java.util.concurrent.TimeUnit;
6677
import java.util.function.Supplier;
6778
import java.util.stream.Collectors;
@@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
7889
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);
7990

8091
private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
81-
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
92+
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();
8293

8394
@Getter
8495
private final TxOptions options;
@@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
102113
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
103114
}
104115

105-
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
106-
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
116+
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
117+
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
107118
spliterators.add(spliterator);
108119
return spliterator;
109120
}
@@ -153,7 +164,7 @@ private void doCommit() {
153164

154165
private void closeStreams() {
155166
Exception summaryException = null;
156-
for (YdbSpliterator<?> spliterator : spliterators) {
167+
for (ClosableSpliterator<?> spliterator : spliterators) {
157168
try {
158169
spliterator.close();
159170
} catch (Exception e) {
@@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
387398
String yql = getYql(statement);
388399
Params sdkParams = getSdkParams(statement, params);
389400

390-
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
401+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
391402

392403
initSession();
393404
session.executeScanQuery(
@@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
489500
}
490501

491502
if (params.isUseNewSpliterator()) {
492-
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
503+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
493504

494505
initSession();
495506
session.readTable(
@@ -500,6 +511,30 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
500511
return spliterator.createStream();
501512
}
502513

514+
if (params.isUseNewSpliterator2()) {
515+
initSession();
516+
517+
// TODO: configure stream timeout
518+
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));
519+
520+
var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
521+
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
522+
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
523+
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
524+
readTablePart.getResultSetReader(),
525+
mapper::mapResult
526+
);
527+
adapter.onNext(iterator);
528+
});
529+
future.whenComplete(adapter::onSupplierThreadComplete);
530+
531+
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());
532+
533+
spliterators.add(spliterator);
534+
535+
return spliterator.createStream();
536+
}
537+
503538
try {
504539
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
505540
doCall("read table " + mapper.getTableName(""), () -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import java.util.Spliterator;
4+
5+
/**
 * A {@link Spliterator} that additionally exposes an explicit {@link #close()} operation.
 *
 * <p>The interface also extends {@link AutoCloseable}, so an implementation can be managed by a
 * try-with-resources statement. {@link #close()} is redeclared without a checked exception, which
 * means callers never have to catch anything when closing.
 *
 * @param <V> type of elements returned by this spliterator
 */
public interface ClosableSpliterator<V> extends Spliterator<V>, AutoCloseable {
    /**
     * Closes this spliterator. Narrows {@link AutoCloseable#close()} so that no checked exception is declared.
     */
    @Override
    void close();
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
public final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final ResultConverter<V> converter;
15+
private final List<ValueProtos.Column> columns;
16+
17+
private int position = 0;
18+
19+
public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
20+
List<ValueProtos.Column> columns;
21+
if (resultSet.getRowCount() > 0) {
22+
resultSet.setRowIndex(0);
23+
columns = getColumns(resultSet);
24+
} else {
25+
columns = new ArrayList<>();
26+
}
27+
28+
this.resultSet = resultSet;
29+
this.converter = converter;
30+
this.columns = columns;
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int i = 0; i < columns.size(); i++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
List<ValueProtos.Column> columns = new ArrayList<>();
60+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
61+
columns.add(ValueProtos.Column.newBuilder()
62+
.setName(resultSet.getColumnName(i))
63+
.build()
64+
);
65+
}
66+
return columns;
67+
}
68+
69+
@FunctionalInterface
70+
public interface ResultConverter<V> {
71+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.yoj.ExperimentalApi;
4+
5+
import javax.annotation.Nullable;
6+
import java.util.Iterator;
7+
import java.util.Spliterator;
8+
import java.util.function.Consumer;
9+
import java.util.stream.Stream;
10+
import java.util.stream.StreamSupport;
11+
12+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
13+
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
14+
private final YdbSpliteratorQueue<Iterator<V>> queue;
15+
private final int flags;
16+
17+
private Iterator<V> valueIterator;
18+
19+
private boolean closed = false;
20+
21+
public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
22+
this.queue = queue;
23+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
24+
}
25+
26+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
27+
public Stream<V> createStream() {
28+
return StreamSupport.stream(this, false).onClose(this::close);
29+
}
30+
31+
@Override
32+
public void close() {
33+
if (closed) {
34+
return;
35+
}
36+
closed = true;
37+
queue.close();
38+
}
39+
40+
@Override
41+
public boolean tryAdvance(Consumer<? super V> action) {
42+
if (closed) {
43+
return false;
44+
}
45+
46+
// WARNING: At one point in time, this spliterator will store up to queue.size() + 2 blocks from YDB in memory.
47+
// One block right here, one in the queue, one in the grpc thread, waiting for free space in the queue.
48+
// Maximum response size in YDB - 50mb. It means that it could be up to 150mb for spliterator.
49+
valueIterator = getValueIterator(valueIterator, queue);
50+
if (valueIterator == null) {
51+
close();
52+
return false;
53+
}
54+
55+
V value = valueIterator.next();
56+
57+
action.accept(value);
58+
59+
return true;
60+
}
61+
62+
/*
63+
* Returns not empty valueIterator, null in case of end of stream
64+
*/
65+
@Nullable
66+
private static <V> Iterator<V> getValueIterator(
67+
@Nullable Iterator<V> valueIterator, YdbSpliteratorQueue<Iterator<V>> queue
68+
) {
69+
// valueIterator could be null only on first call of tryAdvance
70+
if (valueIterator == null) {
71+
valueIterator = queue.poll();
72+
if (valueIterator == null) {
73+
return null;
74+
}
75+
}
76+
77+
// queue could return empty iterator, we have to select one with elements
78+
while (!valueIterator.hasNext()) {
79+
valueIterator = queue.poll();
80+
if (valueIterator == null) {
81+
return null;
82+
}
83+
}
84+
85+
return valueIterator;
86+
}
87+
88+
@Override
89+
public Spliterator<V> trySplit() {
90+
return null;
91+
}
92+
93+
@Override
94+
public long estimateSize() {
95+
return Long.MAX_VALUE;
96+
}
97+
98+
@Override
99+
public long getExactSizeIfKnown() {
100+
return -1;
101+
}
102+
103+
@Override
104+
public int characteristics() {
105+
return flags;
106+
}
107+
}

0 commit comments

Comments
 (0)