Skip to content

Commit e33c53a

Browse files
committed
cli: implement cluster worker publish command
@TarantoolBot document Title: `tt cluster worker publish` implementation Implement `tt cluster worker publish` command for publishing worker configurations to etcd or tarantool config storage (TCS). The command accepts a URL with format: ``` http(s)://[username:password@]host:port/prefix/host-name/worker-name ``` * prefix - a base path to the worker configuration. * host-name - a name of the host. * worker-name - a name of the worker. Possible arguments: * timeout - a request timeout in seconds (default 3.0). * ssl_key_file - a path to a private SSL key file. * ssl_cert_file - a path to an SSL certificate file. * ssl_ca_file - a path to a trusted certificate authorities (CA) file. * ssl_ca_path - a path to a trusted certificate authorities (CA) directory. * ssl_ciphers - a list of allowed SSL ciphers. * verify_host - set off (default true) verification of the certificate’s name against the host. * verify_peer - set off (default true) verification of the peer’s SSL certificate. The command supports the following environment variables: * TT_CLI_USERNAME - specifies a Tarantool username; * TT_CLI_PASSWORD - specifies a Tarantool password. * TT_CLI_ETCD_USERNAME - specifies a Etcd username; * TT_CLI_ETCD_PASSWORD - specifies a Etcd password. The priority of credentials: environment variables < command flags < URL credentials. Usage: ``` tt cluster worker publish <URI> <FILE> ``` Supported flags: --force force publish and skip checking existence -h, --help help for publish -p, --password string password (used as etcd/tarantool config storage credentials) -u, --username string username (used as etcd/tarantool config storage credentials) Example: ``` tt cluster worker publish \ https://user:pass@localhost:2379/cluster/workers/host/server-1 \ worker.yaml ``` The implementation uses go-storage library for both etcd and TCS. Without --force flag, it checks if the key already exists before publishing. Due to go-storage limitations (no "count" predicate support), this check is done via two separate transactions with a potential race condition. Closes TNTP-7062
1 parent 7a6bee5 commit e33c53a

File tree

8 files changed

+897
-43
lines changed

8 files changed

+897
-43
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
99

1010
### Added
1111

12+
- `tt cluster`: worker publish subcommand.
13+
1214
### Changed
1315

1416
### Fixed

