Skip to content

Commit d1b31c5

Browse files
api: support iproto feature discovery
Since version 2.10.0 Tarantool supports feature discovery [1]. Client can send the schema version and supported features and receive server-side schema version and supported features information to tune its behavior. After this patch, the request will be send on `dial`, where authentication is performed. Connector stores server info in connection internals. After that, user may call API handles to check if it is possible to use a feature. Feature check iterates over lists to check if feature is enabled. It seems that iterating over a small list is way faster than building a map, see [2]. Benchmark tests show that this check is rather fast (0.5 ns for client and server check on HP ProBook 440 G5) so it is not necessary to cache it in any way. Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4 could be found in Tarantool source code but they were removed in the following commits before the release and treated like they never existed. We also ignore them here too. See [3] for more info. 1. tarantool/tarantool#6253 2. https://stackoverflow.com/a/52710077/11646599 3. tarantool/tarantool-python#262 Closes #120
1 parent f05dac4 commit d1b31c5

File tree

7 files changed

+278
-6
lines changed

7 files changed

+278
-6
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Support IProto feature discovery (#120).
14+
1315
### Changed
1416

1517
### Fixed

connection.go

+92-5
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ type Connection struct {
146146
lenbuf [PacketLengthBytes]byte
147147

148148
lastStreamId uint64
149+
150+
serverProtocolVersion ProtocolVersion
151+
serverFeatures []Feature
149152
}
150153

151154
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -502,6 +505,13 @@ func (conn *Connection) dial() (err error) {
502505
conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
503506
conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()
504507

508+
// IPROTO_ID requests can be processed without authentication.
509+
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/requests/#iproto-id
510+
if err = conn.loadProtocolInfo(w, r); err != nil {
511+
connection.Close()
512+
return err
513+
}
514+
505515
// Auth
506516
if opts.User != "" {
507517
scr, err := scramble(conn.Greeting.auth, opts.Pass)
@@ -603,33 +613,43 @@ func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err
603613
return conn.writeRequest(w, req, "auth")
604614
}
605615

606-
func (conn *Connection) readResponse(r io.Reader) (resp Response, err error) {
616+
func (conn *Connection) writeProtocolInfoRequest(w *bufio.Writer, version ProtocolVersion,
617+
features []Feature) (err error) {
618+
req := newProtocolInfoRequest(version, features)
619+
return conn.writeRequest(w, req, "iproto id")
620+
}
621+
622+
func (conn *Connection) readResponse(r io.Reader, reqName string) (resp Response, err error) {
607623
respBytes, err := conn.read(r)
608624
if err != nil {
609-
return resp, errors.New("auth: read error " + err.Error())
625+
return resp, errors.New(reqName + ": read error " + err.Error())
610626
}
611627
resp = Response{buf: smallBuf{b: respBytes}}
612628
err = resp.decodeHeader(conn.dec)
613629
if err != nil {
614-
return resp, errors.New("auth: decode response header error " + err.Error())
630+
return resp, errors.New(reqName + ": decode response header error " + err.Error())
615631
}
616632
err = resp.decodeBody()
617633
if err != nil {
618634
switch err.(type) {
619635
case Error:
620636
return resp, err
621637
default:
622-
return resp, errors.New("auth: decode response body error " + err.Error())
638+
return resp, errors.New(reqName + ": decode response body error " + err.Error())
623639
}
624640
}
625641
return resp, nil
626642
}
627643

628644
func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
629-
_, err = conn.readResponse(r)
645+
_, err = conn.readResponse(r, "auth")
630646
return err
631647
}
632648

649+
func (conn *Connection) readProtocolInfoResponse(r io.Reader) (resp Response, err error) {
650+
return conn.readResponse(r, "iproto id")
651+
}
652+
633653
func (conn *Connection) createConnection(reconnect bool) (err error) {
634654
var reconnects uint
635655
for conn.c == nil && conn.state == connDisconnected {
@@ -1173,3 +1193,70 @@ func (conn *Connection) NewStream() (*Stream, error) {
11731193
Conn: conn,
11741194
}, nil
11751195
}
1196+
1197+
// loadProtocolInfo sends info about client protocol,
1198+
// receives info about server protocol in response
1199+
// and store in in connection serverProtocolInfo.
1200+
func (conn *Connection) loadProtocolInfo(w *bufio.Writer, r *bufio.Reader) error {
1201+
var resp Response
1202+
var err error
1203+
1204+
err = conn.writeProtocolInfoRequest(w, ClientProtocolVersion, ClientFeatures)
1205+
if err != nil {
1206+
return err
1207+
}
1208+
1209+
resp, err = conn.readProtocolInfoResponse(r)
1210+
if err != nil {
1211+
tarantoolError, ok := err.(Error)
1212+
if ok && tarantoolError.Code == ErrUnknownRequestType {
1213+
// IPROTO_ID requests are not supported by server.
1214+
conn.serverProtocolVersion = ProtocolVersionUnsupported
1215+
conn.serverFeatures = []Feature{}
1216+
1217+
return nil
1218+
}
1219+
1220+
return err
1221+
}
1222+
1223+
if len(resp.Data) == 0 {
1224+
return fmt.Errorf("Unexpected response on protocol info exchange: no data")
1225+
}
1226+
1227+
serverProtocolInfo, ok := resp.Data[0].(protocolInfo)
1228+
if !ok {
1229+
return fmt.Errorf("Unexpected response on protocol info exchange: wrong data")
1230+
}
1231+
conn.serverProtocolVersion = serverProtocolInfo.version
1232+
conn.serverFeatures = serverProtocolInfo.features
1233+
1234+
return nil
1235+
}
1236+
1237+
// ServerProtocolVersion returns protocol version supported by
1238+
// connected Tarantool server.
1239+
// Since 1.10.0
1240+
func (conn *Connection) ServerProtocolVersion() ProtocolVersion {
1241+
return conn.serverProtocolVersion
1242+
}
1243+
1244+
// ClientProtocolVersion returns protocol version supported by Go connector.
1245+
// Since 1.10.0
1246+
func (conn *Connection) ClientProtocolVersion() ProtocolVersion {
1247+
return ClientProtocolVersion
1248+
}
1249+
1250+
// IsServerSupportsFeature checks if expected feature
1251+
// is supported by connected Tarantool server.
1252+
// Since 1.10.0
1253+
func (conn *Connection) IsServerSupportsFeature(feature Feature) bool {
1254+
return isFeatureSupported(feature, conn.serverFeatures)
1255+
}
1256+
1257+
// IsServerSupportsFeature checks if expected feature
1258+
// is supported by Go connector.
1259+
// Since 1.10.0
1260+
func (conn *Connection) IsClientSupportsFeature(feature Feature) bool {
1261+
return isFeatureSupported(feature, ClientFeatures)
1262+
}

const.go

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
RollbackRequestCode = 16
1919
PingRequestCode = 64
2020
SubscribeRequestCode = 66
21+
IdRequestCode = 73
2122

2223
KeyCode = 0x00
2324
KeySync = 0x01
@@ -41,6 +42,8 @@ const (
4142
KeySQLBind = 0x41
4243
KeySQLInfo = 0x42
4344
KeyStmtID = 0x43
45+
KeyVersion = 0x54
46+
KeyFeatures = 0x55
4447
KeyTimeout = 0x56
4548
KeyTxnIsolation = 0x59
4649

protocol.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package tarantool
2+
3+
type ProtocolVersion uint64
4+
type Feature uint64
5+
6+
type protocolInfo struct {
7+
version ProtocolVersion
8+
features []Feature
9+
}
10+
11+
const ProtocolVersionUnsupported ProtocolVersion = 0
12+
13+
const (
14+
// Streams support.
15+
FeatureStreams Feature = 0
16+
// Interactive transactions support.
17+
FeatureTransactions Feature = 1
18+
// Support of MP_ERROR object over MessagePack.
19+
FeatureErrorExtension Feature = 2
20+
// Support of watchers.
21+
FeatureWatchers Feature = 3
22+
)
23+
24+
// Protocol version supported by connector. Version 3
25+
// was introduced in Tarantool 2.10.0 and used in latest 2.10.4.
26+
const ClientProtocolVersion ProtocolVersion = 3
27+
28+
// Protocol features supported by connector.
29+
var ClientFeatures = []Feature{
30+
FeatureStreams,
31+
FeatureTransactions,
32+
}
33+
34+
func isFeatureSupported(feature Feature, supportedFeatures []Feature) bool {
35+
// It seems that iterating over a small list is way faster
36+
// than building a map: https://stackoverflow.com/a/52710077/11646599
37+
for _, supportedFeature := range supportedFeatures {
38+
if feature == supportedFeature {
39+
return true
40+
}
41+
}
42+
43+
return false
44+
}

request.go

+57
Original file line numberDiff line numberDiff line change
@@ -1106,3 +1106,60 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
11061106
req.ctx = ctx
11071107
return req
11081108
}
1109+
1110+
// protocolInfoRequest informs the server about supported protocol
1111+
// version and features.
1112+
type protocolInfoRequest struct {
1113+
baseRequest
1114+
protocolInfo
1115+
}
1116+
1117+
// newProtocolInfoRequest returns a new protocolInfoRequest.
1118+
func newProtocolInfoRequest(protocolVersion ProtocolVersion,
1119+
protocolFeatures []ProtocolFeature) *protocolInfoRequest {
1120+
req := new(protocolInfoRequest)
1121+
req.requestCode = IdRequestCode
1122+
req.version = protocolVersion
1123+
req.features = protocolFeatures
1124+
return req
1125+
}
1126+
1127+
// Body fills an encoder with the protocol version request body.
1128+
func (req *protocolInfoRequest) Body(res SchemaResolver, enc *encoder) error {
1129+
return req.fillProtocolInfoRequest(enc)
1130+
}
1131+
1132+
// Context sets a passed context to the request.
1133+
//
1134+
// Pay attention that when using context with request objects,
1135+
// the timeout option for Connection does not affect the lifetime
1136+
// of the request. For those purposes use context.WithTimeout() as
1137+
// the root context.
1138+
func (req *protocolInfoRequest) Context(ctx context.Context) *protocolInfoRequest {
1139+
req.ctx = ctx
1140+
return req
1141+
}
1142+
1143+
func (req *protocolInfoRequest) fillProtocolInfoRequest(enc *encoder) error {
1144+
enc.EncodeMapLen(2)
1145+
1146+
encodeUint(enc, KeyVersion)
1147+
if err := enc.Encode(req.version); err != nil {
1148+
return err
1149+
}
1150+
1151+
encodeUint(enc, KeyFeatures)
1152+
1153+
t := len(req.features)
1154+
if err := enc.EncodeArrayLen(t); err != nil {
1155+
return err
1156+
}
1157+
1158+
for _, feature := range req.features {
1159+
if err := enc.Encode(feature); err != nil {
1160+
return err
1161+
}
1162+
}
1163+
1164+
return nil
1165+
}

response.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,14 @@ func (resp *Response) decodeBody() (err error) {
147147
offset := resp.buf.Offset()
148148
defer resp.buf.Seek(offset)
149149

150-
var l int
150+
var l, larr int
151151
var stmtID, bindCount uint64
152+
var serverProtocolInfo protocolInfo = protocolInfo{
153+
version: ProtocolVersionUnsupported,
154+
features: []ProtocolFeature{},
155+
}
156+
var protocolFeature ProtocolFeature
157+
isProtocolInfoResponse := false
152158

153159
d := newDecoder(&resp.buf)
154160

@@ -190,6 +196,24 @@ func (resp *Response) decodeBody() (err error) {
190196
if bindCount, err = d.DecodeUint64(); err != nil {
191197
return err
192198
}
199+
case KeyVersion:
200+
isProtocolInfoResponse = true
201+
if err = d.Decode(&serverProtocolInfo.version); err != nil {
202+
return err
203+
}
204+
case KeyFeatures:
205+
isProtocolInfoResponse = true
206+
if larr, err = d.DecodeArrayLen(); err != nil {
207+
return err
208+
}
209+
210+
serverProtocolInfo.features = make([]ProtocolFeature, larr)
211+
for i := 0; i < larr; i++ {
212+
if err = d.Decode(&protocolFeature); err != nil {
213+
return err
214+
}
215+
serverProtocolInfo.features[i] = protocolFeature
216+
}
193217
default:
194218
if err = d.Skip(); err != nil {
195219
return err
@@ -204,6 +228,11 @@ func (resp *Response) decodeBody() (err error) {
204228
}
205229
resp.Data = []interface{}{stmt}
206230
}
231+
232+
if isProtocolInfoResponse {
233+
resp.Data = []interface{}{serverProtocolInfo}
234+
}
235+
207236
if resp.Code != OkCode && resp.Code != PushCode {
208237
resp.Code &^= ErrorCodeBit
209238
err = Error{resp.Code, resp.Error}

tarantool_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -2830,6 +2830,56 @@ func TestStream_DoWithClosedConn(t *testing.T) {
28302830
}
28312831
}
28322832

2833+
func TestConnectionProtocolInfo(t *testing.T) {
2834+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2835+
2836+
isLess210, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
2837+
if err != nil {
2838+
t.Fatalf("Unexpected error has been caught: %s", err.Error())
2839+
}
2840+
// First Tarantool protocol version (1) was introduced between
2841+
// 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also
2842+
// introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4
2843+
// was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted
2844+
// back to version 3 in the same version interval.
2845+
// So each release Tarantool >= 2.10 have protocol version >= 3.
2846+
// (Tarantool 2.10.4 still has version 3.)
2847+
tarantool210ProtocolVersion := ProtocolVersion(3)
2848+
2849+
require.Equal(t, conn.ClientProtocolVersion(), ProtocolVersion(3))
2850+
require.Equal(t, conn.IsClientSupportsFeature(FeatureStreams), true)
2851+
require.Equal(t, conn.IsClientSupportsFeature(FeatureTransactions), true)
2852+
require.Equal(t, conn.IsClientSupportsFeature(FeatureErrorExtension), false)
2853+
require.Equal(t, conn.IsClientSupportsFeature(FeatureWatchers), false)
2854+
2855+
if isLess210 {
2856+
require.Equal(t, conn.ServerProtocolVersion(), ProtocolVersionUnsupported)
2857+
require.Equal(t, conn.IsServerSupportsFeature(FeatureStreams), false)
2858+
require.Equal(t, conn.IsServerSupportsFeature(FeatureTransactions), false)
2859+
require.Equal(t, conn.IsServerSupportsFeature(FeatureErrorExtension), false)
2860+
require.Equal(t, conn.IsServerSupportsFeature(FeatureWatchers), false)
2861+
} else {
2862+
require.GreaterOrEqual(t, conn.ServerProtocolVersion(), tarantool210ProtocolVersion)
2863+
require.Equal(t, conn.IsServerSupportsFeature(FeatureStreams), true)
2864+
require.Equal(t, conn.IsServerSupportsFeature(FeatureTransactions), true)
2865+
require.Equal(t, conn.IsServerSupportsFeature(FeatureErrorExtension), true)
2866+
require.Equal(t, conn.IsServerSupportsFeature(FeatureWatchers), true)
2867+
}
2868+
}
2869+
2870+
func BenchmarkConnectionFeature(b *testing.B) {
2871+
conn := test_helpers.ConnectWithValidation(b, server, opts)
2872+
defer conn.Close()
2873+
2874+
b.ResetTimer()
2875+
b.RunParallel(func(pb *testing.PB) {
2876+
for pb.Next() {
2877+
_ = conn.IsClientSupportsFeature(FeatureStreams)
2878+
_ = conn.IsServerSupportsFeature(FeatureStreams)
2879+
}
2880+
})
2881+
}
2882+
28332883
// runTestMain is a body of TestMain function
28342884
// (see https://pkg.go.dev/testing#hdr-Main).
28352885
// Using defer + os.Exit is not works so TestMain body

0 commit comments

Comments
 (0)