Skip to content

Commit 2de4249

Browse files
iproto: support feature discovery
Since version 2.10.0 Tarantool supports feature discovery [1]. Client can send the schema version and supported features and receive server-side schema version and supported features information to tune its behavior. After this patch, the request will be send on `Connect`. Connector will use protocol version that is minimal of connector version (now it's 3) and server version. Feature will be enabled if both client and server supports it (for now client does not support any features from the list). Unknown request type error response is expected for pre-2.10.0 versions. In this case, protocol version would be ProtocolVersionUnsupported (-1) and no features would be enabled. For now it doesn't seems like exposing protocol version and enabled features is useful for a client so private variables are used to store this info. Getters added in export_test for tests. Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4 could be found in Tarantool source code but they were removed in the following commits before the release and treated like they never existed. We also ignore them here too. See [2] for more info. 1. tarantool/tarantool#6253 2. tarantool/tarantool-python#262 Closes #120
1 parent 48cf0c7 commit 2de4249

File tree

6 files changed

+227
-8
lines changed

6 files changed

+227
-8
lines changed

Diff for: connection.go

+74
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const (
3535
type ConnEventKind int
3636
type ConnLogKind int
3737

38+
var clientProtocolVersion int64 = 3
39+
var clientFeatures []uint64 = []uint64{}
40+
3841
const (
3942
// Connected signals that connection is established or reestablished.
4043
Connected ConnEventKind = iota + 1
@@ -146,6 +149,13 @@ type Connection struct {
146149
lenbuf [PacketLengthBytes]byte
147150

148151
lastStreamId uint64
152+
153+
// protocolVersion is IProto max protocol version supported both by
154+
// client and server. Equal to ProtocolVersionUnsupported is server
155+
// does not support IPROTO_ID.
156+
protocolVersion int64
157+
// features contains the list of features supported both by client and server.
158+
features []uint64
149159
}
150160

151161
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -391,6 +401,10 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
391401
}
392402
}
393403

404+
if err = conn.checkFeatures(); err != nil {
405+
return nil, err
406+
}
407+
394408
return conn, err
395409
}
396410

@@ -1163,3 +1177,63 @@ func (conn *Connection) NewStream() (*Stream, error) {
11631177
Conn: conn,
11641178
}, nil
11651179
}
1180+
1181+
func (conn *Connection) checkFeatures() error {
1182+
resp, err := conn.exchangeProtocolVersion(clientProtocolVersion, clientFeatures)
1183+
1184+
if err != nil {
1185+
if resp.Code == ErUnknownRequestType {
1186+
// IPROTO_ID is not supported by server.
1187+
conn.protocolVersion = ProtocolVersionUnsupported
1188+
conn.features = []uint64{}
1189+
return nil
1190+
}
1191+
1192+
return err
1193+
}
1194+
1195+
if len(resp.Data) == 0 {
1196+
return fmt.Errorf("Unexpected response on protocol version exchange: no data")
1197+
}
1198+
1199+
server, ok := resp.Data[0].(*protocolVersionResponse)
1200+
if !ok {
1201+
return fmt.Errorf("Unexpected response on protocol version exchange: wrong data")
1202+
}
1203+
1204+
conn.protocolVersion = minInt64(server.protocolVersion, clientProtocolVersion)
1205+
conn.features = intersect(server.features, clientFeatures)
1206+
1207+
return nil
1208+
}
1209+
1210+
func intersect(arr1 []uint64, arr2 []uint64) []uint64 {
1211+
var res []uint64 = make([]uint64, minInt(len(arr1), len(arr2)))
1212+
var i int = 0
1213+
1214+
for _, el1 := range arr1 {
1215+
for _, el2 := range arr2 {
1216+
if el1 == el2 {
1217+
res[i] = (el1)
1218+
}
1219+
}
1220+
}
1221+
1222+
return res[0:i]
1223+
}
1224+
1225+
func minInt(v1 int, v2 int) int {
1226+
if v1 > v2 {
1227+
return v2
1228+
}
1229+
1230+
return v1
1231+
}
1232+
1233+
func minInt64(v1 int64, v2 int64) int64 {
1234+
if v1 > v2 {
1235+
return v2
1236+
}
1237+
1238+
return v1
1239+
}

