Skip to content

Commit bd4b1f5

Browse files
perf: Skip gRPC trailers for StreamingRead & ExecuteStreamingSql (#3661)
1 parent 1b70be0 commit bd4b1f5

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
5252
private TimeUnit streamWaitTimeoutUnit;
5353
private long streamWaitTimeoutValue;
5454
private SpannerException error;
55+
private boolean done;
5556

5657
@VisibleForTesting
5758
GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
@@ -166,11 +167,17 @@ private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer {
166167
@Override
167168
public void onPartialResultSet(PartialResultSet results) {
168169
addToStream(results);
170+
if (results.getLast()) {
171+
done = true;
172+
addToStream(END_OF_STREAM);
173+
}
169174
}
170175

171176
@Override
172177
public void onCompleted() {
173-
addToStream(END_OF_STREAM);
178+
if (!done) {
179+
addToStream(END_OF_STREAM);
180+
}
174181
}
175182

176183
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

+55
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.testing.SerializableTester.reserialize;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertThrows;
2324
import static org.junit.Assert.assertTrue;
2425

@@ -1115,4 +1116,58 @@ public void getProtoEnumList() {
11151116
resultSet.getProtoEnum(0, Genre::forNumber);
11161117
});
11171118
}
1119+
1120+
@Test
1121+
public void verifyResultSetWithLastTrue() {
1122+
long[] longArray = {111, 333, 444, 0, -1, -2234, Long.MAX_VALUE, Long.MIN_VALUE};
1123+
1124+
consumer.onPartialResultSet(
1125+
PartialResultSet.newBuilder()
1126+
.setMetadata(
1127+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1128+
.addValues(Value.int64Array(longArray).toProto())
1129+
.setLast(false)
1130+
.build());
1131+
assertTrue(resultSet.next());
1132+
consumer.onPartialResultSet(
1133+
PartialResultSet.newBuilder()
1134+
.setMetadata(
1135+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1136+
.addValues(Value.int64Array(longArray).toProto())
1137+
.setLast(true)
1138+
.build());
1139+
assertTrue(resultSet.next());
1140+
assertFalse(resultSet.next());
1141+
consumer.onCompleted();
1142+
}
1143+
1144+
@Test
1145+
public void shouldThrowDeadlineExceededIfLastTrueIsNotReceived() {
1146+
long[] longArray = {111, 333, 444, 0, -1, -2234, Long.MAX_VALUE, Long.MIN_VALUE};
1147+
1148+
consumer.onPartialResultSet(
1149+
PartialResultSet.newBuilder()
1150+
.setMetadata(
1151+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1152+
.addValues(Value.int64Array(longArray).toProto())
1153+
.setLast(false)
1154+
.build());
1155+
assertTrue(resultSet.next());
1156+
consumer.onPartialResultSet(
1157+
PartialResultSet.newBuilder()
1158+
.setMetadata(
1159+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1160+
.addValues(Value.int64Array(longArray).toProto())
1161+
.setLast(false)
1162+
.build());
1163+
assertTrue(resultSet.next());
1164+
SpannerException spannerException =
1165+
assertThrows(
1166+
SpannerException.class,
1167+
() -> {
1168+
assertThat(resultSet.next()).isFalse();
1169+
});
1170+
assertEquals("DEADLINE_EXCEEDED: stream wait timeout", spannerException.getMessage());
1171+
consumer.onCompleted();
1172+
}
11181173
}

0 commit comments

Comments
 (0)