cli/cluster/cmd/storage.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"fmt"
8+
"io/fs"
9+
"os"
10+
"strings"
11+
"time"
12+
13+
"github.com/tarantool/go-storage"
14+
etcdDriver "github.com/tarantool/go-storage/driver/etcd"
15+
tcsDriver "github.com/tarantool/go-storage/driver/tcs"
16+
"github.com/tarantool/go-tarantool/v2"
17+
libconnect "github.com/tarantool/tt/lib/connect"
18+
"github.com/tarantool/tt/lib/dial"
19+
"go.etcd.io/etcd/client/pkg/v3/transport"
20+
clientv3 "go.etcd.io/etcd/client/v3"
21+
"go.uber.org/zap"
22+
"google.golang.org/grpc"
23+
)
24+
25+
// ConnectStorage connects to etcd or tarantool config storage and returns
26+
// a storage.Storage instance. It tries etcd first, then tarantool.
27+
func ConnectStorage(
28+
opts libconnect.UriOpts,
29+
username, password string,
30+
) (storage.Storage, func(), error) {
31+
etcdcli, errEtcd := connectEtcd(opts, username, password)
32+
if errEtcd == nil {
33+
drv := etcdDriver.New(etcdcli)
34+
stg := storage.NewStorage(drv)
35+
return stg, func() { etcdcli.Close() }, nil
36+
}
37+
38+
conn, errTarantool := connectTarantool(opts, username, password)
39+
if errTarantool == nil {
40+
drv := tcsDriver.New(conn)
41+
stg := storage.NewStorage(drv)
42+
return stg, func() { conn.Close() }, nil
43+
}
44+
45+
return nil, nil, fmt.Errorf("failed to connect to etcd or tarantool: %w, %w",
46+
errTarantool, errEtcd)
47+
}
48+
49+
// connectEtcd creates an etcd client from URI options.
50+
func connectEtcd(
51+
opts libconnect.UriOpts,
52+
username, password string,
53+
) (*clientv3.Client, error) {
54+
var endpoints []string
55+
if opts.Endpoint != "" {
56+
endpoints = []string{opts.Endpoint}
57+
}
58+
59+
etcdUsername := username
60+
etcdPassword := password
61+
if etcdUsername == "" {
62+
etcdUsername = os.Getenv(libconnect.EtcdUsernameEnv)
63+
}
64+
if etcdPassword == "" {
65+
etcdPassword = os.Getenv(libconnect.EtcdPasswordEnv)
66+
}
67+
68+
var tlsConfig *tls.Config
69+
if opts.KeyFile != "" || opts.CertFile != "" || opts.CaFile != "" ||
70+
opts.CaPath != "" || opts.SkipHostVerify {
71+
72+
tlsInfo := transport.TLSInfo{
73+
CertFile: opts.CertFile,
74+
KeyFile: opts.KeyFile,
75+
TrustedCAFile: opts.CaFile,
76+
}
77+
78+
var err error
79+
tlsConfig, err = tlsInfo.ClientConfig()
80+
if err != nil {
81+
return nil, fmt.Errorf("failed to create tls client config: %w", err)
82+
}
83+
84+
if opts.CaPath != "" {
85+
roots, err := loadRootCACerts(opts.CaPath)
86+
if err != nil {
87+
return nil, fmt.Errorf("failed to load CA directory: %w", err)
88+
}
89+
tlsConfig.RootCAs = roots
90+
}
91+
92+
if opts.SkipHostVerify {
93+
tlsConfig.InsecureSkipVerify = true
94+
}
95+
}
96+
97+
client, err := clientv3.New(clientv3.Config{
98+
Endpoints: endpoints,
99+
DialTimeout: opts.Timeout,
100+
Username: etcdUsername,
101+
Password: etcdPassword,
102+
TLS: tlsConfig,
103+
Logger: zap.NewNop(),
104+
DialOptions: []grpc.DialOption{grpc.WithBlock()}, //nolint:staticcheck
105+
})
106+
if err != nil {
107+
return nil, fmt.Errorf("failed to create etcd client: %w", err)
108+
}
109+
110+
return client, nil
111+
}
112+
113+
// connectTarantool creates a tarantool connection from URI options.
114+
func connectTarantool(
115+
opts libconnect.UriOpts,
116+
username, password string,
117+
) (*tarantool.Connection, error) {
118+
tarantoolUsername := username
119+
tarantoolPassword := password
120+
if tarantoolUsername == "" {
121+
tarantoolUsername = os.Getenv(libconnect.TarantoolUsernameEnv)
122+
}
123+
if tarantoolPassword == "" {
124+
tarantoolPassword = os.Getenv(libconnect.TarantoolPasswordEnv)
125+
}
126+
127+
dialOpts := dial.Opts{
128+
Address: fmt.Sprintf("tcp://%s", opts.Host),
129+
User: tarantoolUsername,
130+
Password: tarantoolPassword,
131+
SslKeyFile: opts.KeyFile,
132+
SslCertFile: opts.CertFile,
133+
SslCaFile: opts.CaFile,
134+
SslCiphers: opts.Ciphers,
135+
}
136+
137+
dialer, err := dial.New(dialOpts)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to create dialer: %w", err)
140+
}
141+
142+
connectorOpts := tarantool.Opts{
143+
Timeout: opts.Timeout,
144+
}
145+
146+
ctx, cancel := contextWithTimeout(connectorOpts.Timeout)
147+
defer cancel()
148+
149+
conn, err := tarantool.Connect(ctx, dialer, connectorOpts)
150+
if err != nil {
151+
return nil, fmt.Errorf("failed to connect to tarantool: %w", err)
152+
}
153+
154+
return conn, nil
155+
}
156+
157+
// contextWithTimeout creates a context with optional timeout.
158+
func contextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
159+
if timeout > 0 {
160+
return context.WithTimeout(context.Background(), timeout)
161+
}
162+
return context.Background(), func() {}
163+
}
164+
165+
// loadRootCACerts loads root CA certificates from a directory.
166+
func loadRootCACerts(caPath string) (*x509.CertPool, error) {
167+
roots := x509.NewCertPool()
168+
169+
files, err := os.ReadDir(caPath)
170+
if err != nil {
171+
return nil, fmt.Errorf("failed to read CA directory: %w", err)
172+
}
173+
174+
for _, f := range files {
175+
if f.IsDir() || isSameDirSymlink(f, caPath) {
176+
continue
177+
}
178+
179+
data, err := os.ReadFile(caPath + "/" + f.Name())
180+
if err != nil {
181+
continue
182+
}
183+
184+
roots.AppendCertsFromPEM(data)
185+
}
186+
187+
return roots, nil
188+
}
189+
190+
// isSameDirSymlink checks if a directory entry is a symlink pointing to the same directory.
191+
func isSameDirSymlink(f fs.DirEntry, dir string) bool {
192+
if f.Type()&fs.ModeSymlink == 0 {
193+
return false
194+
}
195+
196+
target, err := os.Readlink(dir + "/" + f.Name())
197+
return err == nil && !strings.Contains(target, "/")
198+
}

