diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 3195008f..7e0caf43 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -6,7 +6,7 @@ import ( "fmt" "io" "net" - "sort" + "sync" "time" "github.com/Rican7/retry" @@ -121,61 +121,89 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P return nil, errors.Wrap(err, "get servers") } - // Sort servers by Role, from low to high. - sort.Slice(servers, func(i, j int) bool { - return servers[i].Role < servers[j].Role - }) + protocolChan := make(chan *Protocol, len(servers)) + leaderChan := make(chan string, len(servers)) - // Make an attempt for each address until we find the leader. + wg := &sync.WaitGroup{} + wg.Add(len(servers)) + + go func() { + wg.Wait() + close(protocolChan) + close(leaderChan) + }() + + // Ask each one-hop server who the leader is concurrently for _, server := range servers { - log := func(l logging.Level, format string, a ...interface{}) { - format = fmt.Sprintf("server %s: ", server.Address) + format - log(l, format, a...) - } + go func(server NodeInfo) { + defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + log := func(l logging.Level, format string, a ...interface{}) { + format = fmt.Sprintf("server %s: ", server.Address) + format + log(l, format, a...) + } - protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) - if err != nil { - // This server is unavailable, try with the next target. - log(logging.Warn, err.Error()) - continue - } + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + + protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) + if err != nil { + // This server is unavailable, no result + log(logging.Warn, err.Error()) + return + } + if protocol != nil { + // We found the leader + log(logging.Debug, "connected") + protocolChan <- protocol + return + } + if leader == "" { + // This server does not know who the current leader is, + // no result + log(logging.Warn, "no known leader") + return + } + + // This server claims to know the leader + leaderChan <- leader + }(server) + } + + // If one of the connections returned the leader, we're done + var protocol *Protocol + for p := range protocolChan { if protocol != nil { - // We found the leader - log(logging.Debug, "connected") - return protocol, nil - } - if leader == "" { - // This server does not know who the current leader is, - // try with the next target. - log(logging.Warn, "no known leader") + p.Close() continue } + protocol = p + } + if protocol != nil { + return protocol, nil + } - // If we get here, it means this server reported that another - // server is the leader, let's close the connection to this - // server and try with the suggested one. + for leader := range leaderChan { + // If we get here, it means one-hop servers reported that another + // server is the leader, let's try connecting to it log(logging.Debug, "connect to reported leader %s", leader) - ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) defer cancel() protocol, _, err = c.connectAttemptOne(ctx, leader, log) if err != nil { - // The leader reported by the previous server is - // unavailable, try with the next target. + // The leader reported by the one-hop server is unavailable log(logging.Warn, "reported leader unavailable err=%v", err) continue } if protocol == nil { - // The leader reported by the target server does not consider itself - // the leader, try with the next target. + // The leader reported by the one-hop server does not consider itself + // the leader log(logging.Warn, "reported leader server is not the leader") continue } - log(logging.Debug, "connected") + return protocol, nil }