Skip to content

Commit 9e2201e

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent edaa8c4 commit 9e2201e

File tree

5 files changed

+426
-0
lines changed

5 files changed

+426
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
5+
import java.util.List;
6+
7+
@FunctionalInterface
8+
public interface ResultConverter<V> {
9+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
/* package */ final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final List<ValueProtos.Column> columns;
15+
16+
private final ResultConverter<V> converter;
17+
18+
private int position = 0;
19+
20+
public ResultSetIterator(ResultConverter<V> converter, ResultSetReader resultSet) {
21+
this.converter = converter;
22+
this.resultSet = resultSet;
23+
24+
if (resultSet.getRowCount() > 0) {
25+
columns = getColumns(resultSet);
26+
} else {
27+
columns = new ArrayList<>();
28+
}
29+
30+
this.resultSet.setRowIndex(0);
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int col = 0; col < columns.size(); col++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(col)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
resultSet.setRowIndex(0);
60+
List<ValueProtos.Column> result = new ArrayList<>();
61+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
62+
result.add(ValueProtos.Column.newBuilder()
63+
.setName(resultSet.getColumnName(i))
64+
.build());
65+
}
66+
return result;
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.table.result.ResultSetReader;
4+
import tech.ydb.yoj.ExperimentalApi;
5+
6+
import java.time.Duration;
7+
import java.util.Spliterator;
8+
import java.util.function.Consumer;
9+
import java.util.stream.Stream;
10+
import java.util.stream.StreamSupport;
11+
12+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
13+
public final class YdbSpliterator<V> implements Spliterator<V> {
14+
private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5);
15+
16+
private final ResultConverter<V> converter;
17+
private final int flags;
18+
private final YdbSpliteratorQueue<ResultSetReader> queue;
19+
20+
private ResultSetIterator<V> resultIterator;
21+
22+
private boolean closed = false;
23+
24+
public YdbSpliterator(ResultConverter<V> converter, boolean isOrdered) {
25+
this(converter, isOrdered, DEFAULT_STREAM_WORK_TIMEOUT);
26+
}
27+
28+
private YdbSpliterator(ResultConverter<V> converter, boolean isOrdered, Duration streamWorkTimeout) {
29+
this.converter = converter;
30+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
31+
this.queue = new YdbSpliteratorQueue<>(1, streamWorkTimeout);
32+
}
33+
34+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
35+
public Stream<V> createStream() {
36+
return StreamSupport.stream(this, false).onClose(this::close);
37+
}
38+
39+
@Override
40+
public boolean tryAdvance(Consumer<? super V> action) {
41+
if (closed) {
42+
return false;
43+
}
44+
45+
if (resultIterator == null || !resultIterator.hasNext()) {
46+
ResultSetReader resultSet = queue.poll();
47+
if (resultSet == null) {
48+
closed = true;
49+
return false;
50+
}
51+
resultIterator = new ResultSetIterator<>(converter, resultSet);
52+
}
53+
54+
V value = resultIterator.next();
55+
56+
action.accept(value);
57+
58+
return true;
59+
}
60+
61+
public void close() {
62+
closed = true;
63+
queue.close();
64+
}
65+
66+
@Override
67+
public Spliterator<V> trySplit() {
68+
return null;
69+
}
70+
71+
@Override
72+
public long estimateSize() {
73+
return Long.MAX_VALUE;
74+
}
75+
76+
@Override
77+
public long getExactSizeIfKnown() {
78+
return -1;
79+
}
80+
81+
@Override
82+
public int characteristics() {
83+
return flags;
84+
}
85+
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import com.google.common.base.Preconditions;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import tech.ydb.yoj.ExperimentalApi;
7+
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
8+
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
9+
10+
import java.time.Duration;
11+
import java.util.ArrayDeque;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.locks.Condition;
14+
import java.util.concurrent.locks.Lock;
15+
import java.util.concurrent.locks.ReentrantLock;
16+
17+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
18+
/* package */ final class YdbSpliteratorQueue<V> {
19+
private static final Logger log = LoggerFactory.getLogger(YdbSpliteratorQueue.class);
20+
21+
private static final SupplierStatus UNDONE_SUPPLIER_STATUS = () -> false;
22+
23+
private final int maxQueueSize;
24+
private final ArrayDeque<V> queue;
25+
private final long streamWorkDeadlineNanos;
26+
27+
private final Lock lock = new ReentrantLock();
28+
private final Condition newElement = lock.newCondition();
29+
private final Condition queueIsNotFull = lock.newCondition();
30+
31+
private SupplierStatus supplierStatus = UNDONE_SUPPLIER_STATUS;
32+
private boolean closed = false;
33+
34+
public YdbSpliteratorQueue(int maxQueueSize, Duration streamWorkTimeout) {
35+
Preconditions.checkArgument(maxQueueSize > 0, "maxQueueSize must be greater than 0");
36+
this.maxQueueSize = maxQueueSize;
37+
this.queue = new ArrayDeque<>(maxQueueSize);
38+
this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout));
39+
}
40+
41+
public boolean onNext(V value) {
42+
Preconditions.checkState(supplierStatus.equals(UNDONE_SUPPLIER_STATUS),
43+
"can't call onNext after supplierDone"
44+
);
45+
46+
lock.lock();
47+
try {
48+
if (!awaitFreeSpaceLocked()) {
49+
return false;
50+
}
51+
52+
queue.add(value);
53+
54+
newElement.signal();
55+
} finally {
56+
lock.unlock();
57+
}
58+
59+
return true;
60+
}
61+
62+
public boolean awaitFreeSpace() {
63+
Preconditions.checkState(supplierStatus.equals(UNDONE_SUPPLIER_STATUS),
64+
"can't call onNext after supplierDone"
65+
);
66+
67+
lock.lock();
68+
try {
69+
return awaitFreeSpaceLocked();
70+
} finally {
71+
lock.unlock();
72+
}
73+
}
74+
75+
private boolean awaitFreeSpaceLocked() {
76+
if (closed) {
77+
return false;
78+
}
79+
80+
if (queue.size() != maxQueueSize) {
81+
return true;
82+
}
83+
84+
try {
85+
if (!queueIsNotFull.await(calculateTimeout(), TimeUnit.NANOSECONDS)) {
86+
throw new OfferDeadlineExceededException();
87+
}
88+
} catch (InterruptedException e) {
89+
Thread.currentThread().interrupt();
90+
throw new QueryInterruptedException("Supplier thread interrupted", e);
91+
}
92+
93+
return !closed;
94+
}
95+
96+
// (supplier thread) Send knowledge to stream when data is over.
97+
public void supplierDone(SupplierStatus status) {
98+
lock.lock();
99+
try {
100+
if (closed) {
101+
return;
102+
}
103+
104+
supplierStatus = status;
105+
106+
newElement.signal();
107+
} finally {
108+
lock.unlock();
109+
}
110+
}
111+
112+
public boolean isClosed() {
113+
lock.lock();
114+
try {
115+
return closed;
116+
} finally {
117+
lock.unlock();
118+
}
119+
}
120+
121+
public V poll() {
122+
lock.lock();
123+
try {
124+
if (closed) {
125+
return null;
126+
}
127+
128+
if (queue.isEmpty()) {
129+
if (supplierStatus.isDone()) {
130+
return null;
131+
}
132+
133+
try {
134+
if (!newElement.await(calculateTimeout(), TimeUnit.NANOSECONDS)) {
135+
log.warn("Supplier thread was closed because consumer didn't poll an element of stream on timeout");
136+
throw new DeadlineExceededException("Stream deadline exceeded on poll");
137+
}
138+
} catch (InterruptedException e) {
139+
Thread.currentThread().interrupt();
140+
throw new QueryInterruptedException("Consumer thread interrupted", e);
141+
}
142+
143+
if (closed || supplierStatus.isDone()) {
144+
return null;
145+
}
146+
}
147+
148+
V value = queue.pop();
149+
150+
queueIsNotFull.signal();
151+
152+
return value;
153+
} finally {
154+
lock.unlock();
155+
}
156+
}
157+
158+
public void close() {
159+
lock.lock();
160+
try {
161+
if (closed) {
162+
return;
163+
}
164+
165+
closed = true;
166+
167+
queueIsNotFull.signal();
168+
newElement.signalAll();
169+
} finally {
170+
lock.unlock();
171+
}
172+
}
173+
174+
private long calculateTimeout() {
175+
return TimeUnit.NANOSECONDS.toNanos(streamWorkDeadlineNanos - System.nanoTime());
176+
}
177+
178+
public static final class OfferDeadlineExceededException extends RuntimeException {
179+
}
180+
181+
// copy-paste from com.google.common.util.concurrent.Uninterruptibles
182+
private static long saturatedToNanos(Duration duration) {
183+
try {
184+
return duration.toNanos();
185+
} catch (ArithmeticException ignore) {
186+
return duration.isNegative() ? -9223372036854775808L : 9223372036854775807L;
187+
}
188+
}
189+
190+
@FunctionalInterface
191+
public interface SupplierStatus {
192+
boolean isDone();
193+
}
194+
}

0 commit comments

Comments
 (0)