Skip to content

Commit c0ca261

Browse files
AnaNekoleg-jukovec
authored andcommitted
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()` method and new request objects to work with stream, `BeginRequest` - start transaction via iproto stream; `CommitRequest` - commit transaction; `RollbackRequest` - rollback transaction. Closes #101
1 parent 1e37dc2 commit c0ca261

19 files changed

+1816
-33
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1717
- Support datetime type in msgpack (#118)
1818
- Prepared SQL statements (#117)
1919
- Context support for request objects (#48)
20+
- Streams and interactive transactions support (#101)
2021

2122
### Changed
2223

config.lua

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
-- able to send requests until everything is configured.
33
box.cfg{
44
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
5+
memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil,
56
}
67

78
box.once("init", function()

connection.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
)
2121

2222
const requestsMap = 128
23+
const ignoreStreamId = 0
2324
const (
2425
connDisconnected = 0
2526
connConnected = 1
@@ -143,6 +144,8 @@ type Connection struct {
143144
state uint32
144145
dec *msgpack.Decoder
145146
lenbuf [PacketLengthBytes]byte
147+
148+
lastStreamId uint64
146149
}
147150

148151
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -528,16 +531,27 @@ func (conn *Connection) dial() (err error) {
528531
return
529532
}
530533

531-
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
534+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
535+
req Request, streamId uint64, res SchemaResolver) (err error) {
532536
hl := h.Len()
533-
h.Write([]byte{
537+
538+
hMapLen := byte(0x82) // 2 element map.
539+
if streamId != ignoreStreamId {
540+
hMapLen = byte(0x83) // 3 element map.
541+
}
542+
hBytes := []byte{
534543
0xce, 0, 0, 0, 0, // Length.
535-
0x82, // 2 element map.
544+
hMapLen,
536545
KeyCode, byte(req.Code()), // Request code.
537546
KeySync, 0xce,
538547
byte(reqid >> 24), byte(reqid >> 16),
539548
byte(reqid >> 8), byte(reqid),
540-
})
549+
}
550+
if streamId != ignoreStreamId {
551+
hBytes = append(hBytes, KeyStreamId, byte(streamId))
552+
}
553+
554+
h.Write(hBytes)
541555

542556
if err = req.Body(res, enc); err != nil {
543557
return
@@ -555,7 +569,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
555569
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
556570
var packet smallWBuf
557571
req := newAuthRequest(conn.opts.User, string(scramble))
558-
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
572+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
559573

560574
if err != nil {
561575
return errors.New("auth: pack error " + err.Error())
@@ -869,7 +883,7 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
869883
}
870884
}
871885

872-
func (conn *Connection) send(req Request) *Future {
886+
func (conn *Connection) send(req Request, streamId uint64) *Future {
873887
fut := conn.newFuture(req.Ctx())
874888
if fut.ready == nil {
875889
return fut
@@ -882,14 +896,14 @@ func (conn *Connection) send(req Request) *Future {
882896
default:
883897
}
884898
}
885-
conn.putFuture(fut, req)
899+
conn.putFuture(fut, req, streamId)
886900
if req.Ctx() != nil {
887901
go conn.contextWatchdog(fut, req.Ctx())
888902
}
889903
return fut
890904
}
891905

892-
func (conn *Connection) putFuture(fut *Future, req Request) {
906+
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
893907
shardn := fut.requestId & (conn.opts.Concurrency - 1)
894908
shard := &conn.shard[shardn]
895909
shard.bufmut.Lock()
@@ -906,7 +920,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
906920
}
907921
blen := shard.buf.Len()
908922
reqid := fut.requestId
909-
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
923+
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
910924
shard.buf.Trunc(blen)
911925
shard.bufmut.Unlock()
912926
if f := conn.fetchFuture(reqid); f == fut {
@@ -1095,7 +1109,7 @@ func (conn *Connection) Do(req Request) *Future {
10951109
default:
10961110
}
10971111
}
1098-
return conn.send(req)
1112+
return conn.send(req, ignoreStreamId)
10991113
}
11001114

11011115
// ConfiguredTimeout returns a timeout from connection config.
@@ -1121,3 +1135,16 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
11211135
}
11221136
return NewPreparedFromResponse(conn, resp)
11231137
}
1138+
1139+
// NewStream creates new Stream object for connection.
1140+
//
1141+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1142+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1143+
// Since 1.7.0
1144+
func (conn *Connection) NewStream() (*Stream, error) {
1145+
next := atomic.AddUint64(&conn.lastStreamId, 1)
1146+
return &Stream{
1147+
Id: next,
1148+
Conn: conn,
1149+
}, nil
1150+
}

connection_pool/config.lua

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
-- able to send requests until everything is configured.
33
box.cfg{
44
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
5+
memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil,
56
}
67

78
box.once("init", function()

connection_pool/connection_pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant
544544
return conn.Do(req)
545545
}
546546

547+
// NewStream creates new Stream object for connection selected
548+
// by userMode from connPool.
549+
//
550+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
551+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
552+
// Since 1.7.0
553+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
554+
conn, err := connPool.getNextConnection(userMode)
555+
if err != nil {
556+
return nil, err
557+
}
558+
return conn.NewStream()
559+
}
560+
547561
//
548562
// private
549563
//

0 commit comments

Comments
 (0)