Skip to content

Commit 1b0a90c

Browse files
authored
Merge pull request #505 from alex268/master
Added option to control the grpc stream flow
2 parents eb21d71 + f9bd86b commit 1b0a90c

File tree

14 files changed

+757
-182
lines changed

14 files changed

+757
-182
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package tech.ydb.core.grpc;
2+
3+
import java.util.function.IntConsumer;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
*/
9+
@FunctionalInterface
10+
public interface GrpcFlowControl {
11+
interface Call {
12+
void onStart();
13+
void onMessageRead();
14+
}
15+
16+
Call newCall(IntConsumer req);
17+
}

core/src/main/java/tech/ydb/core/grpc/GrpcReadStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* @param <R> type of message received
1111
*/
1212
public interface GrpcReadStream<R> {
13+
@FunctionalInterface
1314
interface Observer<R> {
1415
void onNext(R value);
1516
}

core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import io.grpc.Metadata;
99

10+
import tech.ydb.core.impl.call.GrpcFlows;
11+
1012
/**
1113
* @author Nikolay Perfilov
1214
*/
@@ -16,13 +18,15 @@ public class GrpcRequestSettings {
1618
private final String traceId;
1719
private final List<String> clientCapabilities;
1820
private final Consumer<Metadata> trailersHandler;
21+
private final GrpcFlowControl flowControl;
1922

2023
private GrpcRequestSettings(Builder builder) {
2124
this.deadlineAfter = builder.deadlineAfter;
2225
this.preferredNodeID = builder.preferredNodeID;
2326
this.traceId = builder.traceId;
2427
this.clientCapabilities = builder.clientCapabilities;
2528
this.trailersHandler = builder.trailersHandler;
29+
this.flowControl = builder.flowControl;
2630
}
2731

2832
public static Builder newBuilder() {
@@ -49,12 +53,17 @@ public Consumer<Metadata> getTrailersHandler() {
4953
return trailersHandler;
5054
}
5155

56+
public GrpcFlowControl getFlowControl() {
57+
return flowControl;
58+
}
59+
5260
public static final class Builder {
5361
private long deadlineAfter = 0L;
5462
private Integer preferredNodeID = null;
5563
private String traceId = null;
5664
private List<String> clientCapabilities = null;
5765
private Consumer<Metadata> trailersHandler = null;
66+
private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW;
5867

5968
/**
6069
* Returns a new {@code Builder} with a deadline, based on the running Java Virtual Machine's
@@ -95,6 +104,11 @@ public Builder withTraceId(String traceId) {
95104
return this;
96105
}
97106

107+
public Builder withFlowControl(GrpcFlowControl flowCtrl) {
108+
this.flowControl = flowCtrl;
109+
return this;
110+
}
111+
98112
public Builder withClientCapabilities(List<String> clientCapabilities) {
99113
this.clientCapabilities = clientCapabilities;
100114
return this;

core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import tech.ydb.core.Status;
1717
import tech.ydb.core.StatusCode;
1818
import tech.ydb.core.UnexpectedResultException;
19+
import tech.ydb.core.grpc.GrpcFlowControl;
1920
import tech.ydb.core.grpc.GrpcReadStream;
2021
import tech.ydb.core.grpc.GrpcReadWriteStream;
2122
import tech.ydb.core.grpc.GrpcRequestSettings;
@@ -132,7 +133,9 @@ public <ReqT, RespT> GrpcReadStream<RespT> readStreamCall(
132133
);
133134
}
134135

135-
return new ReadStreamCall<>(traceId, call, request, makeMetadataFromSettings(settings), handler);
136+
Metadata metadata = makeMetadataFromSettings(settings);
137+
GrpcFlowControl flowCtrl = settings.getFlowControl();
138+
return new ReadStreamCall<>(traceId, call, flowCtrl, request, metadata, handler);
136139
} catch (UnexpectedResultException ex) {
137140
logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
138141
return new EmptyStream<>(ex.getStatus());
@@ -175,9 +178,9 @@ public <ReqT, RespT> GrpcReadWriteStream<RespT, ReqT> readWriteStreamCall(
175178
);
176179
}
177180

178-
return new ReadWriteStreamCall<>(
179-
traceId, call, makeMetadataFromSettings(settings), getAuthCallOptions(), handler
180-
);
181+
Metadata metadata = makeMetadataFromSettings(settings);
182+
GrpcFlowControl flowCtrl = settings.getFlowControl();
183+
return new ReadWriteStreamCall<>(traceId, call, flowCtrl, metadata, getAuthCallOptions(), handler);
181184
} catch (UnexpectedResultException ex) {
182185
logger.warn("ReadWriteStreamCall[{}] got unexpected status {}", traceId, ex.getStatus());
183186
return new EmptyStream<>(ex.getStatus());
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package tech.ydb.core.impl.call;
2+
3+
import java.util.function.IntConsumer;
4+
5+
import tech.ydb.core.grpc.GrpcFlowControl;
6+
7+
/**
8+
*
9+
* @author Aleksandr Gorshenin
10+
*/
11+
public class GrpcFlows {
12+
public static final GrpcFlowControl SIMPLE_FLOW = SimpleCall::new;
13+
14+
private GrpcFlows() { }
15+
16+
private static class SimpleCall implements GrpcFlowControl.Call {
17+
private final IntConsumer req;
18+
19+
SimpleCall(IntConsumer req) {
20+
this.req = req;
21+
}
22+
23+
@Override
24+
public void onStart() {
25+
req.accept(1);
26+
}
27+
28+
@Override
29+
public void onMessageRead() {
30+
req.accept(1);
31+
}
32+
}
33+
}

core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.concurrent.CancellationException;
44
import java.util.concurrent.CompletableFuture;
5-
import java.util.concurrent.atomic.AtomicReference;
65
import java.util.concurrent.locks.Lock;
76
import java.util.concurrent.locks.ReentrantLock;
87

@@ -16,6 +15,7 @@
1615
import org.slf4j.LoggerFactory;
1716

1817
import tech.ydb.core.Status;
18+
import tech.ydb.core.grpc.GrpcFlowControl;
1919
import tech.ydb.core.grpc.GrpcReadStream;
2020
import tech.ydb.core.grpc.GrpcStatuses;
2121
import tech.ydb.core.grpc.GrpcTransport;
@@ -35,48 +35,51 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
3535
private final GrpcStatusHandler statusConsumer;
3636
private final ReqT request;
3737
private final Metadata headers;
38+
private final GrpcFlowControl.Call flow;
3839

3940
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
40-
private final AtomicReference<Observer<RespT>> observerReference = new AtomicReference<>();
41-
42-
public ReadStreamCall(
43-
String traceId,
44-
ClientCall<ReqT, RespT> call,
45-
ReqT request,
46-
Metadata headers,
47-
GrpcStatusHandler statusConsumer
48-
) {
41+
42+
private Observer<RespT> consumer;
43+
44+
public ReadStreamCall(String traceId, ClientCall<ReqT, RespT> call, GrpcFlowControl flowCtrl, ReqT req,
45+
Metadata headers, GrpcStatusHandler statusHandler) {
4946
this.traceId = traceId;
5047
this.call = call;
51-
this.request = request;
48+
this.request = req;
5249
this.headers = headers;
53-
this.statusConsumer = statusConsumer;
50+
this.statusConsumer = statusHandler;
51+
this.flow = flowCtrl.newCall(this::nextRequest);
5452
}
5553

5654
@Override
5755
public CompletableFuture<Status> start(Observer<RespT> observer) {
58-
if (!observerReference.compareAndSet(null, observer)) {
59-
throw new IllegalStateException("Read stream call is already started");
60-
}
61-
6256
callLock.lock();
6357
try {
58+
if (consumer != null) {
59+
throw new IllegalStateException("Read stream call is already started");
60+
}
61+
if (observer == null) {
62+
throw new IllegalArgumentException("Observer must be not empty");
63+
}
64+
65+
consumer = observer;
6466
call.start(this, headers);
6567
if (logger.isTraceEnabled()) {
6668
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
6769
}
6870
call.sendMessage(request);
6971
// close stream by client side
7072
call.halfClose();
71-
call.request(1);
72-
} catch (Throwable t) {
73+
// init flow
74+
flow.onStart();
75+
} catch (Throwable th) {
76+
statusFuture.completeExceptionally(th);
77+
7378
try {
74-
call.cancel(null, t);
79+
call.cancel(null, th);
7580
} catch (Throwable ex) {
7681
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
7782
}
78-
79-
statusFuture.completeExceptionally(t);
8083
} finally {
8184
callLock.unlock();
8285
}
@@ -94,20 +97,24 @@ public void cancel() {
9497
}
9598
}
9699

100+
private void nextRequest(int count) {
101+
// request delivery of the next inbound message.
102+
callLock.lock();
103+
try {
104+
call.request(count);
105+
} finally {
106+
callLock.unlock();
107+
}
108+
}
109+
97110
@Override
98111
public void onMessage(RespT message) {
99112
try {
100113
if (logger.isTraceEnabled()) {
101114
logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
102115
}
103-
observerReference.get().onNext(message);
104-
// request delivery of the next inbound message.
105-
callLock.lock();
106-
try {
107-
call.request(1);
108-
} finally {
109-
callLock.unlock();
110-
}
116+
consumer.onNext(message);
117+
flow.onMessageRead();
111118
} catch (Exception ex) {
112119
statusFuture.completeExceptionally(ex);
113120

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.Queue;
55
import java.util.concurrent.CancellationException;
66
import java.util.concurrent.CompletableFuture;
7-
import java.util.concurrent.atomic.AtomicReference;
87
import java.util.concurrent.locks.Lock;
98
import java.util.concurrent.locks.ReentrantLock;
109

@@ -18,6 +17,7 @@
1817
import org.slf4j.LoggerFactory;
1918

2019
import tech.ydb.core.Status;
20+
import tech.ydb.core.grpc.GrpcFlowControl;
2121
import tech.ydb.core.grpc.GrpcReadWriteStream;
2222
import tech.ydb.core.grpc.GrpcStatuses;
2323
import tech.ydb.core.grpc.GrpcTransport;
@@ -39,23 +39,21 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
3939
private final GrpcStatusHandler statusConsumer;
4040
private final Metadata headers;
4141
private final AuthCallOptions callOptions;
42+
private final GrpcFlowControl.Call flow;
4243

43-
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
44-
private final AtomicReference<Observer<R>> observerReference = new AtomicReference<>();
4544
private final Queue<W> messagesQueue = new ArrayDeque<>();
4645

47-
public ReadWriteStreamCall(
48-
String traceId,
49-
ClientCall<W, R> call,
50-
Metadata headers,
51-
AuthCallOptions callOptions,
52-
GrpcStatusHandler statusConsumer
53-
) {
46+
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
47+
private Observer<R> consumer = null;
48+
49+
public ReadWriteStreamCall(String traceId, ClientCall<W, R> call, GrpcFlowControl flowCtrl,
50+
Metadata headers, AuthCallOptions options, GrpcStatusHandler statusHandler) {
5451
this.traceId = traceId;
5552
this.call = call;
5653
this.headers = headers;
57-
this.statusConsumer = statusConsumer;
58-
this.callOptions = callOptions;
54+
this.statusConsumer = statusHandler;
55+
this.callOptions = options;
56+
this.flow = flowCtrl.newCall(this::nextRequest);
5957
}
6058

6159
@Override
@@ -65,14 +63,18 @@ public String authToken() {
6563

6664
@Override
6765
public CompletableFuture<Status> start(Observer<R> observer) {
68-
if (!observerReference.compareAndSet(null, observer)) {
69-
throw new IllegalStateException("Read stream call is already started");
70-
}
71-
7266
callLock.lock();
7367
try {
68+
if (consumer != null) {
69+
throw new IllegalStateException("Read write stream call is already started");
70+
}
71+
if (observer == null) {
72+
throw new IllegalArgumentException("Observer must be not empty");
73+
}
74+
consumer = observer;
7475
call.start(this, headers);
75-
call.request(1);
76+
// init flow control
77+
flow.onStart();
7678
} catch (Throwable t) {
7779
try {
7880
call.cancel(null, t);
@@ -133,24 +135,27 @@ public void cancel() {
133135
}
134136
}
135137

138+
private void nextRequest(int count) {
139+
// request delivery of the next inbound message.
140+
callLock.lock();
141+
try {
142+
call.request(count);
143+
} finally {
144+
callLock.unlock();
145+
}
146+
}
147+
136148
@Override
137149
public void onMessage(R message) {
138150
try {
139151
if (logger.isTraceEnabled()) {
140152
logger.trace("ReadWriteStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message));
141153
}
142154

143-
observerReference.get().onNext(message);
144-
// request delivery of the next inbound message.
145-
callLock.lock();
146-
try {
147-
call.request(1);
148-
} finally {
149-
callLock.unlock();
150-
}
155+
consumer.onNext(message);
156+
flow.onMessageRead();
151157
} catch (Exception ex) {
152158
statusFuture.completeExceptionally(ex);
153-
154159
try {
155160
callLock.lock();
156161
try {

0 commit comments

Comments
 (0)