Skip to content

Commit 27d51bd

Browse files
committed
Added grpc flow control option to readTable,scanQuery and queryService
1 parent 987c9c3 commit 27d51bd

File tree

5 files changed

+59
-11
lines changed

5 files changed

+59
-11
lines changed

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
183183
GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
184184
String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings
185185
) {
186-
YdbQuery.ExecuteQueryRequest.Builder requestBuilder = YdbQuery.ExecuteQueryRequest.newBuilder()
186+
YdbQuery.ExecuteQueryRequest.Builder request = YdbQuery.ExecuteQueryRequest.newBuilder()
187187
.setSessionId(sessionId)
188188
.setExecMode(mapExecMode(settings.getExecMode()))
189189
.setStatsMode(mapStatsMode(settings.getStatsMode()))
@@ -197,15 +197,19 @@ GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
197197

198198
String resourcePool = settings.getResourcePool();
199199
if (resourcePool != null && !resourcePool.isEmpty()) {
200-
requestBuilder.setPoolId(resourcePool);
200+
request.setPoolId(resourcePool);
201201
}
202202

203203
if (tx != null) {
204-
requestBuilder.setTxControl(tx);
204+
request.setTxControl(tx);
205205
}
206206

207-
YdbQuery.ExecuteQueryRequest request = requestBuilder.build();
208-
return rpc.executeQuery(request, makeOptions(settings).build());
207+
GrpcRequestSettings.Builder options = makeOptions(settings);
208+
if (settings.getGrpcFlowControl() != null) {
209+
options = options.withFlowControl(settings.getGrpcFlowControl());
210+
}
211+
212+
return rpc.executeQuery(request.build(), options.build());
209213
}
210214

211215
@Override

query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.query.settings;
22

3+
import tech.ydb.core.grpc.GrpcFlowControl;
34
import tech.ydb.core.settings.BaseRequestSettings;
45

