Skip to content

Commit db70cc5

Browse files
api: support iproto feature discovery
Since version 2.10.0 Tarantool supports feature discovery [1]. Client can send the schema version and supported features and receive server-side schema version and supported features information to tune its behavior. After this patch, the request will be send on `Connect`. Connector stores server info in connection internals. After that, user may call `IsProtocolVersionSupported` and `IsProtocolFeatureSupported` handles to check if it is possible to use a feature. `IsProtocolFeatureSupported` iterates over lists to check if feature is enabled. It seems that iterating over a small list is way faster than building a map, see [3]. Benchmark tests show that this check is rather fast (0.5 ns on HP ProBook 440 G5) so it is not necessary to cache it in any way. Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4 could be found in Tarantool source code but they were removed in the following commits before the release and treated like they never existed. We also ignore them here too. See [2] for more info. 1. tarantool/tarantool#6253 2. tarantool/tarantool-python#262 3. https://stackoverflow.com/a/52710077/11646599 Closes #120
1 parent 48cf0c7 commit db70cc5

8 files changed

+283
-1
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Support IProto feature discovery (#120).
14+
1315
### Changed
1416

1517
### Fixed

Diff for: connection.go

+64
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ type Connection struct {
146146
lenbuf [PacketLengthBytes]byte
147147

148148
lastStreamId uint64
149+
150+
serverProtocolInfo protocolInfo
149151
}
150152

151153
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -391,6 +393,13 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
391393
}
392394
}
393395

396+
if err = conn.loadProtocolInfo(); err != nil {
397+
conn.mutex.Lock()
398+
defer conn.mutex.Unlock()
399+
conn.closeConnection(err, true)
400+
return nil, err
401+
}
402+
394403
return conn, err
395404
}
396405

@@ -687,6 +696,10 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
687696
conn.closeConnection(neterr, false)
688697
if err := conn.createConnection(true); err != nil {
689698
conn.closeConnection(err, true)
699+
} else {
700+
if err = conn.loadProtocolInfo(); err != nil {
701+
conn.closeConnection(err, true)
702+
}
690703
}
691704
}
692705
} else {
@@ -1163,3 +1176,54 @@ func (conn *Connection) NewStream() (*Stream, error) {
11631176
Conn: conn,
11641177
}, nil
11651178
}
1179+
1180+
// loadProtocolInfo sends info about client protocol,
1181+
// receives info about server protocol in response
1182+
// and store in in connection serverProtocolInfo.
1183+
func (conn *Connection) loadProtocolInfo() error {
1184+
var ok bool
1185+
1186+
resp, err := conn.exchangeProtocolInfo(
1187+
ClientProtocolVersion,
1188+
ClientProtocolFeatures)
1189+
1190+
if err != nil {
1191+
if resp.Code == ErrUnknownRequestType {
1192+
// IPROTO_ID requests are not supported by server.
1193+
conn.serverProtocolInfo = protocolInfo{
1194+
version: ProtocolVersionUnsupported,
1195+
features: []ProtocolFeature{},
1196+
}
1197+
return nil
1198+
}
1199+
1200+
return err
1201+
}
1202+
1203+
if len(resp.Data) == 0 {
1204+
return fmt.Errorf("Unexpected response on protocol info exchange: no data")
1205+
}
1206+
1207+
conn.serverProtocolInfo, ok = resp.Data[0].(protocolInfo)
1208+
if !ok {
1209+
return fmt.Errorf("Unexpected response on protocol info exchange: wrong data")
1210+
}
1211+
1212+
return nil
1213+
}
1214+
1215+
// IsProtocolVersionSupported checks if expected protocol version
1216+
// is supported by both server and client.
1217+
// Since 1.10.0
1218+
func (conn *Connection) IsProtocolVersionSupported(version ProtocolVersion) bool {
1219+
return (version <= ClientProtocolVersion) &&
1220+
(version <= conn.serverProtocolInfo.version)
1221+
}
1222+
1223+
// IsProtocolFeatureSupported checks if expected protocol feature
1224+
// is supported by both server and client.
1225+
// Since 1.10.0
1226+
func (conn *Connection) IsProtocolFeatureSupported(feature ProtocolFeature) bool {
1227+
return isFeatureSupported(feature, ClientProtocolFeatures) &&
1228+
isFeatureSupported(feature, conn.serverProtocolInfo.features)
1229+
}

Diff for: const.go

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
RollbackRequestCode = 16
1919
PingRequestCode = 64
2020
SubscribeRequestCode = 66
21+
IdRequestCode = 73
2122

