Skip to content

Commit 7faa1b0

Browse files
authored
Merge pull request #614 from hashicorp/feature/per-node-reconnect-timeout
2 parents 9309b94 + 943cc43 commit 7faa1b0

File tree

8 files changed

+201
-30
lines changed

8 files changed

+201
-30
lines changed

coordinate/performance_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestPerformance_Height(t *testing.T) {
7575

7676
// Make sure the height looks reasonable with the regular nodes all in a
7777
// plane, and the center node up above.
78-
for i, _ := range clients {
78+
for i := range clients {
7979
coord := clients[i].GetCoordinate()
8080
if i == 0 {
8181
if coord.Height < 0.97*radius.Seconds() {
@@ -146,7 +146,7 @@ func TestPerformance_Drift(t *testing.T) {
146146
}
147147

148148
mid := make([]float64, config.Dimensionality)
149-
for i, _ := range mid {
149+
for i := range mid {
150150
mid[i] = min.Vec[i] + (max.Vec[i]-min.Vec[i])/2
151151
}
152152
return magnitude(mid)

coordinate/phantom.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
// given config.
1212
func GenerateClients(nodes int, config *Config) ([]*Client, error) {
1313
clients := make([]*Client, nodes)
14-
for i, _ := range clients {
14+
for i := range clients {
1515
client, err := NewClient(config)
1616
if err != nil {
1717
return nil, err
@@ -146,7 +146,7 @@ func Simulate(clients []*Client, truth [][]time.Duration, cycles int) {
146146

147147
nodes := len(clients)
148148
for cycle := 0; cycle < cycles; cycle++ {
149-
for i, _ := range clients {
149+
for i := range clients {
150150
if j := rand.Intn(nodes); j != i {
151151
c := clients[j].GetCoordinate()
152152
rtt := truth[i][j]

coordinate/util_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) {
2121
t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2))
2222
}
2323

24-
for i, _ := range vec1 {
24+
for i := range vec1 {
2525
verifyEqualFloats(t, vec1[i], vec2[i])
2626
}
2727
}

serf/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ type Config struct {
254254
// WARNING: this should ONLY be used in tests
255255
messageDropper func(typ messageType) bool
256256

257+
// ReconnectTimeoutOverride is an optional interface which when present allows
258+
// the application to cause reaping of a node to happen when it otherwise wouldn't
259+
ReconnectTimeoutOverride ReconnectTimeoutOverrider
260+
257261
// ValidateNodeNames controls whether nodenames only
258262
// contain alphanumeric, dashes and '.'characters
259263
// and sets maximum length to 128 characters

serf/merge_delegate.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package serf
33
import (
44
"fmt"
55
"net"
6-
"strconv"
76

87
"github.com/hashicorp/memberlist"
98
)
@@ -61,26 +60,12 @@ func (m *mergeDelegate) nodeToMember(n *memberlist.Node) (*Member, error) {
6160

6261
// validateMemberInfo checks that the data we are sending is valid
6362
func (m *mergeDelegate) validateMemberInfo(n *memberlist.Node) error {
64-
if err := m.serf.ValidateNodeNames(); err != nil {
63+
if err := m.serf.validateNodeName(n.Name); err != nil {
6564
return err
6665
}
6766

68-
host, port, err := net.SplitHostPort(string(n.Addr))
69-
if err != nil {
70-
return err
71-
}
72-
73-
ip := net.ParseIP(host)
74-
if ip == nil || (ip.To4() == nil && ip.To16() == nil) {
75-
return fmt.Errorf("%v is not a valid IPv4 or IPv6 address\n", ip)
76-
}
77-
78-
p, err := strconv.Atoi(port)
79-
if err != nil {
80-
return err
81-
}
82-
if p < 0 || p > 65535 {
83-
return fmt.Errorf("invalid port %v , port must be a valid number from 0-65535", p)
67+
if len(n.Addr) != 4 && len(n.Addr) != 16 {
68+
return fmt.Errorf("IP byte length is invalid: %d bytes is not either 4 or 16", len(n.Addr))
8469
}
8570

8671
if len(n.Meta) > memberlist.MetaMaxSize {

serf/merge_delegate_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package serf
2+
3+
import (
4+
"net"
5+
"strings"
6+
"testing"
7+
8+
"github.com/hashicorp/memberlist"
9+
)
10+
11+
func TestValidateMemberInfo(t *testing.T) {
12+
type testCase struct {
13+
name string
14+
addr net.IP
15+
meta []byte
16+
validateNodeNames bool
17+
err string
18+
}
19+
20+
cases := map[string]testCase{
21+
"invalid-name-chars": {
22+
name: "space not allowed",
23+
addr: []byte{1, 2, 3, 4},
24+
validateNodeNames: true,
25+
err: "Node name contains invalid characters",
26+
},
27+
"invalid-name-chars-not-validated": {
28+
name: "space not allowed",
29+
addr: []byte{1, 2, 3, 4},
30+
validateNodeNames: false,
31+
},
32+
"invalid-name-len": {
33+
name: strings.Repeat("abcd", 33),
34+
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
35+
validateNodeNames: true,
36+
err: "Node name is 132 characters.", // 33 * 4
37+
},
38+
"invalid-name-len-not-validated": {
39+
name: strings.Repeat("abcd", 33),
40+
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
41+
validateNodeNames: false,
42+
},
43+
"invalid-ip": {
44+
name: "test",
45+
addr: []byte{1, 2}, // length has to be 4 or 16
46+
err: "IP byte length is invalid",
47+
},
48+
"invalid-ip-2": {
49+
name: "test",
50+
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, // length has to be 4 or 16
51+
err: "IP byte length is invalid",
52+
},
53+
"meta-too-long": {
54+
name: "test",
55+
addr: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
56+
meta: []byte(strings.Repeat("a", 513)),
57+
err: "Encoded length of tags exceeds limit",
58+
},
59+
"ipv4-okay": {
60+
name: "test",
61+
addr: []byte{1, 1, 1, 1},
62+
},
63+
"ipv6-okay": {
64+
name: "test",
65+
addr: []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
66+
},
67+
}
68+
69+
for name, tcase := range cases {
70+
t.Run(name, func(t *testing.T) {
71+
72+
delegate := mergeDelegate{
73+
serf: &Serf{
74+
config: &Config{
75+
ValidateNodeNames: tcase.validateNodeNames,
76+
},
77+
},
78+
}
79+
80+
node := &memberlist.Node{
81+
Name: tcase.name,
82+
Addr: tcase.addr,
83+
Meta: tcase.meta,
84+
}
85+
86+
err := delegate.validateMemberInfo(node)
87+
88+
if tcase.err == "" {
89+
if err != nil {
90+
t.Fatalf("Encountered an unexpected error when validating member info: %v", err)
91+
}
92+
} else {
93+
if err == nil {
94+
t.Fatalf("Did not encounter the expected error of %q", tcase.err)
95+
}
96+
if !strings.Contains(err.Error(), tcase.err) {
97+
t.Fatalf("Member info validation failed with a different error than expected. Expected: %q, Actual: %q", tcase.err, err.Error())
98+
}
99+
}
100+
})
101+
}
102+
}

serf/serf.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ func init() {
5050
rand.Seed(time.Now().UnixNano())
5151
}
5252

53+
// ReconnectTimeoutOverrider is an interface that can be implemented to allow overriding
54+
// the reconnect timeout for individual members.
55+
type ReconnectTimeoutOverrider interface {
56+
ReconnectTimeout(member *Member, timeout time.Duration) time.Duration
57+
}
58+
5359
// Serf is a single node that is part of a single cluster that gets
5460
// events about joins/leaves/failures/etc. It is created with the Create
5561
// method.
@@ -1577,8 +1583,13 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []
15771583
for i := 0; i < n; i++ {
15781584
m := old[i]
15791585

1586+
memberTimeout := timeout
1587+
if s.config.ReconnectTimeoutOverride != nil {
1588+
memberTimeout = s.config.ReconnectTimeoutOverride.ReconnectTimeout(&m.Member, memberTimeout)
1589+
}
1590+
15801591
// Skip if the timeout is not yet reached
1581-
if now.Sub(m.leaveTime) <= timeout {
1592+
if now.Sub(m.leaveTime) <= memberTimeout {
15821593
continue
15831594
}
15841595

@@ -1894,15 +1905,19 @@ func (s *Serf) NumNodes() (numNodes int) {
18941905
// ValidateNodeNames verifies the NodeName contains
18951906
// only alphanumeric, -, or . and is under 128 chracters
18961907
func (s *Serf) ValidateNodeNames() error {
1908+
return s.validateNodeName(s.config.NodeName)
1909+
}
1910+
1911+
func (s *Serf) validateNodeName(name string) error {
18971912
if s.config.ValidateNodeNames {
18981913
var InvalidNameRe = regexp.MustCompile(`[^A-Za-z0-9\-\.]+`)
1899-
if InvalidNameRe.MatchString(s.config.NodeName) {
1900-
return fmt.Errorf("NodeName contains invalid characters %v , Valid characters include "+
1901-
"all alpha-numerics and dashes and '.' ", s.config.NodeName)
1914+
if InvalidNameRe.MatchString(name) {
1915+
return fmt.Errorf("Node name contains invalid characters %v , Valid characters include "+
1916+
"all alpha-numerics and dashes and '.' ", name)
19021917
}
1903-
if len(s.config.NodeName) > MaxNodeNameLength {
1904-
return fmt.Errorf("NodeName is %v characters. "+
1905-
"Valid length is between 1 and 128 characters", len(s.config.NodeName))
1918+
if len(name) > MaxNodeNameLength {
1919+
return fmt.Errorf("Node name is %v characters. "+
1920+
"Valid length is between 1 and 128 characters", len(name))
19061921
}
19071922
}
19081923
return nil

serf/serf_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2999,3 +2999,68 @@ func TestSerf_ValidateNodeName(t *testing.T) {
29992999
}
30003000

30013001
}
3002+
3003+
type reconnectOverride struct {
3004+
timeout time.Duration
3005+
called bool
3006+
}
3007+
3008+
func (r *reconnectOverride) ReconnectTimeout(_ *Member, _ time.Duration) time.Duration {
3009+
r.called = true
3010+
return r.timeout
3011+
}
3012+
3013+
func TestSerf_perNodeReconnectTimeout(t *testing.T) {
3014+
ip1, returnFn1 := testutil.TakeIP()
3015+
defer returnFn1()
3016+
3017+
ip2, returnFn2 := testutil.TakeIP()
3018+
defer returnFn2()
3019+
3020+
override := reconnectOverride{timeout: 1 * time.Microsecond}
3021+
3022+
// Create the s1 config with an event channel so we can listen
3023+
eventCh := make(chan Event, 4)
3024+
s1Config := testConfig(t, ip1)
3025+
s1Config.ReconnectTimeout = 30 * time.Second
3026+
s1Config.ReconnectTimeoutOverride = &override
3027+
s1Config.EventCh = eventCh
3028+
3029+
s2Config := testConfig(t, ip2)
3030+
3031+
s1, err := Create(s1Config)
3032+
if err != nil {
3033+
t.Fatalf("err: %v", err)
3034+
}
3035+
defer s1.Shutdown()
3036+
3037+
s2, err := Create(s2Config)
3038+
if err != nil {
3039+
t.Fatalf("err: %v", err)
3040+
}
3041+
defer s2.Shutdown()
3042+
3043+
waitUntilNumNodes(t, 1, s1, s2)
3044+
3045+
_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
3046+
if err != nil {
3047+
t.Fatalf("err: %v", err)
3048+
}
3049+
3050+
waitUntilNumNodes(t, 2, s1, s2)
3051+
3052+
err = s2.Shutdown()
3053+
if err != nil {
3054+
t.Fatalf("err: %v", err)
3055+
}
3056+
3057+
waitUntilNumNodes(t, 1, s1)
3058+
3059+
// Since s2 shutdown, we check the events to make sure we got failures.
3060+
testEvents(t, eventCh, s2Config.NodeName,
3061+
[]EventType{EventMemberJoin, EventMemberFailed, EventMemberReap})
3062+
3063+
if !override.called {
3064+
t.Fatalf("The reconnect override was not used")
3065+
}
3066+
}

0 commit comments

Comments
 (0)