Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to connect to Leader via all known servers simultaneously #290

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 63 additions & 35 deletions internal/protocol/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"io"
"net"
"sort"
"sync"
"time"

"github.com/Rican7/retry"
Expand Down Expand Up @@ -121,61 +121,89 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
return nil, errors.Wrap(err, "get servers")
}

// Sort servers by Role, from low to high.
sort.Slice(servers, func(i, j int) bool {
return servers[i].Role < servers[j].Role
})
protocolChan := make(chan *Protocol, len(servers))
leaderChan := make(chan string, len(servers))

// Make an attempt for each address until we find the leader.
wg := &sync.WaitGroup{}
wg.Add(len(servers))

go func() {
wg.Wait()
close(protocolChan)
close(leaderChan)
}()

// Ask each one-hop server who the leader is concurrently
for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}
go func(server NodeInfo) {
defer wg.Done()

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
continue
}
ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, no result
log(logging.Warn, err.Error())
return
}
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
protocolChan <- protocol
return
}
if leader == "" {
// This server does not know who the current leader is,
// no result
log(logging.Warn, "no known leader")
return
}

// This server claims to know the leader
leaderChan <- leader
}(server)
}

// If one of the connections returned the leader, we're done
var protocol *Protocol
for p := range protocolChan {
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
return protocol, nil
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
p.Close()
continue
}
protocol = p
}
if protocol != nil {
return protocol, nil
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
for leader := range leaderChan {
// If we get here, it means one-hop servers reported that another
// server is the leader, let's try connecting to it
log(logging.Debug, "connect to reported leader %s", leader)

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
// The leader reported by the one-hop server is unavailable
log(logging.Warn, "reported leader unavailable err=%v", err)
continue
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
// The leader reported by the one-hop server does not consider itself
// the leader
log(logging.Warn, "reported leader server is not the leader")
continue
}
log(logging.Debug, "connected")

return protocol, nil
}

Expand Down
Loading