Diff for: const.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
RollbackRequestCode = 16
1919
PingRequestCode = 64
2020
SubscribeRequestCode = 66
21+
IdRequestCode = 73
2122

2223
KeyCode = 0x00
2324
KeySync = 0x01
@@ -41,6 +42,8 @@ const (
4142
KeySQLBind = 0x41
4243
KeySQLInfo = 0x42
4344
KeyStmtID = 0x43
45+
KeyVersion = 0x54
46+
KeyFeatures = 0x55
4447
KeyTimeout = 0x56
4548
KeyTxnIsolation = 0x59
4649

@@ -69,10 +72,18 @@ const (
6972
RLimitDrop = 1
7073
RLimitWait = 2
7174

72-
OkCode = uint32(0)
73-
PushCode = uint32(0x80)
74-
ErrorCodeBit = 0x8000
75-
PacketLengthBytes = 5
76-
ErSpaceExistsCode = 0xa
77-
IteratorCode = 0x14
75+
FeatureStreams = uint64(0)
76+
FeatureTransactions = uint64(1)
77+
FeatureErrorExtension = uint64(2)
78+
FeatureWatchers = uint64(3)
79+
80+
OkCode = uint32(0)
81+
PushCode = uint32(0x80)
82+
ErrorCodeBit = 0x8000
83+
PacketLengthBytes = 5
84+
ErSpaceExistsCode = 0xa
85+
ErUnknownRequestType = 0x30
86+
IteratorCode = 0x14
87+
88+
ProtocolVersionUnsupported = int64(-1)
7889
)

Diff for: export_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,11 @@ func RefImplRollbackBody(enc *encoder) error {
114114
func NewEncoder(w io.Writer) *encoder {
115115
return newEncoder(w)
116116
}
117+
118+
func GetProtocolVersion(conn *Connection) int64 {
119+
return conn.protocolVersion
120+
}
121+
122+
func GetFeatures(conn *Connection) []uint64 {
123+
return conn.features
124+
}

Diff for: request.go

+66
Original file line numberDiff line numberDiff line change
@@ -1106,3 +1106,69 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
11061106
req.ctx = ctx
11071107
return req
11081108
}
1109+
1110+
// protocolVersionRequest informs the server about supported protocol
1111+
// version and features.
1112+
type protocolVersionRequest struct {
1113+
baseRequest
1114+
// TODO: or uint32? spaceId is uint32. On the other hand,
1115+
// spaceId is casted to uint64 underneath and protocolVersionRequest
1116+
// is not a request user would send explicitly.
1117+
// Need to ask Oleg about it.
1118+
protocolVersion int64
1119+
features []uint64
1120+
}
1121+
1122+
// newProtocolVersionRequest returns a new protocolVersionRequest.
1123+
func newProtocolVersionRequest(protocolVersion int64,
1124+
features []uint64) *protocolVersionRequest {
1125+
req := new(protocolVersionRequest)
1126+
req.requestCode = IdRequestCode
1127+
req.protocolVersion = protocolVersion
1128+
req.features = features
1129+
return req
1130+
}
1131+
1132+
// exchangeProtocolVersion sends info about client protocol
1133+
// and receives info about server protocol in response.
1134+
func (conn *Connection) exchangeProtocolVersion(protocolVersion int64,
1135+
features []uint64) (resp *Response, err error) {
1136+
req := newProtocolVersionRequest(protocolVersion, features)
1137+
return conn.Do(req).Get()
1138+
}
1139+
1140+
// Body fills an encoder with the protocol version request body.
1141+
func (req *protocolVersionRequest) Body(res SchemaResolver, enc *encoder) error {
1142+
return req.fillProtocolVersionRequest(enc)
1143+
}
1144+
1145+
// Context sets a passed context to the request.
1146+
//
1147+
// Pay attention that when using context with request objects,
1148+
// the timeout option for Connection does not affect the lifetime
1149+
// of the request. For those purposes use context.WithTimeout() as
1150+
// the root context.
1151+
func (req *protocolVersionRequest) Context(ctx context.Context) *protocolVersionRequest {
1152+
req.ctx = ctx
1153+
return req
1154+
}
1155+
1156+
func (req *protocolVersionRequest) fillProtocolVersionRequest(enc *encoder) error {
1157+
enc.EncodeMapLen(2)
1158+
1159+
encodeUint(enc, KeyVersion)
1160+
encodeInt(enc, req.protocolVersion)
1161+
1162+
encodeUint(enc, KeyFeatures)
1163+
1164+
t := len(req.features)
1165+
if err := enc.EncodeArrayLen(t); err != nil {
1166+
return err
1167+
}
1168+
1169+
for _, v := range req.features {
1170+
encodeUint(enc, v)
1171+
}
1172+
1173+
return nil
1174+
}

Diff for: response.go

+36-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ type SQLInfo struct {
2929
InfoAutoincrementIds []uint64
3030
}
3131

32+
type protocolVersionResponse struct {
33+
protocolVersion int64
34+
features []uint64
35+
}
36+
3237
func (meta *ColumnMetaData) DecodeMsgpack(d *decoder) error {
3338
var err error
3439
var l int
@@ -147,8 +152,11 @@ func (resp *Response) decodeBody() (err error) {
147152
offset := resp.buf.Offset()
148153
defer resp.buf.Seek(offset)
149154

150-
var l int
151-
var stmtID, bindCount uint64
155+
var l, larr int
156+
var stmtID, bindCount, feature uint64
157+
var protocolVersion int64
158+
var features []uint64
159+
isProtocolVersionResponse := false
152160

153161
d := newDecoder(&resp.buf)
154162

@@ -190,6 +198,23 @@ func (resp *Response) decodeBody() (err error) {
190198
if bindCount, err = d.DecodeUint64(); err != nil {
191199
return err
192200
}
201+
case KeyVersion:
202+
isProtocolVersionResponse = true
203+
if protocolVersion, err = d.DecodeInt64(); err != nil {
204+
return err
205+
}
206+
case KeyFeatures:
207+
isProtocolVersionResponse = true
208+
if larr, err = d.DecodeArrayLen(); err != nil {
209+
return err
210+
}
211+
features = make([]uint64, larr)
212+
for i := 0; i < larr; i++ {
213+
if feature, err = d.DecodeUint64(); err != nil {
214+
return err
215+
}
216+
features[i] = feature
217+
}
193218
default:
194219
if err = d.Skip(); err != nil {
195220
return err
@@ -204,6 +229,15 @@ func (resp *Response) decodeBody() (err error) {
204229
}
205230
resp.Data = []interface{}{stmt}
206231
}
232+
233+
if isProtocolVersionResponse {
234+
data := &protocolVersionResponse{
235+
protocolVersion: protocolVersion,
236+
features: features,
237+
}
238+
resp.Data = []interface{}{data}
239+
}
240+
207241
if resp.Code != OkCode && resp.Code != PushCode {
208242
resp.Code &^= ErrorCodeBit
209243
err = Error{resp.Code, resp.Error}

Diff for: tarantool_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -2830,6 +2830,32 @@ func TestStream_DoWithClosedConn(t *testing.T) {
28302830
}
28312831
}
28322832

2833+
func TestConnectionProtocolVersion(t *testing.T) {
2834+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2835+
2836+
isProtocolVersionUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
2837+
if err != nil {
2838+
t.Fatalf("Unexpected error has been caught: %s", err.Error())
2839+
}
2840+
2841+
protocolVersion := GetProtocolVersion(conn)
2842+
features := GetFeatures(conn)
2843+
2844+
if isProtocolVersionUnsupported {
2845+
require.Equal(t, protocolVersion, ProtocolVersionUnsupported)
2846+
require.ElementsMatch(t, features, []uint64{})
2847+
} else {
2848+
// First Tarantool protocol version (1) was introduced between
2849+
// 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also
2850+
// introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4
2851+
// was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted
2852+
// back to version 3 in the same version interval.
2853+
// Tarantool 2.10.3 still has version 3.
2854+
require.GreaterOrEqual(t, protocolVersion, int64(3))
2855+
require.ElementsMatch(t, features, []uint64{})
2856+
}
2857+
}
2858+
28332859
// runTestMain is a body of TestMain function
28342860
// (see https://pkg.go.dev/testing#hdr-Main).
28352861
// Using defer + os.Exit is not works so TestMain body

0 commit comments

Comments
 (0)