99 "github.com/google/uuid"
1010 libcluster "github.com/tarantool/tt/lib/cluster"
1111 "github.com/tarantool/tt/lib/connect"
12- "go.etcd.io/etcd/api/v3/mvccpb"
13- clientv3 "go.etcd.io/etcd/client/v3"
1412 "gopkg.in/yaml.v2"
1513)
1614
@@ -63,36 +61,22 @@ type SwitchStatusCtx struct {
6361 TaskID string
6462}
6563
66- func makeEtcdOpts (uriOpts connect.UriOpts ) libcluster.EtcdOpts {
67- opts := libcluster.EtcdOpts {
68- Endpoints : []string {uriOpts .Endpoint },
69- Username : uriOpts .Username ,
70- Password : uriOpts .Password ,
71- KeyFile : uriOpts .KeyFile ,
72- CertFile : uriOpts .CertFile ,
73- CaPath : uriOpts .CaPath ,
74- CaFile : uriOpts .CaFile ,
75- SkipHostVerify : uriOpts .SkipHostVerify ,
76- Timeout : uriOpts .Timeout ,
77- }
78-
79- return opts
80- }
81-
8264// Switch master instance.
8365func Switch (url string , switchCtx SwitchCtx ) error {
8466 uriOpts , err := connect .CreateUriOpts (url )
8567 if err != nil {
8668 return fmt .Errorf ("invalid URL %q: %w" , url , err )
8769 }
70+ connOpts := libcluster.ConnectOpts {
71+ Username : switchCtx .Username ,
72+ Password : switchCtx .Password ,
73+ }
8874
89- opts := makeEtcdOpts (uriOpts )
90-
91- etcd , err := libcluster .ConnectEtcd (opts )
75+ conn , err := libcluster .ConnectCStorage (uriOpts , connOpts )
9276 if err != nil {
93- return fmt .Errorf ("unable to connect to etcd : %w" , err )
77+ return fmt .Errorf ("unable to connect to config storage : %w" , err )
9478 }
95- defer etcd .Close ()
79+ defer conn .Close ()
9680
9781 cmd := switchCmd {
9882 Command : "switch" ,
@@ -109,54 +93,40 @@ func Switch(url string, switchCtx SwitchCtx) error {
10993 key := uriOpts .Prefix + failoverPath + uuid
11094
11195 if switchCtx .Wait {
112- ctx , cancel_watch := context .WithTimeout (context .Background (),
96+ ctxWatch , cancelWatch := context .WithTimeout (context .Background (),
11397 time .Duration (switchCtx .Timeout )* time .Second + cmdAdditionalWait )
114- outputChan := make (chan * clientv3.Event )
115- defer cancel_watch ()
116-
117- go func () {
118- waitChan := etcd .Watch (ctx , key )
119- defer close (outputChan )
120-
121- for resp := range waitChan {
122- for _ , ev := range resp .Events {
123- switch ev .Type {
124- case mvccpb .PUT :
125- outputChan <- ev
126- }
127- }
128- }
129- }()
98+ defer cancelWatch ()
99+ watchChan := conn .Watch (ctxWatch , key )
130100
131- ctx_put , cancel := context .WithTimeout (context .Background (), defaultEtcdTimeout )
132- _ , err = etcd .Put (ctx_put , key , string (yamlCmd ))
101+ ctx , cancel := context .WithTimeout (context .Background (), defaultEtcdTimeout )
102+ err = conn .Put (ctx , key , string (yamlCmd ))
133103 cancel ()
134104
135105 if err != nil {
136106 return err
137107 }
138108
139- for ev := range outputChan {
140- result := switchCmdResult {}
141- err = yaml .Unmarshal (ev .Kv . Value , & result )
109+ for ev := range watchChan {
110+ var result switchCmdResult
111+ err = yaml .Unmarshal (ev .Data , & result )
142112 if err != nil {
143113 return err
144114 }
145- fmt .Printf ("%s" , ev . Kv . Value )
115+ fmt .Printf ("%s" , ev )
146116 if result .Status == "success" || result .Status == "failed" {
147117 return nil
148118 }
149119 }
150- if ctx .Err () == context .DeadlineExceeded {
120+ if ctxWatch .Err () == context .DeadlineExceeded {
151121 log .Info ("Timeout for command execution reached." )
152122 return nil
153123 }
154124
155- return ctx . Err ()
125+ return fmt . Errorf ( "unexpected problem with watch context: %w" , ctxWatch . Err () )
156126 }
157127
158128 ctx , cancel := context .WithTimeout (context .Background (), defaultEtcdTimeout )
159- _ , err = etcd .Put (ctx , key , string (yamlCmd ))
129+ err = conn .Put (ctx , key , string (yamlCmd ))
160130 cancel ()
161131
162132 if err != nil {
@@ -177,30 +147,28 @@ func SwitchStatus(url string, switchCtx SwitchStatusCtx) error {
177147 if err != nil {
178148 return fmt .Errorf ("invalid URL %q: %w" , url , err )
179149 }
180-
181- opts := makeEtcdOpts (uriOpts )
182-
183- etcd , err := libcluster .ConnectEtcd (opts )
150+ var connOpts libcluster.ConnectOpts
151+ conn , err := libcluster .ConnectCStorage (uriOpts , connOpts )
184152 if err != nil {
185- return fmt .Errorf ("unable to connect to etcd : %w" , err )
153+ return fmt .Errorf ("unable to connect to config storage : %w" , err )
186154 }
187- defer etcd .Close ()
155+ defer conn .Close ()
188156
189157 key := uriOpts .Prefix + failoverPath + switchCtx .TaskID
190158
191159 ctx , cancel := context .WithTimeout (context .Background (), defaultEtcdTimeout )
192- result , err := etcd .Get (ctx , key , clientv3 . WithLimit ( 1 ) )
160+ result , err := conn .Get (ctx , key )
193161 cancel ()
194162
195163 if err != nil {
196164 return err
197165 }
198166
199- if len (result . Kvs ) != 1 {
167+ if len (result ) != 1 {
200168 return fmt .Errorf ("task with id `%s` is not found" , switchCtx .TaskID )
201169 }
202170
203- fmt .Print (string (result . Kvs [0 ].Value ))
171+ fmt .Print (string (result [0 ].Value ))
204172
205173 return nil
206174}
0 commit comments