-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnect.go
105 lines (91 loc) · 2.53 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"log"
"strings"
"sync/atomic"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/nats-io/nuid"
)
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Named values for the cleanSession argument of connect, so call sites read
// as connect(id, CleanSession) or connect(id, PersistentSession) rather than
// passing a bare boolean.
const (
// CleanSession is the cleanSession value true.
CleanSession = true
// PersistentSession is the cleanSession value false.
PersistentSession = false
)
// nextConnectServerIndex is the shared counter connect increments on every
// call to pick the next entry of Servers in round-robin order. Its zero value
// is ready to use.
var nextConnectServerIndex atomic.Uint64
// connect opens an MQTT connection to the next server in Servers (chosen
// round-robin across calls) and waits for the connect attempt to finish.
//
// clientID identifies the MQTT client. When empty it falls back to the
// package-level ClientID, and when that is empty too, to Name plus a freshly
// generated NUID. cleanSession is passed straight to the paho client options.
//
// Each Servers entry is a dial string of the form
//
//	[user[:password]@]server[#comment]
//
// where the optional comment is only used in the "Connected to" log line. An
// empty entry, or an empty Servers list, selects DefaultServer.
//
// On success connect returns the connected client, a Stat recording one
// operation and its connect duration under the "conn" key, and a cleanup
// function that disconnects the client and releases the disconnectedWG slot
// taken here. The cleanup function must be called exactly once. On failure
// the slot is released before returning and only the error is non-nil.
func connect(clientID string, cleanSession bool) (paho.Client, *Stat, func(), error) {
	if clientID == "" {
		clientID = ClientID
	}
	if clientID == "" {
		clientID = Name + "-" + nuid.Next()
	}

	// parseDial splits a dial string into user, password, server and comment.
	parseDial := func(in string) (u, p, s, c string) {
		if in == "" {
			return "", "", DefaultServer, ""
		}
		// The comment is whatever follows the last '#'.
		if i := strings.LastIndex(in, "#"); i != -1 {
			c = in[i+1:]
			in = in[:i]
		}
		// Credentials are whatever precedes the last '@'; the password is
		// everything after the first ':' inside them.
		if i := strings.LastIndex(in, "@"); i != -1 {
			u, p, _ = strings.Cut(in[:i], ":")
			in = in[i+1:]
		}
		return u, p, in, c
	}

	// Round-robin the servers. Add returns the incremented value, so subtract
	// 1 to make the very first call land on index 0. With no servers
	// configured the modulo would divide by zero, so fall back to the empty
	// dial string, which parseDial resolves to DefaultServer.
	dial := ""
	if n := uint64(len(Servers)); n > 0 {
		dial = Servers[int((nextConnectServerIndex.Add(1)-1)%n)]
	}
	u, p, s, c := parseDial(dial)

	cl := paho.NewClient(paho.NewClientOptions().
		SetClientID(clientID).
		SetCleanSession(cleanSession).
		SetProtocolVersion(4).
		AddBroker(s).
		SetUsername(u).
		SetPassword(p).
		SetStore(paho.NewMemoryStore()).
		SetAutoReconnect(false).
		SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
			// Every expected message has its own handler, so anything that
			// reaches the default handler is treated as fatal.
			log.Fatalf("received an unexpected message on %q (default handler)", msg.Topic())
		}))

	// Take the wait-group slot before connecting; it is released either on
	// the failure path below or by the returned cleanup function.
	disconnectedWG.Add(1)
	start := time.Now()
	if t := cl.Connect(); t.Wait() && t.Error() != nil {
		disconnectedWG.Done()
		return nil, nil, nil, t.Error()
	}
	// Measure once so the logged and the recorded duration agree and the
	// recorded one does not include the time spent logging.
	elapsed := time.Since(start)

	if c != "" {
		logOp(clientID, "CONN", elapsed, "Connected to %q (%s)\n", s, c)
	} else {
		logOp(clientID, "CONN", elapsed, "Connected to %q\n", s)
	}

	stat := &Stat{
		Ops: 1,
		NS:  map[string]time.Duration{"conn": elapsed},
	}
	cleanup := func() {
		cl.Disconnect(DisconnectCleanupTimeout)
		disconnectedWG.Done()
	}
	return cl, stat, cleanup, nil
}