Skip to content

Commit a7e8f30

Browse files
committed
failover: support TcS as config storage
Closes #1193 @TarantoolBot document Title: `tt cluster failover` commands support TcS as config storage
1 parent 82f3e8e commit a7e8f30

5 files changed

Lines changed: 234 additions & 58 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1111

1212
- `tt create`: add template for Tarantool Config Storage.
1313
- `tt create`: add template for non-vshard Cluster.
14+
- `tt cluster failover`: support Tarantool Config Storage.
1415

1516
### Changed
1617

cli/cluster/cmd/failover.go

Lines changed: 26 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"github.com/google/uuid"
1010
libcluster "github.com/tarantool/tt/lib/cluster"
1111
"github.com/tarantool/tt/lib/connect"
12-
"go.etcd.io/etcd/api/v3/mvccpb"
13-
clientv3 "go.etcd.io/etcd/client/v3"
1412
"gopkg.in/yaml.v2"
1513
)
1614

@@ -63,36 +61,22 @@ type SwitchStatusCtx struct {
6361
TaskID string
6462
}
6563

66-
func makeEtcdOpts(uriOpts connect.UriOpts) libcluster.EtcdOpts {
67-
opts := libcluster.EtcdOpts{
68-
Endpoints: []string{uriOpts.Endpoint},
69-
Username: uriOpts.Username,
70-
Password: uriOpts.Password,
71-
KeyFile: uriOpts.KeyFile,
72-
CertFile: uriOpts.CertFile,
73-
CaPath: uriOpts.CaPath,
74-
CaFile: uriOpts.CaFile,
75-
SkipHostVerify: uriOpts.SkipHostVerify,
76-
Timeout: uriOpts.Timeout,
77-
}
78-
79-
return opts
80-
}
81-
8264
// Switch master instance.
8365
func Switch(url string, switchCtx SwitchCtx) error {
8466
uriOpts, err := connect.CreateUriOpts(url)
8567
if err != nil {
8668
return fmt.Errorf("invalid URL %q: %w", url, err)
8769
}
70+
connOpts := libcluster.ConnectOpts{
71+
Username: switchCtx.Username,
72+
Password: switchCtx.Password,
73+
}
8874

89-
opts := makeEtcdOpts(uriOpts)
90-
91-
etcd, err := libcluster.ConnectEtcd(opts)
75+
conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
9276
if err != nil {
93-
return fmt.Errorf("unable to connect to etcd: %w", err)
77+
return fmt.Errorf("unable to connect to config storage: %w", err)
9478
}
95-
defer etcd.Close()
79+
defer conn.Close()
9680

9781
cmd := switchCmd{
9882
Command: "switch",
@@ -109,54 +93,40 @@ func Switch(url string, switchCtx SwitchCtx) error {
10993
key := uriOpts.Prefix + failoverPath + uuid
11094

11195
if switchCtx.Wait {
112-
ctx, cancel_watch := context.WithTimeout(context.Background(),
96+
ctxWatch, cancelWatch := context.WithTimeout(context.Background(),
11397
time.Duration(switchCtx.Timeout)*time.Second+cmdAdditionalWait)
114-
outputChan := make(chan *clientv3.Event)
115-
defer cancel_watch()
116-
117-
go func() {
118-
waitChan := etcd.Watch(ctx, key)
119-
defer close(outputChan)
120-
121-
for resp := range waitChan {
122-
for _, ev := range resp.Events {
123-
switch ev.Type {
124-
case mvccpb.PUT:
125-
outputChan <- ev
126-
}
127-
}
128-
}
129-
}()
98+
defer cancelWatch()
99+
watchChan := conn.Watch(ctxWatch, key)
130100

131-
ctx_put, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
132-
_, err = etcd.Put(ctx_put, key, string(yamlCmd))
101+
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
102+
err = conn.Put(ctx, key, string(yamlCmd))
133103
cancel()
134104

135105
if err != nil {
136106
return err
137107
}
138108

139-
for ev := range outputChan {
140-
result := switchCmdResult{}
141-
err = yaml.Unmarshal(ev.Kv.Value, &result)
109+
for ev := range watchChan {
110+
var result switchCmdResult
111+
err = yaml.Unmarshal(ev.Data, &result)
142112
if err != nil {
143113
return err
144114
}
145-
fmt.Printf("%s", ev.Kv.Value)
115+
fmt.Printf("%s", ev)
146116
if result.Status == "success" || result.Status == "failed" {
147117
return nil
148118
}
149119
}
150-
if ctx.Err() == context.DeadlineExceeded {
120+
if ctxWatch.Err() == context.DeadlineExceeded {
151121
log.Info("Timeout for command execution reached.")
152122
return nil
153123
}
154124

155-
return ctx.Err()
125+
return fmt.Errorf("unexpected problem with watch context: %w", ctxWatch.Err())
156126
}
157127

158128
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
159-
_, err = etcd.Put(ctx, key, string(yamlCmd))
129+
err = conn.Put(ctx, key, string(yamlCmd))
160130
cancel()
161131

162132
if err != nil {
@@ -177,30 +147,28 @@ func SwitchStatus(url string, switchCtx SwitchStatusCtx) error {
177147
if err != nil {
178148
return fmt.Errorf("invalid URL %q: %w", url, err)
179149
}
180-
181-
opts := makeEtcdOpts(uriOpts)
182-
183-
etcd, err := libcluster.ConnectEtcd(opts)
150+
var connOpts libcluster.ConnectOpts
151+
conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
184152
if err != nil {
185-
return fmt.Errorf("unable to connect to etcd: %w", err)
153+
return fmt.Errorf("unable to connect to config storage: %w", err)
186154
}
187-
defer etcd.Close()
155+
defer conn.Close()
188156

189157
key := uriOpts.Prefix + failoverPath + switchCtx.TaskID
190158

191159
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
192-
result, err := etcd.Get(ctx, key, clientv3.WithLimit(1))
160+
result, err := conn.Get(ctx, key)
193161
cancel()
194162

195163
if err != nil {
196164
return err
197165
}
198166

199-
if len(result.Kvs) != 1 {
167+
if len(result) != 1 {
200168
return fmt.Errorf("task with id `%s` is not found", switchCtx.TaskID)
201169
}
202170

203-
fmt.Print(string(result.Kvs[0].Value))
171+
fmt.Print(string(result[0].Value))
204172

205173
return nil
206174
}

lib/cluster/configstorage.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package cluster
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
libconnect "github.com/tarantool/tt/lib/connect"
8+
)
9+
10+
type CSWatchEvent struct {
11+
Data []byte
12+
}
13+
14+
type CSConnection interface {
15+
Close() error
16+
Get(ctx context.Context, key string) ([]Data, error)
17+
Put(ctx context.Context, key, value string) error
18+
Watch(ctx context.Context, key string) <-chan CSWatchEvent
19+
}
20+
21+
func ConnectCStorage(
22+
uriOpts libconnect.UriOpts,
23+
connOpts ConnectOpts,
24+
) (CSConnection, error) {
25+
sc, errEtcd := connectEtcdCS(uriOpts, connOpts)
26+
if errEtcd == nil {
27+
return sc, nil
28+
}
29+
30+
sc, errTarantool := connectTarantoolCS(uriOpts, connOpts)
31+
if errTarantool == nil {
32+
return sc, nil
33+
}
34+
35+
return nil, fmt.Errorf("failed to establish a connection to tarantool or etcd: %w, %w",
36+
errTarantool, errEtcd)
37+
}

lib/cluster/etcd.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/tarantool/go-tarantool/v2"
1515
libconnect "github.com/tarantool/tt/lib/connect"
16+
"go.etcd.io/etcd/api/v3/mvccpb"
1617
"go.etcd.io/etcd/client/pkg/v3/transport"
1718
clientv3 "go.etcd.io/etcd/client/v3"
1819
"go.uber.org/zap"
@@ -882,3 +883,71 @@ func MakeEtcdOptsFromUriOpts(src libconnect.UriOpts) EtcdOpts {
882883
Timeout: src.Timeout,
883884
}
884885
}
886+
887+
type etcdCSConnection struct {
888+
cli *clientv3.Client
889+
}
890+
891+
func connectEtcdCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) {
892+
cli, err := ConnectEtcdUriOpts(uriOpts, connOpts)
893+
if err != nil {
894+
return nil, err
895+
}
896+
return &etcdCSConnection{
897+
cli: cli,
898+
}, nil
899+
}
900+
901+
func (c *etcdCSConnection) Close() error {
902+
return c.cli.Close()
903+
}
904+
905+
func (c *etcdCSConnection) Get(ctx context.Context, key string) ([]Data, error) {
906+
resp, err := c.cli.Get(ctx, key)
907+
if err != nil {
908+
return nil, fmt.Errorf("failed to fetch data from etcd: %w", err)
909+
}
910+
911+
switch {
912+
case len(resp.Kvs) == 0:
913+
// It should not happen, but we need to be sure to avoid a null pointer
914+
// dereference.
915+
return nil, fmt.Errorf("a configuration data not found in etcd for key %q",
916+
key)
917+
case len(resp.Kvs) > 1:
918+
return nil, fmt.Errorf("too many responses (%v) from etcd for key %q",
919+
resp.Kvs, key)
920+
}
921+
922+
collected := []Data{
923+
{
924+
Source: string(resp.Kvs[0].Key),
925+
Value: resp.Kvs[0].Value,
926+
Revision: resp.Kvs[0].ModRevision,
927+
},
928+
}
929+
return collected, nil
930+
}
931+
932+
func (c *etcdCSConnection) Put(ctx context.Context, key, value string) error {
933+
_, err := c.cli.Put(ctx, key, value)
934+
return err
935+
}
936+
937+
func (c *etcdCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent {
938+
ch := make(chan CSWatchEvent)
939+
innerCh := c.cli.Watch(ctx, key)
940+
go func() {
941+
defer close(ch)
942+
943+
for resp := range innerCh {
944+
for _, ev := range resp.Events {
945+
switch ev.Type {
946+
case mvccpb.PUT:
947+
ch <- CSWatchEvent{Data: ev.Kv.Value}
948+
}
949+
}
950+
}
951+
}()
952+
return ch
953+
}

lib/cluster/tarantool.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,3 +618,104 @@ func ConnectTarantool(uriOpts libconnect.UriOpts,
618618
}
619619
return conn, nil
620620
}
621+
622+
type tarantoolCSConnection struct {
623+
conn tarantool.Connector
624+
}
625+
626+
func connectTarantoolCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) {
627+
conn, err := ConnectTarantool(uriOpts, connOpts)
628+
if err != nil {
629+
return nil, err
630+
}
631+
return &tarantoolCSConnection{
632+
conn: conn,
633+
}, nil
634+
}
635+
636+
func (c *tarantoolCSConnection) Close() error {
637+
return c.conn.Close()
638+
}
639+
640+
func (c *tarantoolCSConnection) Get(ctx context.Context, key string) ([]Data, error) {
641+
data, err := c.call(ctx, "config.storage.get", []any{key})
642+
if err != nil {
643+
return nil, fmt.Errorf("failed to fetch data from tarantool: %w", err)
644+
}
645+
if len(data) != 1 {
646+
return nil, fmt.Errorf("unexpected response from tarantool: %q", data)
647+
}
648+
649+
var resp tarantoolGetResponse
650+
if err := mapstructure.Decode(data[0], &resp); err != nil {
651+
return nil, fmt.Errorf("failed to map response from tarantool: %q", data[0])
652+
}
653+
654+
switch {
655+
case len(resp.Data) == 0:
656+
return nil, fmt.Errorf("a configuration data not found in tarantool for key %q",
657+
key)
658+
case len(resp.Data) > 1:
659+
// It should not happen, but we need to be sure to avoid a null pointer
660+
// dereference.
661+
return nil, fmt.Errorf("too many responses (%v) from tarantool for key %q",
662+
resp, key)
663+
}
664+
665+
return []Data{
666+
{
667+
Source: key,
668+
Value: []byte(resp.Data[0].Value),
669+
Revision: resp.Data[0].ModRevision,
670+
},
671+
}, err
672+
}
673+
674+
func (c *tarantoolCSConnection) Put(ctx context.Context, key, value string) error {
675+
_, err := c.call(ctx, "config.storage.put", []any{key, value})
676+
if err != nil {
677+
return fmt.Errorf("failed to put data into tarantool: %w", err)
678+
}
679+
return nil
680+
}
681+
682+
func (c *tarantoolCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent {
683+
ch := make(chan CSWatchEvent)
684+
data, err := c.Get(ctx, key)
685+
if err != nil {
686+
return nil
687+
}
688+
origValue := string(data[0].Value)
689+
go func() {
690+
defer close(ch)
691+
for {
692+
select {
693+
case <-time.After(10 * time.Millisecond):
694+
case <-ctx.Done():
695+
return
696+
}
697+
698+
data, err := c.Get(ctx, key)
699+
if err != nil && string(data[0].Value) != origValue {
700+
ch <- CSWatchEvent{data[0].Value}
701+
}
702+
}
703+
}()
704+
705+
return ch
706+
}
707+
708+
func (c *tarantoolCSConnection) call(
709+
ctx context.Context,
710+
fun string,
711+
args []any,
712+
) ([]any, error) {
713+
req := tarantool.NewCallRequest(fun).Args(args)
714+
715+
var result []any
716+
if err := c.conn.Do(req.Context(ctx)).GetTyped(&result); err != nil {
717+
return nil, err
718+
}
719+
720+
return result, nil
721+
}

0 commit comments

Comments
 (0)