Skip to content

add discovering mvp #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions box/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@
// Upstream information.
type Upstream struct {
// Status is replication status of the connection with the instance.
/*
connect: an instance is connecting to the master.
auth: authentication is being performed.
wait_snapshot: an instance is receiving metadata from the master. If join fails with a
non-critical error at this stage (for example, ER_READONLY, ER_ACCESS_DENIED, or a
network-related issue), an instance tries to find a new master to join.

Check failure on line 54 in box/info.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 248 characters long, which exceeds the maximum of 100 characters. (lll)
fetch_snapshot: an instance is receiving data from the master’s .snap files.
final_join: an instance is receiving new data added during fetch_snapshot.
sync: the master and replica are synchronizing to have the same data.
follow: the current instance’s role is replica. This means that the instance is read-only
or acts as a replica for this remote peer in a master-master configuration. The instance is
receiving or able to receive data from the instance n’s (upstream) master.

Check failure on line 58 in box/info.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 261 characters long, which exceeds the maximum of 100 characters. (lll)
stopped: replication is stopped due to a replication error (for example, duplicate key).
disconnected: an instance is not connected to the replica set (for example, due to network
issues, not replication errors).

Check failure on line 60 in box/info.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 130 characters long, which exceeds the maximum of 100 characters. (lll)
*/
Status string `msgpack:"status"`
// Idle is the time (in seconds) since the last event was received.
Idle float64 `msgpack:"idle"`
Expand All @@ -61,6 +72,8 @@
Message string `msgpack:"message,omitempty"`
// SystemMessage contains an error message in case of a degraded state; otherwise, it is empty.
SystemMessage string `msgpack:"system_message,omitempty"`
// Name is the instance name; it is used only with Tarantool 3.
Name string `msgpack:"name,omitempty"`
}

// Downstream information.
Expand Down
19 changes: 18 additions & 1 deletion pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/tarantool/go-iproto"

"github.com/tarantool/go-tarantool/v2"
)

Expand Down Expand Up @@ -75,12 +74,17 @@ type Instance struct {

// Opts provides additional options (configurable via ConnectWithOpts).
type Opts struct {
// DiscoveringDialer is the dialer template used for instances found by
// discovery: StartDiscovery copies it and sets Address to the peer address
// reported in the replication info.
DiscoveringDialer tarantool.NetDialer
// Timeout for timer to reopen connections that have been closed by some
// events and to relocate connection between subpools if ro/rw role has
// been updated.
CheckTimeout time.Duration
// ConnectionHandler provides an ability to handle connection updates.
ConnectionHandler ConnectionHandler
// EnableDiscovery makes ConnectWithOpts start automatic discovery of pool
// instances from the replication info (see StartDiscovery).
EnableDiscovery bool
// DiscoveryTimeout is the period between two consecutive discovery passes.
// It must be positive when discovery is used.
DiscoveryTimeout time.Duration
}

/*
Expand All @@ -103,6 +107,9 @@ Main features:
- Automatic master discovery by mode parameter.
*/
type ConnectionPool struct {
// root background connection pool ctx
ctx context.Context

ends map[string]*endpoint
endsMutex sync.RWMutex

Expand All @@ -115,6 +122,8 @@ type ConnectionPool struct {
anyPool *roundRobinStrategy
poolsMutex sync.RWMutex
watcherContainer watcherContainer

discoveringCancel context.CancelFunc
}

var _ Pooler = (*ConnectionPool)(nil)
Expand Down Expand Up @@ -171,6 +180,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
anyPool := newRoundRobinStrategy(size)

connPool := &ConnectionPool{
ctx: ctx,
ends: make(map[string]*endpoint),
opts: opts,
state: connectedState,
Expand All @@ -192,6 +202,13 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
go connPool.controller(endpointCtx, endpoint)
}

if opts.EnableDiscovery {
err := connPool.StartDiscovery()
if err != nil { // it strange if there is error, but we need to check that one
return nil, err
}
}

return connPool, nil
}

Expand Down
95 changes: 95 additions & 0 deletions pool/discovering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package pool

import (
"context"
"errors"
"log"
"time"

"github.com/tarantool/go-tarantool/v2/box"
)

func (p *ConnectionPool) StartDiscovery() error {
if p.discoveringCancel != nil {
return errors.New("discovering already started")
}

ctx, cancel := context.WithCancel(p.ctx)

t := time.NewTicker(p.opts.DiscoveryTimeout)

go func() {
for {
select {
case <-t.C:
// we use any connection, because master can be unavailable
info, err := box.New(NewConnectorAdapter(p, ANY)).Info()
if err != nil {
log.Printf("tarantool: watch topology failed: %s\n", err)
continue
}

for _, replication := range info.Replication {
upstream := replication.Upstream

if upstream.Status != "follow" {
log.Printf("found replication instance (%s:%s) in non-follow state; skip discovering\n",

Check failure on line 36 in pool/discovering.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 112 characters long, which exceeds the maximum of 100 characters. (lll)
upstream.Name, upstream.Peer)
continue
}

addr := upstream.Peer

if addr == "" { // itself instance
continue
}

name := upstream.Name

if name == "" {
name = replication.UUID
}

p.endsMutex.Lock()
_, exists := p.ends[addr]
p.endsMutex.Unlock()

if !exists {
dialer := p.opts.DiscoveringDialer
dialer.Address = addr

i := Instance{
Name: name,
Dialer: dialer,
}

err = p.Add(ctx, i)
if err != nil {
log.Printf("tarantool: add to pool failed: %s\n", err)
continue
}
}
}

continue
case <-ctx.Done():
return
}
}
}()

p.discoveringCancel = cancel

return nil
}

// StopDiscovery cancels the background discovery started by StartDiscovery
// and clears the stored cancel function, so discovery can be started again.
//
// It returns an error if discovery is not running.
func (p *ConnectionPool) StopDiscovery() error {
	stop := p.discoveringCancel
	if stop == nil {
		return errors.New("discovering not started yet")
	}

	p.discoveringCancel = nil
	stop()

	return nil
}
Loading