2223
KeyCode = 0x00
2324
KeySync = 0x01
@@ -41,6 +42,8 @@ const (
4142
KeySQLBind = 0x41
4243
KeySQLInfo = 0x42
4344
KeyStmtID = 0x43
45+
KeyVersion = 0x54
46+
KeyFeatures = 0x55
4447
KeyTimeout = 0x56
4548
KeyTxnIsolation = 0x59
4649

Diff for: example_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,18 @@ func ExampleCommitRequest() {
334334
conn := example_connect()
335335
defer conn.Close()
336336

337+
if conn.IsProtocolFeatureSupported(ProtocolFeatureStreams) != true {
338+
fmt.Printf("Streams are not supported")
339+
return
340+
}
341+
337342
stream, _ := conn.NewStream()
338343

344+
if conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions) != true {
345+
fmt.Printf("Transactions are not supported")
346+
return
347+
}
348+
339349
// Begin transaction
340350
req = tarantool.NewBeginRequest()
341351
resp, err = stream.Do(req).Get()
@@ -410,8 +420,18 @@ func ExampleRollbackRequest() {
410420
conn := example_connect()
411421
defer conn.Close()
412422

423+
if conn.IsProtocolFeatureSupported(ProtocolFeatureStreams) != true {
424+
fmt.Printf("Streams are not supported")
425+
return
426+
}
427+
413428
stream, _ := conn.NewStream()
414429

430+
if conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions) != true {
431+
fmt.Printf("Transactions are not supported")
432+
return
433+
}
434+
415435
// Begin transaction
416436
req = tarantool.NewBeginRequest()
417437
resp, err = stream.Do(req).Get()
@@ -486,8 +506,18 @@ func ExampleBeginRequest_TxnIsolation() {
486506
conn := example_connect()
487507
defer conn.Close()
488508

509+
if conn.IsProtocolFeatureSupported(ProtocolFeatureStreams) != true {
510+
fmt.Printf("Streams are not supported")
511+
return
512+
}
513+
489514
stream, _ := conn.NewStream()
490515

516+
if conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions) != true {
517+
fmt.Printf("Transactions are not supported")
518+
return
519+
}
520+
491521
// Begin transaction
492522
req = tarantool.NewBeginRequest().
493523
TxnIsolation(tarantool.ReadConfirmedLevel).

Diff for: protocol.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package tarantool
2+
3+
type ProtocolVersion uint64
4+
type ProtocolFeature uint64
5+
6+
type protocolInfo struct {
7+
version ProtocolVersion
8+
features []ProtocolFeature
9+
}
10+
11+
const ProtocolVersionUnsupported ProtocolVersion = 0
12+
13+
const (
14+
// Streams support.
15+
ProtocolFeatureStreams ProtocolFeature = 0
16+
// Interactive tranactions support.
17+
ProtocolFeatureTransactions ProtocolFeature = 1
18+
// Support of MP_ERROR object over MessagePack.
19+
ProtocolFeatureErrorExtension ProtocolFeature = 2
20+
// Support of watchers.
21+
ProtocolFeatureWatchers ProtocolFeature = 3
22+
)
23+
24+
// Protocol version supported by connector. Version 3
25+
// was introduced in Tarantool 2.10.0 and used in latest 2.10.4.
26+
const ClientProtocolVersion ProtocolVersion = 3
27+
28+
// Protocol features supported by connector.
29+
var ClientProtocolFeatures = []ProtocolFeature{
30+
ProtocolFeatureStreams,
31+
ProtocolFeatureTransactions,
32+
}
33+
34+
func isFeatureSupported(feature ProtocolFeature, supportedFeatures []ProtocolFeature) bool {
35+
// It seems that iterating over a small list is way faster
36+
// than building a map: https://stackoverflow.com/a/52710077/11646599
37+
for _, supportedFeature := range supportedFeatures {
38+
if feature == supportedFeature {
39+
return true
40+
}
41+
}
42+
43+
return false
44+
}

Diff for: request.go

+65
Original file line numberDiff line numberDiff line change
@@ -1106,3 +1106,68 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
11061106
req.ctx = ctx
11071107
return req
11081108
}
1109+
1110+
// protocolInfoRequest informs the server about supported protocol
1111+
// version and features.
1112+
type protocolInfoRequest struct {
1113+
baseRequest
1114+
protocolInfo
1115+
}
1116+
1117+
// newProtocolInfoRequest returns a new protocolInfoRequest.
1118+
func newProtocolInfoRequest(protocolVersion ProtocolVersion,
1119+
protocolFeatures []ProtocolFeature) *protocolInfoRequest {
1120+
req := new(protocolInfoRequest)
1121+
req.requestCode = IdRequestCode
1122+
req.version = protocolVersion
1123+
req.features = protocolFeatures
1124+
return req
1125+
}
1126+
1127+
// exchangeProtocolInfo sends info about client protocol
1128+
// and receives info about server protocol in response.
1129+
func (conn *Connection) exchangeProtocolInfo(version ProtocolVersion,
1130+
features []ProtocolFeature) (resp *Response, err error) {
1131+
req := newProtocolInfoRequest(version, features)
1132+
return conn.Do(req).Get()
1133+
}
1134+
1135+
// Body fills an encoder with the protocol version request body.
1136+
func (req *protocolInfoRequest) Body(res SchemaResolver, enc *encoder) error {
1137+
return req.fillProtocolInfoRequest(enc)
1138+
}
1139+
1140+
// Context sets a passed context to the request.
1141+
//
1142+
// Pay attention that when using context with request objects,
1143+
// the timeout option for Connection does not affect the lifetime
1144+
// of the request. For those purposes use context.WithTimeout() as
1145+
// the root context.
1146+
func (req *protocolInfoRequest) Context(ctx context.Context) *protocolInfoRequest {
1147+
req.ctx = ctx
1148+
return req
1149+
}
1150+
1151+
func (req *protocolInfoRequest) fillProtocolInfoRequest(enc *encoder) error {
1152+
enc.EncodeMapLen(2)
1153+
1154+
encodeUint(enc, KeyVersion)
1155+
if err := enc.Encode(req.version); err != nil {
1156+
return err
1157+
}
1158+
1159+
encodeUint(enc, KeyFeatures)
1160+
1161+
t := len(req.features)
1162+
if err := enc.EncodeArrayLen(t); err != nil {
1163+
return err
1164+
}
1165+
1166+
for _, feature := range req.features {
1167+
if err := enc.Encode(feature); err != nil {
1168+
return err
1169+
}
1170+
}
1171+
1172+
return nil
1173+
}