56
/**
@@ -11,13 +12,15 @@ public class ExecuteQuerySettings extends BaseRequestSettings {
1112
private final QueryStatsMode statsMode;
1213
private final boolean concurrentResultSets;
1314
private final String resourcePool;
15+
private final GrpcFlowControl flowControl;
1416

1517
private ExecuteQuerySettings(Builder builder) {
1618
super(builder);
1719
this.execMode = builder.execMode;
1820
this.statsMode = builder.statsMode;
1921
this.concurrentResultSets = builder.concurrentResultSets;
2022
this.resourcePool = builder.resourcePool;
23+
this.flowControl = builder.flowControl;
2124
}
2225

2326
public QueryExecMode getExecMode() {
@@ -40,6 +43,10 @@ public String getResourcePool() {
4043
return this.resourcePool;
4144
}
4245

46+
public GrpcFlowControl getGrpcFlowControl() {
47+
return flowControl;
48+
}
49+
4350
public static Builder newBuilder() {
4451
return new Builder();
4552
}
@@ -49,6 +56,7 @@ public static class Builder extends BaseBuilder<Builder> {
4956
private QueryStatsMode statsMode = QueryStatsMode.NONE;
5057
private boolean concurrentResultSets = false;
5158
private String resourcePool = null;
59+
private GrpcFlowControl flowControl = null;
5260

5361
public Builder withExecMode(QueryExecMode mode) {
5462
this.execMode = mode;
@@ -79,6 +87,11 @@ public Builder withResourcePool(String poolId) {
7987
return this;
8088
}
8189

90+
public Builder withGrpcFlowControl(GrpcFlowControl ctrl) {
91+
this.flowControl = ctrl;
92+
return this;
93+
}
94+
8295
@Override
8396
public ExecuteQuerySettings build() {
8497
return new ExecuteQuerySettings(this);

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,9 +1199,12 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
11991199
request.addAllColumns(settings.getColumns());
12001200
}
12011201

1202-
final GrpcRequestSettings opts = makeOptions(settings).build();
1203-
final GrpcReadStream<YdbTable.ReadTableResponse> origin = rpc.streamReadTable(request.build(), opts);
1202+
GrpcRequestSettings.Builder options = makeOptions(settings);
1203+
if (settings.getGrpcFlowControl() != null) {
1204+
options = options.withFlowControl(settings.getGrpcFlowControl());
1205+
}
12041206

1207+
GrpcReadStream<YdbTable.ReadTableResponse> origin = rpc.streamReadTable(request.build(), options.build());
12051208
return new ProxyReadStream<>(origin, (response, future, observer) -> {
12061209
StatusIds.StatusCode statusCode = response.getStatus();
12071210
if (statusCode == StatusIds.StatusCode.SUCCESS) {
@@ -1222,18 +1225,20 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
12221225

12231226
@Override
12241227
public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
1225-
ExecuteScanQuerySettings settings
1226-
) {
1228+
ExecuteScanQuerySettings settings) {
12271229
YdbTable.ExecuteScanQueryRequest req = YdbTable.ExecuteScanQueryRequest.newBuilder()
12281230
.setQuery(YdbTable.Query.newBuilder().setYqlText(query))
12291231
.setMode(settings.getMode().toPb())
12301232
.putAllParameters(params.toPb())
12311233
.setCollectStats(settings.getCollectStats().toPb())
12321234
.build();
12331235

1234-
GrpcRequestSettings opts = makeOptions(settings).build();
1236+
GrpcRequestSettings.Builder opts = makeOptions(settings);
1237+
if (settings.getGrpcFlowControl() != null) {
1238+
opts = opts.withFlowControl(settings.getGrpcFlowControl());
1239+
}
12351240

1236-
final GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts);
1241+
GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts.build());
12371242
return new ProxyReadStream<>(origin, (response, future, observer) -> {
12381243
StatusIds.StatusCode statusCode = response.getStatus();
12391244
if (statusCode == StatusIds.StatusCode.SUCCESS) {

table/src/main/java/tech/ydb/table/settings/ExecuteScanQuerySettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.table.settings;
22

3+
import tech.ydb.core.grpc.GrpcFlowControl;
34
import tech.ydb.core.settings.BaseRequestSettings;
45
import tech.ydb.proto.table.YdbTable;
56
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
@@ -27,11 +28,13 @@ public YdbTable.ExecuteScanQueryRequest.Mode toPb() {
2728

2829
private final Mode mode;
2930
private final QueryStatsCollectionMode collectStats;
31+
private final GrpcFlowControl flowControl;
3032

3133
public ExecuteScanQuerySettings(Builder builder) {
3234
super(builder);
3335
this.mode = builder.mode;
3436
this.collectStats = builder.collectStats;
37+
this.flowControl = builder.flowControl;
3538
}
3639

3740
public static Builder newBuilder() {
@@ -41,6 +44,7 @@ public static Builder newBuilder() {
4144
public static final class Builder extends BaseBuilder<Builder> {
4245
private Mode mode = Mode.EXEC;
4346
private QueryStatsCollectionMode collectStats = QueryStatsCollectionMode.NONE;
47+
private GrpcFlowControl flowControl = null;
4448

4549
public Builder setMode(Mode mode) {
4650
this.mode = mode;
@@ -52,6 +56,11 @@ public Builder setCollectStats(QueryStatsCollectionMode collectStats) {
5256
return this;
5357
}
5458

59+
public Builder setGrpcFlowControl(GrpcFlowControl ctrl) {
60+
this.flowControl = ctrl;
61+
return this;
62+
}
63+
5564
@Override
5665
public ExecuteScanQuerySettings build() {
5766
return new ExecuteScanQuerySettings(this);
@@ -65,4 +74,8 @@ public Mode getMode() {
6574
public QueryStatsCollectionMode getCollectStats() {
6675
return collectStats;
6776
}
77+
78+
public GrpcFlowControl getGrpcFlowControl() {
79+
return flowControl;
80+
}
6881
}

table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import com.google.common.collect.ImmutableList;
1111

12+
import tech.ydb.core.grpc.GrpcFlowControl;
1213
import tech.ydb.core.settings.BaseRequestSettings;
1314
import tech.ydb.table.values.PrimitiveValue;
1415
import tech.ydb.table.values.TupleValue;
@@ -31,6 +32,7 @@ public class ReadTableSettings extends BaseRequestSettings {
3132
private final ImmutableList<String> columns;
3233
private final int batchLimitBytes;
3334
private final int batchLimitRows;
35+
private final GrpcFlowControl flowControl;
3436

3537
private ReadTableSettings(Builder builder) {
3638
super(builder);
@@ -43,6 +45,7 @@ private ReadTableSettings(Builder builder) {
4345
this.columns = ImmutableList.copyOf(builder.columns);
4446
this.batchLimitBytes = builder.batchLimitBytes;
4547
this.batchLimitRows = builder.batchLimitRows;
48+
this.flowControl = builder.flowControl;
4649
}
4750

4851
public static Builder newBuilder() {
@@ -87,6 +90,10 @@ public int batchLimitRows() {
8790
return batchLimitRows;
8891
}
8992

93+
public GrpcFlowControl getGrpcFlowControl() {
94+
return flowControl;
95+
}
96+
9097
/**
9198
* BUILDER
9299
*/
@@ -100,6 +107,7 @@ public static final class Builder extends BaseBuilder<Builder> {
100107
private List<String> columns = Collections.emptyList();
101108
private int batchLimitBytes = 0;
102109
private int batchLimitRows = 0;
110+
private GrpcFlowControl flowControl = null;
103111

104112
public Builder orderedRead(boolean ordered) {
105113
this.ordered = ordered;
@@ -184,6 +192,11 @@ public Builder batchLimitRows(int batchLimitRows) {
184192
return this;
185193
}
186194

195+
public Builder withGrpcFlowControl(GrpcFlowControl ctrl) {
196+
this.flowControl = ctrl;
197+
return this;
198+
}
199+
187200
@Override
188201
public ReadTableSettings build() {
189202
return new ReadTableSettings(this);

0 commit comments

Comments
 (0)