Skip to content

Commit 8a6df9e

Browse files
ceyonur and Darioush Jalali authored
Pausable uptime manager (#1372)
* add validator state * add pausable uptime manager * remove stuttering name * rename state listener * Update plugin/evm/validators/state.go Co-authored-by: Darioush Jalali <[email protected]> Signed-off-by: Ceyhun Onur <[email protected]> * use update enum * Update plugin/evm/validators/state.go Co-authored-by: Darioush Jalali <[email protected]> Signed-off-by: Ceyhun Onur <[email protected]> * Update plugin/evm/validators/state.go Co-authored-by: Darioush Jalali <[email protected]> Signed-off-by: Ceyhun Onur <[email protected]> * respond to comments * update avalanchego dep branch * reviews * reword errs * fix test changes * fix upgrades after deactivating latest in context * use branch commit for ava version * reviews * add listener mock * remove errs from resume and pause * check after stopping * use expectedTime in tests * reviews * fix requires * underscore unused params --------- Signed-off-by: Ceyhun Onur <[email protected]> Co-authored-by: Darioush Jalali <[email protected]>
1 parent 951e875 commit 8a6df9e

File tree

2 files changed

+385
-0
lines changed

2 files changed

+385
-0
lines changed

plugin/evm/uptime/pausable_manager.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package uptime
5+
6+
import (
7+
"errors"
8+
9+
"github.com/ava-labs/subnet-evm/plugin/evm/validators"
10+
"github.com/ethereum/go-ethereum/log"
11+
12+
"github.com/ava-labs/avalanchego/ids"
13+
"github.com/ava-labs/avalanchego/snow/uptime"
14+
"github.com/ava-labs/avalanchego/utils/set"
15+
)
16+
17+
var _ validators.StateCallbackListener = &pausableManager{}
18+
19+
var errPausedDisconnect = errors.New("paused node cannot be disconnected")
20+
21+
type PausableManager interface {
22+
uptime.Manager
23+
validators.StateCallbackListener
24+
IsPaused(nodeID ids.NodeID) bool
25+
}
26+
27+
type pausableManager struct {
28+
uptime.Manager
29+
pausedVdrs set.Set[ids.NodeID]
30+
// connectedVdrs is a set of nodes that are connected to the manager.
31+
// This is used to immediately connect nodes when they are unpaused.
32+
connectedVdrs set.Set[ids.NodeID]
33+
}
34+
35+
// NewPausableManager takes an uptime.Manager and returns a PausableManager
36+
func NewPausableManager(manager uptime.Manager) PausableManager {
37+
return &pausableManager{
38+
pausedVdrs: make(set.Set[ids.NodeID]),
39+
connectedVdrs: make(set.Set[ids.NodeID]),
40+
Manager: manager,
41+
}
42+
}
43+
44+
// Connect connects the node with the given ID to the uptime.Manager
45+
// If the node is paused, it will not be connected
46+
func (p *pausableManager) Connect(nodeID ids.NodeID) error {
47+
p.connectedVdrs.Add(nodeID)
48+
if !p.IsPaused(nodeID) && !p.Manager.IsConnected(nodeID) {
49+
return p.Manager.Connect(nodeID)
50+
}
51+
return nil
52+
}
53+
54+
// Disconnect disconnects the node with the given ID from the uptime.Manager
55+
// If the node is paused, it will not be disconnected
56+
// Invariant: we should never have a connected paused node that is disconnecting
57+
func (p *pausableManager) Disconnect(nodeID ids.NodeID) error {
58+
p.connectedVdrs.Remove(nodeID)
59+
if p.Manager.IsConnected(nodeID) {
60+
if p.IsPaused(nodeID) {
61+
// We should never see this case
62+
return errPausedDisconnect
63+
}
64+
return p.Manager.Disconnect(nodeID)
65+
}
66+
return nil
67+
}
68+
69+
// StartTracking starts tracking uptime for the nodes with the given IDs
70+
// If a node is paused, it will not be tracked
71+
func (p *pausableManager) StartTracking(nodeIDs []ids.NodeID) error {
72+
activeNodeIDs := make([]ids.NodeID, 0, len(nodeIDs))
73+
for _, nodeID := range nodeIDs {
74+
if !p.IsPaused(nodeID) {
75+
activeNodeIDs = append(activeNodeIDs, nodeID)
76+
}
77+
}
78+
return p.Manager.StartTracking(activeNodeIDs)
79+
}
80+
81+
// OnValidatorAdded is called when a validator is added.
82+
// If the node is inactive, it will be paused.
83+
func (p *pausableManager) OnValidatorAdded(_ ids.ID, nodeID ids.NodeID, _ uint64, isActive bool) {
84+
if !isActive {
85+
err := p.pause(nodeID)
86+
if err != nil {
87+
log.Error("failed to handle added validator %s: %s", nodeID, err)
88+
}
89+
}
90+
}
91+
92+
// OnValidatorRemoved is called when a validator is removed.
93+
// If the node is already paused, it will be resumed.
94+
func (p *pausableManager) OnValidatorRemoved(_ ids.ID, nodeID ids.NodeID) {
95+
if p.IsPaused(nodeID) {
96+
err := p.resume(nodeID)
97+
if err != nil {
98+
log.Error("failed to handle validator removed %s: %s", nodeID, err)
99+
}
100+
}
101+
}
102+
103+
// OnValidatorStatusUpdated is called when the status of a validator is updated.
104+
// If the node is active, it will be resumed. If the node is inactive, it will be paused.
105+
func (p *pausableManager) OnValidatorStatusUpdated(_ ids.ID, nodeID ids.NodeID, isActive bool) {
106+
var err error
107+
if isActive {
108+
err = p.resume(nodeID)
109+
} else {
110+
err = p.pause(nodeID)
111+
}
112+
if err != nil {
113+
log.Error("failed to update status for node %s: %s", nodeID, err)
114+
}
115+
}
116+
117+
// IsPaused returns true if the node with the given ID is paused.
118+
func (p *pausableManager) IsPaused(nodeID ids.NodeID) bool {
119+
return p.pausedVdrs.Contains(nodeID)
120+
}
121+
122+
// pause pauses uptime tracking for the node with the given ID
123+
// pause can disconnect the node from the uptime.Manager if it is connected.
124+
func (p *pausableManager) pause(nodeID ids.NodeID) error {
125+
p.pausedVdrs.Add(nodeID)
126+
if p.Manager.IsConnected(nodeID) {
127+
// If the node is connected, then we need to disconnect it from
128+
// manager
129+
// This should be fine in case tracking has not started yet since
130+
// the inner manager should handle disconnects accordingly
131+
return p.Manager.Disconnect(nodeID)
132+
}
133+
return nil
134+
}
135+
136+
// resume resumes uptime tracking for the node with the given ID
137+
// resume can connect the node to the uptime.Manager if it was connected.
138+
func (p *pausableManager) resume(nodeID ids.NodeID) error {
139+
p.pausedVdrs.Remove(nodeID)
140+
if p.connectedVdrs.Contains(nodeID) && !p.Manager.IsConnected(nodeID) {
141+
return p.Manager.Connect(nodeID)
142+
}
143+
return nil
144+
}
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package uptime
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/ava-labs/avalanchego/ids"
11+
"github.com/ava-labs/avalanchego/snow/uptime"
12+
"github.com/ava-labs/avalanchego/utils/timer/mockable"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestPausableManager(t *testing.T) {
17+
vID := ids.GenerateTestID()
18+
nodeID0 := ids.GenerateTestNodeID()
19+
startTime := time.Now()
20+
21+
tests := []struct {
22+
name string
23+
testFunc func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State)
24+
}{
25+
{
26+
name: "Case 1: Connect, pause, start tracking",
27+
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
28+
require := require.New(t)
29+
30+
// Connect before tracking
31+
require.NoError(up.Connect(nodeID0))
32+
addTime(clk, time.Second)
33+
34+
// Pause before tracking
35+
up.OnValidatorStatusUpdated(vID, nodeID0, false)
36+
require.True(up.IsPaused(nodeID0))
37+
38+
// Elapse Time
39+
addTime(clk, time.Second)
40+
41+
// Start tracking
42+
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
43+
currentTime := addTime(clk, time.Second)
44+
// Uptime should not have increased since the node was paused
45+
expectedUptime := 0 * time.Second
46+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
47+
48+
// Disconnect
49+
require.NoError(up.Disconnect(nodeID0))
50+
// Uptime should not have increased
51+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
52+
},
53+
},
54+
{
55+
name: "Case 2: Start tracking, connect, pause, re-connect, resume",
56+
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
57+
require := require.New(t)
58+
59+
// Start tracking
60+
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
61+
62+
// Connect
63+
addTime(clk, 1*time.Second)
64+
require.NoError(up.Connect(nodeID0))
65+
66+
// Pause
67+
addTime(clk, 1*time.Second)
68+
up.OnValidatorStatusUpdated(vID, nodeID0, false)
69+
require.True(up.IsPaused(nodeID0))
70+
71+
// Elapse time
72+
currentTime := addTime(clk, 2*time.Second)
73+
// Uptime should be 1 second since the node was paused after 1 sec
74+
expectedUptime := 1 * time.Second
75+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
76+
77+
// Disconnect and check uptime
78+
currentTime = addTime(clk, 3*time.Second)
79+
require.NoError(up.Disconnect(nodeID0))
80+
// Uptime should not have increased since the node was paused
81+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
82+
83+
// Connect again and check uptime
84+
addTime(clk, 4*time.Second)
85+
require.NoError(up.Connect(nodeID0))
86+
currentTime = addTime(clk, 5*time.Second)
87+
// Uptime should not have increased since the node was paused
88+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
89+
90+
// Resume and check uptime
91+
currentTime = addTime(clk, 6*time.Second)
92+
up.OnValidatorStatusUpdated(vID, nodeID0, true)
93+
require.False(up.IsPaused(nodeID0))
94+
// Uptime should not have increased since the node was paused
95+
// and we just resumed it
96+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
97+
98+
// Elapsed time check
99+
currentTime = addTime(clk, 7*time.Second)
100+
// Uptime should increase by 7 seconds above since the node was resumed
101+
expectedUptime += 7 * time.Second
102+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
103+
},
104+
},
105+
{
106+
name: "Case 3: Pause, start tracking, connect, re-connect, resume",
107+
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
108+
require := require.New(t)
109+
110+
// Pause before tracking
111+
up.OnValidatorStatusUpdated(vID, nodeID0, false)
112+
require.True(up.IsPaused(nodeID0))
113+
114+
// Start tracking
115+
addTime(clk, time.Second)
116+
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
117+
118+
// Connect and check uptime
119+
addTime(clk, 1*time.Second)
120+
require.NoError(up.Connect(nodeID0))
121+
122+
currentTime := addTime(clk, 2*time.Second)
123+
// Uptime should not have increased since the node was paused
124+
expectedUptime := 0 * time.Second
125+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
126+
127+
// Disconnect and check uptime
128+
currentTime = addTime(clk, 3*time.Second)
129+
require.NoError(up.Disconnect(nodeID0))
130+
// Uptime should not have increased since the node was paused
131+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
132+
133+
// Connect again and resume
134+
addTime(clk, 4*time.Second)
135+
require.NoError(up.Connect(nodeID0))
136+
addTime(clk, 5*time.Second)
137+
up.OnValidatorStatusUpdated(vID, nodeID0, true)
138+
require.False(up.IsPaused(nodeID0))
139+
140+
// Check uptime after resume
141+
currentTime = addTime(clk, 6*time.Second)
142+
// Uptime should have increased by 6 seconds since the node was resumed
143+
expectedUptime += 6 * time.Second
144+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
145+
},
146+
},
147+
{
148+
name: "Case 4: Start tracking, connect, pause, stop tracking, resume tracking",
149+
testFunc: func(t *testing.T, up PausableManager, clk *mockable.Clock, s uptime.State) {
150+
require := require.New(t)
151+
152+
// Start tracking and connect
153+
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
154+
addTime(clk, time.Second)
155+
require.NoError(up.Connect(nodeID0))
156+
157+
// Pause and check uptime
158+
currentTime := addTime(clk, 2*time.Second)
159+
up.OnValidatorStatusUpdated(vID, nodeID0, false)
160+
require.True(up.IsPaused(nodeID0))
161+
// Uptime should be 2 seconds since the node was paused after 2 seconds
162+
expectedUptime := 2 * time.Second
163+
164+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
165+
166+
// Stop tracking and reinitialize manager
167+
currentTime = addTime(clk, 3*time.Second)
168+
require.NoError(up.StopTracking([]ids.NodeID{nodeID0}))
169+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
170+
up = NewPausableManager(uptime.NewManager(s, clk))
171+
172+
// Uptime should not have increased since the node was paused
173+
// and we have not started tracking again
174+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
175+
176+
// Pause and check uptime
177+
up.OnValidatorStatusUpdated(vID, nodeID0, false)
178+
require.True(up.IsPaused(nodeID0))
179+
// Uptime should not have increased since the node was paused
180+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
181+
182+
// Resume and check uptime
183+
currentTime = addTime(clk, 5*time.Second)
184+
up.OnValidatorStatusUpdated(vID, nodeID0, true)
185+
require.False(up.IsPaused(nodeID0))
186+
// Uptime should have increased by 5 seconds since the node was resumed
187+
expectedUptime += 5 * time.Second
188+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
189+
190+
// Start tracking and check elapsed time
191+
currentTime = addTime(clk, 6*time.Second)
192+
require.NoError(up.StartTracking([]ids.NodeID{nodeID0}))
193+
// Uptime should have increased by 6 seconds since we started tracking
194+
// and node was resumed (we assume the node was online until we started tracking)
195+
expectedUptime += 6 * time.Second
196+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
197+
198+
// Elapsed time
199+
currentTime = addTime(clk, 7*time.Second)
200+
// Uptime should not have increased since the node was not connected
201+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
202+
203+
// Connect and final uptime check
204+
require.NoError(up.Connect(nodeID0))
205+
currentTime = addTime(clk, 8*time.Second)
206+
// Uptime should have increased by 8 seconds since the node was connected
207+
expectedUptime += 8 * time.Second
208+
checkUptime(t, up, nodeID0, expectedUptime, currentTime)
209+
},
210+
},
211+
}
212+
213+
for _, test := range tests {
214+
t.Run(test.name, func(t *testing.T) {
215+
up, clk, s := setupTestEnv(nodeID0, startTime)
216+
test.testFunc(t, up, clk, s)
217+
})
218+
}
219+
}
220+
221+
func setupTestEnv(nodeID ids.NodeID, startTime time.Time) (PausableManager, *mockable.Clock, uptime.State) {
222+
clk := mockable.Clock{}
223+
clk.Set(startTime)
224+
s := uptime.NewTestState()
225+
s.AddNode(nodeID, startTime)
226+
up := NewPausableManager(uptime.NewManager(s, &clk))
227+
return up, &clk, s
228+
}
229+
230+
func addTime(clk *mockable.Clock, duration time.Duration) time.Time {
231+
clk.Set(clk.Time().Add(duration))
232+
return clk.Time()
233+
}
234+
235+
func checkUptime(t *testing.T, up PausableManager, nodeID ids.NodeID, expectedUptime time.Duration, expectedLastUpdate time.Time) {
236+
t.Helper()
237+
uptime, lastUpdated, err := up.CalculateUptime(nodeID)
238+
require.NoError(t, err)
239+
require.Equal(t, expectedLastUpdate.Unix(), lastUpdated.Unix())
240+
require.Equal(t, expectedUptime, uptime)
241+
}

0 commit comments

Comments
 (0)