cli/cluster/cmd/worker.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
11
package cmd
22

33
import (
4+
"context"
45
"fmt"
56
"os"
67
"strings"
78

9+
"github.com/tarantool/go-storage"
10+
"github.com/tarantool/go-storage/operation"
811
libconnect "github.com/tarantool/tt/lib/connect"
912
)
1013

1114
// WorkerPublishCtx contains information about cluster worker publish command
1215
// execution context.
1316
type WorkerPublishCtx struct {
14-
// Username defines a username for connection.
15-
Username string
16-
// Password defines a password for connection.
17-
Password string
18-
// Force defines whether the publish should be forced.
19-
Force bool
17+
// Storage is the storage instance for the operation.
18+
Storage storage.Storage
19+
// Key is the key in storage for the worker configuration.
20+
Key string
2021
// Src is a raw data to publish.
2122
Src []byte
23+
// Force defines whether the publish should be forced.
24+
Force bool
2225
}
2326

2427
// WorkerShowCtx contains information about cluster worker show command
@@ -108,9 +111,50 @@ func ResolveWorkerCredentials(
108111
return username, password
109112
}
110113

111-
// WorkerPublish publishes a worker configuration. Unimplemented.
112-
func WorkerPublish(url string, ctx WorkerPublishCtx) error {
113-
return fmt.Errorf("unimplemented")
114+
// WorkerPublish publishes a worker configuration to storage.
115+
// Without Force flag, it checks if the key already exists and returns an error if so.
116+
// With Force flag, it overwrites the existing configuration unconditionally.
117+
func WorkerPublish(publishCtx WorkerPublishCtx) error {
118+
ctx := context.Background()
119+
key := []byte(publishCtx.Key)
120+
value := publishCtx.Src
121+
122+
if publishCtx.Force {
123+
_, err := publishCtx.Storage.Tx(ctx).Then(operation.Put(key, value)).Commit()
124+
if err != nil {
125+
return fmt.Errorf("failed to publish worker configuration: %w", err)
126+
}
127+
return nil
128+
}
129+
130+
// Without force: check if key exists first, then put.
131+
// Note: This is not atomic - there's a race condition between Get and Put.
132+
// For TCS (tarantool config storage), we cannot use predicates like
133+
// VersionEqual or ValueEqual because they require an existing record.
134+
// TCS supports "count" predicate (count == 0 for non-existent key),
135+
// but go-storage library doesn't support it yet.
136+
// See: https://github.com/tarantool/go-storage/issues
137+
getResp, err := publishCtx.Storage.Tx(ctx).Then(operation.Get(key)).Commit()
138+
if err != nil {
139+
return fmt.Errorf("failed to publish worker configuration: %w", err)
140+
}
141+
142+
// Check if key exists (has values in results).
143+
for _, result := range getResp.Results {
144+
if len(result.Values) > 0 {
145+
return fmt.Errorf(
146+
"worker configuration already exists at %q, use --force to overwrite",
147+
publishCtx.Key)
148+
}
149+
}
150+
151+
// Key doesn't exist, perform Put.
152+
_, err = publishCtx.Storage.Tx(ctx).Then(operation.Put(key, value)).Commit()
153+
if err != nil {
154+
return fmt.Errorf("failed to publish worker configuration: %w", err)
155+
}
156+
157+
return nil
114158
}
115159

116160
// WorkerShow shows a worker configuration. Unimplemented.

cli/cluster/cmd/worker_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,6 @@ func TestResolveWorkerCredentials(t *testing.T) {
239239
}
240240
}
241241

