Skip to content

Commit 943a256

Browse files
committed
Added tests for ReadTable
1 parent e814a71 commit 943a256

File tree

1 file changed

+265
-0
lines changed

1 file changed

+265
-0
lines changed
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package tech.ydb.table.integration;
2+
3+
import java.time.Duration;
4+
import java.util.List;
5+
import java.util.Random;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.Semaphore;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import java.util.function.IntConsumer;
11+
import java.util.stream.Collectors;
12+
import java.util.stream.LongStream;
13+
14+
import javax.annotation.Nonnull;
15+
16+
import com.google.common.hash.Hashing;
17+
import org.junit.AfterClass;
18+
import org.junit.Assert;
19+
import org.junit.BeforeClass;
20+
import org.junit.ClassRule;
21+
import org.junit.Test;
22+
23+
import tech.ydb.core.Status;
24+
import tech.ydb.core.grpc.GrpcFlowControl;
25+
import tech.ydb.core.grpc.GrpcReadStream;
26+
import tech.ydb.table.Session;
27+
import tech.ydb.table.SessionRetryContext;
28+
import tech.ydb.table.description.TableDescription;
29+
import tech.ydb.table.impl.SimpleTableClient;
30+
import tech.ydb.table.query.ReadTablePart;
31+
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
32+
import tech.ydb.table.settings.BulkUpsertSettings;
33+
import tech.ydb.table.settings.ReadTableSettings;
34+
import tech.ydb.table.values.ListType;
35+
import tech.ydb.table.values.PrimitiveType;
36+
import tech.ydb.table.values.PrimitiveValue;
37+
import tech.ydb.table.values.StructType;
38+
import tech.ydb.table.values.StructValue;
39+
import tech.ydb.test.junit4.GrpcTransportRule;
40+
41+
/**
42+
*
43+
* @author Aleksandr Gorshenin
44+
*/
45+
public class ReadTableTest {
46+
@ClassRule
47+
public static final GrpcTransportRule YDB = new GrpcTransportRule();
48+
private static final String TEST_TABLE = "read_table_test";
49+
private static final long TEST_TABLE_SIZE = 1000;
50+
51+
private static final SimpleTableClient client = SimpleTableClient.newClient(GrpcTableRpc.useTransport(YDB)).build();
52+
private static final SessionRetryContext retryCtx = SessionRetryContext.create(client).build();
53+
54+
55+
@Nonnull
56+
private static String tablePath(String tableName) {
57+
return YDB.getDatabase() + "/" + tableName;
58+
}
59+
60+
private static byte[] pseudoRndData(long seed) {
61+
Random rnd = new Random(seed);
62+
int length = rnd.nextInt(256) + 256;
63+
byte[] data = new byte[length];
64+
rnd.nextBytes(data);
65+
return data;
66+
}
67+
68+
@BeforeClass
69+
public static void prepareTable() {
70+
String tablePath = tablePath(TEST_TABLE);
71+
72+
TableDescription tableDesc = TableDescription.newBuilder()
73+
.addNonnullColumn("id", PrimitiveType.Int64)
74+
.addNonnullColumn("hash", PrimitiveType.Text)
75+
.addNullableColumn("data", PrimitiveType.Bytes)
76+
.setPrimaryKey("id")
77+
.build();
78+
79+
retryCtx.supplyStatus(session -> session.createTable(tablePath, tableDesc))
80+
.join().expectSuccess("Can't create table " + tablePath);
81+
82+
StructType batchType = StructType.of(
83+
"id", PrimitiveType.Int64,
84+
"hash", PrimitiveType.Text,
85+
"data", PrimitiveType.Bytes
86+
);
87+
List<StructValue> batchData = LongStream.range(1, TEST_TABLE_SIZE + 1).mapToObj(id -> {
88+
byte[] data = pseudoRndData(id);
89+
String hash = Hashing.sha256().hashBytes(data).toString();
90+
return batchType.newValue(
91+
"id", PrimitiveValue.newInt64(id),
92+
"hash", PrimitiveValue.newText(hash),
93+
"data", PrimitiveValue.newBytes(data)
94+
);
95+
}).collect(Collectors.toList());
96+
97+
retryCtx.supplyStatus(session -> session.executeBulkUpsert(
98+
tablePath, ListType.of(batchType).newValue(batchData), new BulkUpsertSettings())
99+
).join().expectSuccess("bulk upsert problem in table " + tablePath);
100+
}
101+
102+
@AfterClass
103+
public static void dropTable() {
104+
String tablePath = tablePath(TEST_TABLE);
105+
retryCtx.supplyStatus(session -> session.dropTable(tablePath)).join();
106+
}
107+
108+
@Test
109+
public void readTableTest() {
110+
String tablePath = tablePath(TEST_TABLE);
111+
AtomicLong rowReaded = new AtomicLong(0);
112+
113+
ReadTableSettings rts = ReadTableSettings.newBuilder().column("id").build();
114+
retryCtx.supplyStatus(session -> {
115+
rowReaded.set(0);
116+
return session.executeReadTable(tablePath, rts).start(part -> {
117+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
118+
});
119+
}).join().expectSuccess("Cannot read table " + tablePath);
120+
121+
Assert.assertEquals(TEST_TABLE_SIZE, rowReaded.get());
122+
}
123+
124+
@Test
125+
public void limitedReadTableTest() {
126+
String tablePath = tablePath(TEST_TABLE);
127+
AtomicLong rowReaded = new AtomicLong(0);
128+
129+
ReadTableSettings rts = ReadTableSettings.newBuilder().column("id").batchLimitRows(100).build();
130+
retryCtx.supplyStatus(session -> {
131+
rowReaded.set(0);
132+
return session.executeReadTable(tablePath, rts).start(part -> {
133+
Assert.assertTrue(part.getResultSetReader().getRowCount() <= 100);
134+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
135+
});
136+
}).join().expectSuccess("Cannot read table " + tablePath);
137+
138+
Assert.assertEquals(TEST_TABLE_SIZE, rowReaded.get());
139+
}
140+
141+
@Test
142+
public void partialReadTableTest() {
143+
String tablePath = tablePath(TEST_TABLE);
144+
AtomicLong rowReaded = new AtomicLong(0);
145+
146+
ReadTableSettings rts = ReadTableSettings.newBuilder().column("id")
147+
.fromKeyExclusive(PrimitiveValue.newInt64(1))
148+
.toKeyExclusive(PrimitiveValue.newInt64(TEST_TABLE_SIZE))
149+
.build();
150+
retryCtx.supplyStatus(session -> {
151+
rowReaded.set(0);
152+
return session.executeReadTable(tablePath, rts).start(part -> {
153+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
154+
});
155+
}).join().expectSuccess("Cannot read table " + tablePath);
156+
157+
Assert.assertEquals(TEST_TABLE_SIZE - 2, rowReaded.get());
158+
159+
ReadTableSettings rts2 = ReadTableSettings.newBuilder().column("id")
160+
.fromKeyInclusive(PrimitiveValue.newInt64(2))
161+
.toKeyInclusive(PrimitiveValue.newInt64(TEST_TABLE_SIZE - 1))
162+
.build();
163+
retryCtx.supplyStatus(session -> {
164+
rowReaded.set(0);
165+
return session.executeReadTable(tablePath, rts2).start(part -> {
166+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
167+
});
168+
}).join().expectSuccess("Cannot read table " + tablePath);
169+
170+
Assert.assertEquals(TEST_TABLE_SIZE - 2, rowReaded.get());
171+
}
172+
173+
@Test
174+
public void flowControlReadTableTest() {
175+
String tablePath = tablePath(TEST_TABLE);
176+
177+
AtomicLong rowReaded = new AtomicLong(0);
178+
TestFlowCall flow = new TestFlowCall();
179+
180+
ReadTableSettings rts = ReadTableSettings.newBuilder().column("id")
181+
.withGrpcFlowControl(flow)
182+
.batchLimitBytes(5000)
183+
.build();
184+
185+
try (Session session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
186+
GrpcReadStream<ReadTablePart> stream = session.executeReadTable(tablePath, rts);
187+
Assert.assertTrue(flow.isExists());
188+
Assert.assertFalse(flow.isStarted());
189+
190+
CompletableFuture<Status> res = stream.start(part -> {
191+
rowReaded.addAndGet(part.getResultSetReader().getRowCount());
192+
});
193+
194+
Assert.assertTrue(flow.isStarted());
195+
196+
int requested = 0;
197+
long readed = rowReaded.get();
198+
Assert.assertEquals(0l, readed);
199+
200+
while (readed < TEST_TABLE_SIZE) {
201+
flow.requestNext(1);
202+
requested++;
203+
flow.waitUntil(requested);
204+
Assert.assertTrue(rowReaded.get() > readed);
205+
readed = rowReaded.get();
206+
}
207+
208+
Assert.assertTrue(res.join().isSuccess());
209+
}
210+
}
211+
212+
private class TestFlowCall implements GrpcFlowControl {
213+
private class CallImpl implements Call{
214+
private final IntConsumer request;
215+
private final Semaphore semaphore = new Semaphore(0);
216+
private boolean isStarted = false;
217+
218+
CallImpl(IntConsumer request) {
219+
this.request = request;
220+
}
221+
222+
@Override
223+
public void onStart() {
224+
isStarted = true;
225+
}
226+
227+
@Override
228+
public void onMessageReaded() {
229+
semaphore.release();
230+
}
231+
}
232+
233+
private final AtomicReference<CallImpl> current = new AtomicReference<>();
234+
235+
@Override
236+
public Call newCall(IntConsumer req) {
237+
CallImpl call = new CallImpl(req);
238+
current.set(call);
239+
return call;
240+
}
241+
242+
public boolean isExists() {
243+
return current.get() != null;
244+
}
245+
246+
public boolean isStarted() {
247+
CallImpl c = current.get();
248+
Assert.assertNotNull(c);
249+
return c.isStarted;
250+
}
251+
252+
public void requestNext(int count) {
253+
CallImpl c = current.get();
254+
Assert.assertNotNull(c);
255+
c.request.accept(count);
256+
}
257+
258+
public void waitUntil(int count) {
259+
CallImpl c = current.get();
260+
Assert.assertNotNull(c);
261+
c.semaphore.acquireUninterruptibly(count);
262+
c.semaphore.release(count);
263+
}
264+
}
265+
}

0 commit comments

Comments
 (0)