Diff for: response.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,15 @@ func (resp *Response) decodeBody() (err error) {
147147
offset := resp.buf.Offset()
148148
defer resp.buf.Seek(offset)
149149

150-
var l int
150+
var l, larr int
151151
var stmtID, bindCount uint64
152+
var serverProtocolInfo protocolInfo = protocolInfo{
153+
version: ProtocolVersionUnsupported,
154+
features: []ProtocolFeature{},
155+
}
156+
157+
var protocolFeature ProtocolFeature
158+
isProtocolInfoResponse := false
152159

153160
d := newDecoder(&resp.buf)
154161

@@ -190,6 +197,24 @@ func (resp *Response) decodeBody() (err error) {
190197
if bindCount, err = d.DecodeUint64(); err != nil {
191198
return err
192199
}
200+
case KeyVersion:
201+
isProtocolInfoResponse = true
202+
if err = d.Decode(&serverProtocolInfo.version); err != nil {
203+
return err
204+
}
205+
case KeyFeatures:
206+
isProtocolInfoResponse = true
207+
if larr, err = d.DecodeArrayLen(); err != nil {
208+
return err
209+
}
210+
211+
serverProtocolInfo.features = make([]ProtocolFeature, larr)
212+
for i := 0; i < larr; i++ {
213+
if err = d.Decode(&protocolFeature); err != nil {
214+
return err
215+
}
216+
serverProtocolInfo.features[i] = protocolFeature
217+
}
193218
default:
194219
if err = d.Skip(); err != nil {
195220
return err
@@ -204,6 +229,11 @@ func (resp *Response) decodeBody() (err error) {
204229
}
205230
resp.Data = []interface{}{stmt}
206231
}
232+
233+
if isProtocolInfoResponse {
234+
resp.Data = []interface{}{serverProtocolInfo}
235+
}
236+
207237
if resp.Code != OkCode && resp.Code != PushCode {
208238
resp.Code &^= ErrorCodeBit
209239
err = Error{resp.Code, resp.Error}

Diff for: tarantool_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -2830,6 +2830,50 @@ func TestStream_DoWithClosedConn(t *testing.T) {
28302830
}
28312831
}
28322832

2833+
func TestConnectionProtocolInfo(t *testing.T) {
2834+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2835+
2836+
isProtocolInfoUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
2837+
if err != nil {
2838+
t.Fatalf("Unexpected error has been caught: %s", err.Error())
2839+
}
2840+
// First Tarantool protocol version (1) was introduced between
2841+
// 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also
2842+
// introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4
2843+
// was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted
2844+
// back to version 3 in the same version interval.
2845+
// So each release Tarantool >= 2.10 have protocol version >= 3.
2846+
// (Tarantool 2.10.4 still has version 3.)
2847+
minProtocolVersion := ProtocolVersion(1)
2848+
tarantool210ProtocolVersion := ProtocolVersion(3)
2849+
2850+
if isProtocolInfoUnsupported {
2851+
require.Equal(t, conn.IsProtocolVersionSupported(minProtocolVersion), false)
2852+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureStreams), false)
2853+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions), false)
2854+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureErrorExtension), false)
2855+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureWatchers), false)
2856+
} else {
2857+
require.Equal(t, conn.IsProtocolVersionSupported(tarantool210ProtocolVersion), true)
2858+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureStreams), true)
2859+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureTransactions), true)
2860+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureErrorExtension), false)
2861+
require.Equal(t, conn.IsProtocolFeatureSupported(ProtocolFeatureWatchers), false)
2862+
}
2863+
}
2864+
2865+
func BenchmarkConnectionProtocolFeature(b *testing.B) {
2866+
conn := test_helpers.ConnectWithValidation(b, server, opts)
2867+
defer conn.Close()
2868+
2869+
b.ResetTimer()
2870+
b.RunParallel(func(pb *testing.PB) {
2871+
for pb.Next() {
2872+
_ = conn.IsProtocolFeatureSupported(ProtocolFeatureStreams)
2873+
}
2874+
})
2875+
}
2876+
28332877
// runTestMain is a body of TestMain function
28342878
// (see https://pkg.go.dev/testing#hdr-Main).
28352879
// Using defer + os.Exit is not works so TestMain body

0 commit comments

Comments
 (0)