242-
func TestWorkerPublish(t *testing.T) {
243-
err := WorkerPublish("http://localhost:2379/prefix/host/worker", WorkerPublishCtx{})
244-
require.EqualError(t, err, "unimplemented")
245-
}
246-
247242
func TestWorkerShow(t *testing.T) {
248243
err := WorkerShow("http://localhost:2379/prefix/host/worker", WorkerShowCtx{})
249244
require.EqualError(t, err, "unimplemented")

cli/cmd/cluster.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@ var switchStatusCtx = clustercmd.SwitchStatusCtx{
6767
var rolesChangeCtx = clustercmd.RolesChangeCtx{}
6868

6969
var workerPublishCtx = clustercmd.WorkerPublishCtx{
70-
Username: "",
71-
Password: "",
72-
Force: false,
70+
Force: false,
7371
}
7472

73+
var (
74+
workerPublishUsername string
75+
workerPublishPassword string
76+
)
77+
7578
var workerShowCtx = clustercmd.WorkerShowCtx{
7679
Username: "",
7780
Password: "",
@@ -306,9 +309,9 @@ func newClusterWorkerCmd() *cobra.Command {
306309
Run: RunModuleFunc(internalClusterWorkerPublishModule),
307310
Args: cobra.ExactArgs(2),
308311
}
309-
publishCmd.Flags().StringVarP(&workerPublishCtx.Username, "username", "u", "",
312+
publishCmd.Flags().StringVarP(&workerPublishUsername, "username", "u", "",
310313
"username (used as etcd/tarantool config storage credentials)")
311-
publishCmd.Flags().StringVarP(&workerPublishCtx.Password, "password", "p", "",
314+
publishCmd.Flags().StringVarP(&workerPublishPassword, "password", "p", "",
312315
"password (used as etcd/tarantool config storage credentials)")
313316
publishCmd.Flags().BoolVar(&workerPublishCtx.Force, "force", false,
314317
"force publish and skip checking existence")
@@ -617,10 +620,23 @@ func internalClusterWorkerPublishModule(cmdCtx *cmdcontext.CmdCtx, args []string
617620
}
618621
workerPublishCtx.Src = data
619622

620-
workerPublishCtx.Username, workerPublishCtx.Password = clustercmd.ResolveWorkerCredentials(
621-
opts, workerPublishCtx.Username, workerPublishCtx.Password)
623+
prefix, hostName, workerName, err := clustercmd.ParseWorkerPath(opts.Prefix)
624+
if err != nil {
625+
return fmt.Errorf("failed to parse URL path: %w", err)
626+
}
627+
workerPublishCtx.Key = clustercmd.BuildWorkerStorageKey(prefix, hostName, workerName)
628+
629+
username, password := clustercmd.ResolveWorkerCredentials(
630+
opts, workerPublishUsername, workerPublishPassword)
631+
632+
stg, closeFunc, err := clustercmd.ConnectStorage(opts, username, password)
633+
if err != nil {
634+
return fmt.Errorf("failed to connect to storage: %w", err)
635+
}
636+
defer closeFunc()
637+
workerPublishCtx.Storage = stg
622638

623-
if err := clustercmd.WorkerPublish(args[0], workerPublishCtx); err != nil {
639+
if err := clustercmd.WorkerPublish(workerPublishCtx); err != nil {
624640
return fmt.Errorf("failed to publish worker configuration: %w", err)
625641
}
626642
return nil

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ require (
2929
github.com/stretchr/testify v1.11.1
3030
github.com/tarantool/cartridge-cli v0.0.0-00010101000000-000000000000
3131
github.com/tarantool/go-prompt v1.0.1
32+
github.com/tarantool/go-storage v1.1.2
3233
github.com/tarantool/go-tarantool v1.12.3
3334
github.com/tarantool/go-tarantool/v2 v2.4.2
3435
github.com/tarantool/tt/lib/cluster v0.0.0
@@ -40,6 +41,7 @@ require (
4041
go.etcd.io/etcd/client/pkg/v3 v3.6.8
4142
go.etcd.io/etcd/client/v3 v3.6.8
4243
go.etcd.io/etcd/tests/v3 v3.6.8
44+
go.uber.org/zap v1.27.1
4345
golang.org/x/crypto v0.49.0
4446
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa
4547
golang.org/x/sys v0.42.0
@@ -129,7 +131,6 @@ require (
129131
github.com/tarantool/go-iproto v1.1.0 // indirect
130132
github.com/tarantool/go-openssl v1.2.1 // indirect
131133
github.com/tarantool/go-option v1.0.0 // indirect
132-
github.com/tarantool/go-storage v1.1.2 // indirect
133134
github.com/tarantool/go-tlsdialer v1.0.2 // indirect
134135
github.com/tklauser/go-sysconf v0.3.4 // indirect
135136
github.com/tklauser/numcpus v0.2.1 // indirect
@@ -155,7 +156,6 @@ require (
155156
go.opentelemetry.io/otel/trace v1.42.0 // indirect
156157
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
157158
go.uber.org/multierr v1.11.0 // indirect
158-
go.uber.org/zap v1.27.1 // indirect
159159
go.yaml.in/yaml/v2 v2.4.3 // indirect
160160
golang.org/x/net v0.52.0 // indirect
161161
golang.org/x/sync v0.20.0 // indirect

0 commit comments

Comments
 (0)