-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwatchset_test.go
121 lines (100 loc) · 3.13 KB
/
watchset_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"context"
"testing"
"time"
"github.com/cilium/statedb/part"
"github.com/stretchr/testify/require"
)
// TestWatchSet exercises WatchSet.Wait directly: a cancelled context must
// surface context.Canceled whether the set is empty or populated, and with
// many registered channels closing exactly one of them must make Wait
// report that specific channel back.
func TestWatchSet(t *testing.T) {
	t.Parallel()
	// NOTE: TestMain calls goleak.VerifyTestMain so we know this test doesn't leak goroutines.
	set := NewWatchSet()

	// Empty watch set, cancelled context: Wait fails with context.Canceled.
	ctx, cancel := context.WithCancel(context.Background())
	go cancel()
	got, err := set.Wait(ctx)
	require.ErrorIs(t, err, context.Canceled)
	require.Nil(t, got)

	// A handful of registered channels behaves the same under cancellation.
	a := make(chan struct{})
	b := make(chan struct{})
	c := make(chan struct{})
	set.Add(a, b, c)
	ctx, cancel = context.WithCancel(context.Background())
	go cancel()
	got, err = set.Wait(ctx)
	require.ErrorIs(t, err, context.Canceled)
	require.Nil(t, got)

	// Many channels: for each set size, close each channel in turn and
	// check that Wait identifies it.
	for _, size := range []int{0, 1, 8, 12, 16, 31, 32, 61, 64, 121} {
		for closedIdx := 0; closedIdx < size; closedIdx++ {
			writable := make([]chan struct{}, 0, size)
			readable := make([]<-chan struct{}, 0, size)
			for j := 0; j < size; j++ {
				ch := make(chan struct{})
				writable = append(writable, ch)
				readable = append(readable, ch)
			}
			set.Add(readable...)
			close(writable[closedIdx])
			waitCtx, cancelWait := context.WithCancel(context.Background())
			ch, err := set.Wait(waitCtx)
			require.NoError(t, err)
			require.True(t, ch == writable[closedIdx])
			cancelWait()
		}
	}
}
// TestWatchSetInQueries verifies WatchSet against the watch channels handed
// out by table queries: Wait times out while nothing has changed, returns
// the table-wide channel once a write commits, and Merge/Has/Clear behave
// correctly for a per-object subset.
func TestWatchSetInQueries(t *testing.T) {
	t.Parallel()
	db, table := newTestDBWithMetrics(t, &NopMetrics{}, tagsIndex)
	set := NewWatchSet()
	rtxn := db.ReadTxn()
	_, watchAll := table.AllWatch(rtxn)

	// Nothing has been committed yet, so waiting must hit the deadline.
	set.Add(watchAll)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	closed, err := set.Wait(ctx)
	require.ErrorIs(t, err, context.DeadlineExceeded)
	require.Nil(t, closed)
	cancel()

	// Commit a few objects.
	wtxn := db.WriteTxn(table)
	table.Insert(wtxn, testObject{ID: 1})
	table.Insert(wtxn, testObject{ID: 2})
	table.Insert(wtxn, testObject{ID: 3})
	rtxn = wtxn.Commit()

	// The 'watchAll' channel should now have closed and Wait() returns.
	set.Add(watchAll)
	closed, err = set.Wait(context.Background())
	require.NoError(t, err)
	require.Equal(t, closed, watchAll)

	// Watch individual objects for changes; none have changed since the
	// commit above, so this Wait must time out as well.
	_, _, watch1, _ := table.GetWatch(rtxn, idIndex.Query(1))
	_, _, watch2, _ := table.GetWatch(rtxn, idIndex.Query(2))
	_, _, watch3, _ := table.GetWatch(rtxn, idIndex.Query(3))
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Millisecond)
	closed, err = set.Wait(ctx)
	require.ErrorIs(t, err, context.DeadlineExceeded)
	require.Nil(t, closed)
	cancel()

	// Modify object 1; its per-object watch channel closes on commit.
	wtxn = db.WriteTxn(table)
	table.Insert(wtxn, testObject{ID: 1, Tags: part.NewSet("foo")})
	wtxn.Commit()

	// Use a new WatchSet and merge it. This allows having "subsets" that we
	// can then use to check whether the closed channel affected the subset.
	subset := NewWatchSet()
	subset.Add(watch3, watch2, watch1)

	// Merge into the larger WatchSet. This still leaves all the channels
	// in the subset.
	set.Merge(subset)
	closed, err = set.Wait(context.Background())
	require.NoError(t, err)
	require.True(t, closed == watch1)
	require.True(t, subset.Has(closed))
	subset.Clear()
	require.False(t, subset.Has(closed))
}