From b857af8c684c81d1dbd1a8a65ba0eb7bc8eddc8d Mon Sep 17 00:00:00 2001
From: "maksim.konovalov"
Date: Mon, 10 Feb 2025 01:47:13 +0300
Subject: [PATCH] add discovering mvp

---
 box/info.go             |  13 +++++
 pool/connection_pool.go |  31 ++++++++++-
 pool/discovering.go     | 119 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 162 insertions(+), 1 deletion(-)
 create mode 100644 pool/discovering.go

diff --git a/box/info.go b/box/info.go
index aabfd65e3..0b692ec0d 100644
--- a/box/info.go
+++ b/box/info.go
@@ -48,6 +48,17 @@ type Replication struct {
 // Upstream information.
 type Upstream struct {
 	// Status is replication status of the connection with the instance.
+	/*
+		connect: an instance is connecting to the master.
+		auth: authentication is being performed.
+		wait_snapshot: an instance is receiving metadata from the master. If join fails with a non-critical error at this stage (for example, ER_READONLY, ER_ACCESS_DENIED, or a network-related issue), an instance tries to find a new master to join.
+		fetch_snapshot: an instance is receiving data from the master’s .snap files.
+		final_join: an instance is receiving new data added during fetch_snapshot.
+		sync: the master and replica are synchronizing to have the same data.
+		follow: the current instance’s role is replica. This means that the instance is read-only or acts as a replica for this remote peer in master-master configuration. The instance is receiving or able to receive data from the instance n’s (upstream) master.
+		stopped: replication is stopped due to a replication error (for example, duplicate key).
+		disconnected: an instance is not connected to the replica set (for example, due to network issues, not replication errors).
+	*/
 	Status string `msgpack:"status"`
 	// Idle is the time (in seconds) since the last event was received.
 	Idle float64 `msgpack:"idle"`
@@ -61,6 +72,8 @@ type Upstream struct {
 	Message string `msgpack:"message,omitempty"`
 	// SystemMessage contains an error message in case of a degraded state; otherwise, it is nil.
 	SystemMessage string `msgpack:"system_message,omitempty"`
+	// Name is the instance name, required only for tarantool 3.
+	Name string `msgpack:"name,omitempty"`
 }
 
 // Downstream information.
diff --git a/pool/connection_pool.go b/pool/connection_pool.go
index 9be46665e..21dc15582 100644
--- a/pool/connection_pool.go
+++ b/pool/connection_pool.go
@@ -19,7 +19,6 @@ import (
 	"time"
 
 	"github.com/tarantool/go-iproto"
-
 	"github.com/tarantool/go-tarantool/v2"
 )
 
@@ -75,12 +74,22 @@ type Instance struct {
 
 // Opts provides additional options (configurable via ConnectWithOpts).
 type Opts struct {
+	// DiscoveringDialer is the dialer template for instances found by
+	// discovery: it is copied and its Address is replaced with the peer
+	// address reported by replication.
+	DiscoveringDialer tarantool.NetDialer
 	// Timeout for timer to reopen connections that have been closed by some
 	// events and to relocate connection between subpools if ro/rw role has
 	// been updated.
 	CheckTimeout time.Duration
 	// ConnectionHandler provides an ability to handle connection updates.
 	ConnectionHandler ConnectionHandler
+	// EnableDiscovery makes ConnectWithOpts start the discovery of new
+	// instances from the replication info (see StartDiscovery).
+	EnableDiscovery bool
+	// DiscoveryTimeout is the interval between two discovery rounds; it
+	// must be positive when discovery is used.
+	DiscoveryTimeout time.Duration
 }
 
 /*
@@ -103,6 +112,12 @@ Main features:
   - Automatic master discovery by mode parameter.
 */
 type ConnectionPool struct {
+	// ctx is the context given to ConnectWithOpts; the discovery goroutine
+	// derives its own cancelable context from it.
+	// NOTE(review): a short-lived connect context would also stop
+	// discovery - confirm callers pass a long-lived one.
+	ctx context.Context
+
 	ends      map[string]*endpoint
 	endsMutex sync.RWMutex
 
@@ -115,6 +130,10 @@ type ConnectionPool struct {
 	anyPool          *roundRobinStrategy
 	poolsMutex       sync.RWMutex
 	watcherContainer watcherContainer
+
+	// discoveringCancel stops the discovery goroutine; it is nil while
+	// discovery is not running.
+	discoveringCancel context.CancelFunc
 }
 
 var _ Pooler = (*ConnectionPool)(nil)
@@ -171,6 +190,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
 	anyPool := newRoundRobinStrategy(size)
 
 	connPool := &ConnectionPool{
+		ctx:      ctx,
 		ends:     make(map[string]*endpoint),
 		opts:     opts,
 		state:    connectedState,
@@ -192,6 +212,15 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
 		go connPool.controller(endpointCtx, endpoint)
 	}
 
+	if opts.EnableDiscovery {
+		if err := connPool.StartDiscovery(); err != nil {
+			// Close the pool so that its connections and goroutines do
+			// not leak to a caller that only gets an error back.
+			connPool.Close()
+			return nil, err
+		}
+	}
+
 	return connPool, nil
 }
 
diff --git a/pool/discovering.go b/pool/discovering.go
new file mode 100644
index 000000000..6b0bfbfc2
--- /dev/null
+++ b/pool/discovering.go
@@ -0,0 +1,119 @@
+package pool
+
+import (
+	"context"
+	"errors"
+	"log"
+	"time"
+
+	"github.com/tarantool/go-tarantool/v2/box"
+)
+
+// StartDiscovery starts a background goroutine that periodically (every
+// Opts.DiscoveryTimeout) requests box.info through any available connection
+// of the pool and adds replication peers that are not in the pool yet.
+//
+// The goroutine stops when StopDiscovery is called or when the context
+// passed to ConnectWithOpts is done. An error is returned if discovery is
+// already started or if Opts.DiscoveryTimeout is not positive.
+//
+// StartDiscovery and StopDiscovery are not safe for concurrent use.
+func (p *ConnectionPool) StartDiscovery() error {
+	if p.discoveringCancel != nil {
+		return errors.New("discovering already started")
+	}
+
+	// time.NewTicker panics on a non-positive interval, so reject it here.
+	if p.opts.DiscoveryTimeout <= 0 {
+		return errors.New("discovery timeout must be positive")
+	}
+
+	ctx, cancel := context.WithCancel(p.ctx)
+	p.discoveringCancel = cancel
+
+	go p.discoveryLoop(ctx, p.opts.DiscoveryTimeout)
+
+	return nil
+}
+
+// discoveryLoop runs one discovery round per tick until ctx is done.
+func (p *ConnectionPool) discoveryLoop(ctx context.Context, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	// Release the ticker once discovery is stopped.
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			p.discover(ctx)
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// discover performs a single discovery round: it reads the replication info
+// and adds every followed upstream peer that is missing from the pool.
+// Failures are logged and do not stop discovery.
+func (p *ConnectionPool) discover(ctx context.Context) {
+	// Any connection is used because the master can be unavailable.
+	info, err := box.New(NewConnectorAdapter(p, ANY)).Info()
+	if err != nil {
+		log.Printf("tarantool: watch topology failed: %s\n", err)
+		return
+	}
+
+	for _, replication := range info.Replication {
+		upstream := replication.Upstream
+
+		// An entry without a peer address is the instance itself. Check it
+		// first so that it never reaches the status check below and is not
+		// logged as a non-follow replica on every round.
+		addr := upstream.Peer
+		if addr == "" {
+			continue
+		}
+
+		if upstream.Status != "follow" {
+			log.Printf("found replication instance (%s:%s) in non-follow state; skip discovering\n",
+				upstream.Name, upstream.Peer)
+			continue
+		}
+
+		name := upstream.Name
+		if name == "" {
+			name = replication.UUID
+		}
+
+		// NOTE(review): the lookup is done by address; confirm that ends is
+		// keyed by address and not by instance name, otherwise Add is
+		// retried (and its error logged) on every round.
+		p.endsMutex.RLock()
+		_, exists := p.ends[addr]
+		p.endsMutex.RUnlock()
+
+		if exists {
+			continue
+		}
+
+		dialer := p.opts.DiscoveringDialer
+		dialer.Address = addr
+
+		if err = p.Add(ctx, Instance{Name: name, Dialer: dialer}); err != nil {
+			log.Printf("tarantool: add to pool failed: %s\n", err)
+		}
+	}
+}
+
+// StopDiscovery stops the goroutine started by StartDiscovery. It returns an
+// error if discovery is not started.
+func (p *ConnectionPool) StopDiscovery() error {
+	if p.discoveringCancel == nil {
+		return errors.New("discovering not started yet")
+	}
+
+	p.discoveringCancel()
+	p.discoveringCancel = nil
+
+	return nil
+}