Skip to content

Commit 44e709c

Browse files
committed
Added tests for ExecuteQuery
1 parent 943a256 commit 44e709c

File tree

1 file changed

+214
-0
lines changed

1 file changed

+214
-0
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package tech.ydb.query.impl;
2+
3+
import java.time.Duration;
4+
import java.util.List;
5+
import java.util.Random;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.Semaphore;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import java.util.function.IntConsumer;
11+
import java.util.stream.Collectors;
12+
import java.util.stream.LongStream;
13+
14+
import com.google.common.hash.Hashing;
15+
import org.junit.AfterClass;
16+
import org.junit.Assert;
17+
import org.junit.BeforeClass;
18+
import org.junit.ClassRule;
19+
import org.junit.Test;
20+
21+
import tech.ydb.common.transaction.TxMode;
22+
import tech.ydb.core.Result;
23+
import tech.ydb.core.grpc.GrpcFlowControl;
24+
import tech.ydb.query.QueryClient;
25+
import tech.ydb.query.QuerySession;
26+
import tech.ydb.query.QueryStream;
27+
import tech.ydb.query.result.QueryInfo;
28+
import tech.ydb.query.settings.ExecuteQuerySettings;
29+
import tech.ydb.table.SessionRetryContext;
30+
import tech.ydb.table.description.TableDescription;
31+
import tech.ydb.table.impl.SimpleTableClient;
32+
import tech.ydb.table.query.Params;
33+
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
34+
import tech.ydb.table.settings.BulkUpsertSettings;
35+
import tech.ydb.table.values.ListType;
36+
import tech.ydb.table.values.PrimitiveType;
37+
import tech.ydb.table.values.PrimitiveValue;
38+
import tech.ydb.table.values.StructType;
39+
import tech.ydb.table.values.StructValue;
40+
import tech.ydb.test.junit4.GrpcTransportRule;
41+
42+
43+
/**
44+
*
45+
* @author Aleksandr Gorshenin
46+
*/
47+
public class QueryExecuteTest {
48+
@ClassRule
49+
public static final GrpcTransportRule YDB = new GrpcTransportRule();
50+
private static final String TEST_TABLE = "big_table_test";
51+
private static final long TEST_TABLE_SIZE = 5000;
52+
53+
private static byte[] pseudoRndData(long seed) {
54+
Random rnd = new Random(seed);
55+
int length = rnd.nextInt(256) + 256;
56+
byte[] data = new byte[length];
57+
rnd.nextBytes(data);
58+
return data;
59+
}
60+
61+
@BeforeClass
62+
public static void prepareTable() {
63+
SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(YDB)).build();
64+
SessionRetryContext retryCtx = SessionRetryContext.create(client).build();
65+
String tablePath = YDB.getDatabase() + "/" + TEST_TABLE;
66+
67+
TableDescription tableDesc = TableDescription.newBuilder()
68+
.addNonnullColumn("id", PrimitiveType.Int64)
69+
.addNonnullColumn("hash", PrimitiveType.Text)
70+
.addNullableColumn("data", PrimitiveType.Bytes)
71+
.setPrimaryKey("hash")
72+
.build();
73+
74+
retryCtx.supplyStatus(session -> session.createTable(tablePath, tableDesc))
75+
.join().expectSuccess("Can't create table " + tablePath);
76+
77+
StructType batchType = StructType.of(
78+
"id", PrimitiveType.Int64,
79+
"hash", PrimitiveType.Text,
80+
"data", PrimitiveType.Bytes
81+
);
82+
List<StructValue> batchData = LongStream.range(1, TEST_TABLE_SIZE + 1).mapToObj(id -> {
83+
byte[] data = pseudoRndData(id);
84+
String hash = Hashing.sha256().hashBytes(data).toString();
85+
return batchType.newValue(
86+
"id", PrimitiveValue.newInt64(id),
87+
"hash", PrimitiveValue.newText(hash),
88+
"data", PrimitiveValue.newBytes(data)
89+
);
90+
}).collect(Collectors.toList());
91+
92+
retryCtx.supplyStatus(session -> session.executeBulkUpsert(
93+
tablePath, ListType.of(batchType).newValue(batchData), new BulkUpsertSettings())
94+
).join().expectSuccess("bulk upsert problem in table " + tablePath);
95+
}
96+
97+
@AfterClass
98+
public static void dropTable() {
99+
String tablePath = YDB.getDatabase() + "/" + TEST_TABLE;
100+
SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(YDB)).build();
101+
SessionRetryContext retryCtx = SessionRetryContext.create(client).build();
102+
retryCtx.supplyStatus(session -> session.dropTable(tablePath)).join();
103+
}
104+
105+
@Test
106+
public void streamReadTest() {
107+
AtomicLong rowReaded = new AtomicLong(0);
108+
109+
try (QueryClient client = QueryClient.newClient(YDB).build()) {
110+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
111+
session.createQuery("SELECT * FROM " + TEST_TABLE, TxMode.NONE).execute(part -> {
112+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
113+
}).join().getStatus().expectSuccess("Cannot execute query");
114+
}
115+
}
116+
117+
Assert.assertEquals(TEST_TABLE_SIZE, rowReaded.get());
118+
}
119+
120+
121+
@Test
122+
public void flowControlTest() {
123+
AtomicLong rowReaded = new AtomicLong(0);
124+
TestFlowCall flow = new TestFlowCall();
125+
126+
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
127+
.withPartBytesLimit(1000)
128+
.withGrpcFlowControl(flow)
129+
.build();
130+
131+
try (QueryClient client = QueryClient.newClient(YDB).build()) {
132+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
133+
String query = "SELECT * FROM " + TEST_TABLE;
134+
QueryStream stream = session.createQuery(query, TxMode.NONE, Params.empty(), settings);
135+
Assert.assertTrue(flow.isExists());
136+
Assert.assertFalse(flow.isStarted());
137+
138+
CompletableFuture<Result<QueryInfo>> res = stream.execute(part -> {
139+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
140+
});
141+
142+
Assert.assertTrue(flow.isStarted());
143+
144+
int requested = 0;
145+
long readed = rowReaded.get();
146+
Assert.assertEquals(0l, readed);
147+
148+
while (readed < TEST_TABLE_SIZE) {
149+
flow.requestNext(1);
150+
requested++;
151+
flow.waitUntil(requested);
152+
Assert.assertTrue(rowReaded.get() > readed);
153+
readed = rowReaded.get();
154+
}
155+
156+
Assert.assertTrue(res.join().isSuccess());
157+
}
158+
}
159+
}
160+
161+
private class TestFlowCall implements GrpcFlowControl {
162+
private class CallImpl implements Call{
163+
private final IntConsumer request;
164+
private final Semaphore semaphore = new Semaphore(0);
165+
private boolean isStarted = false;
166+
167+
CallImpl(IntConsumer request) {
168+
this.request = request;
169+
}
170+
171+
@Override
172+
public void onStart() {
173+
isStarted = true;
174+
}
175+
176+
@Override
177+
public void onMessageReaded() {
178+
semaphore.release();
179+
}
180+
}
181+
182+
private final AtomicReference<CallImpl> current = new AtomicReference<>();
183+
184+
@Override
185+
public Call newCall(IntConsumer req) {
186+
CallImpl call = new CallImpl(req);
187+
current.set(call);
188+
return call;
189+
}
190+
191+
public boolean isExists() {
192+
return current.get() != null;
193+
}
194+
195+
public boolean isStarted() {
196+
CallImpl c = current.get();
197+
Assert.assertNotNull(c);
198+
return c.isStarted;
199+
}
200+
201+
public void requestNext(int count) {
202+
CallImpl c = current.get();
203+
Assert.assertNotNull(c);
204+
c.request.accept(count);
205+
}
206+
207+
public void waitUntil(int count) {
208+
CallImpl c = current.get();
209+
Assert.assertNotNull(c);
210+
c.semaphore.acquireUninterruptibly(count);
211+
c.semaphore.release(count);
212+
}
213+
}
214+
}

0 commit comments

Comments
 (0)