Skip to content

Commit bdd3ef5

Browse files
nurzhan-saktaganovKaymeKaydex
authored andcommitted
test: return back TestConncurrentTopologyChange
* test was deleted in #46
1 parent 09267a9 commit bdd3ef5

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ CHANGES:
1616
TESTS:
1717
* Fixed etcd overlapping ports.
1818
* Fixed Pooler mocks generation.
19+
* Return back TestConncurrentTopologyChange.
1920

2021
## v2.0.4
2122

concurrent_test.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package vshard_router_test
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
vshardrouter "github.com/tarantool/go-vshard-router/v2"
12+
)
13+
14+
type concurrentTopologyProvider struct {
15+
done chan struct{}
16+
closed chan struct{}
17+
t *testing.T
18+
}
19+
20+
func (c *concurrentTopologyProvider) Init(tc vshardrouter.TopologyController) error {
21+
ctx := context.Background()
22+
23+
var cfg = make(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo)
24+
for k, v := range topology {
25+
cfg[k] = v
26+
}
27+
28+
err := tc.AddReplicasets(ctx, cfg)
29+
require.NoError(c.t, err)
30+
31+
c.done = make(chan struct{})
32+
c.closed = make(chan struct{})
33+
34+
added := cfg
35+
removed := make(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo)
36+
37+
go func() {
38+
defer close(c.closed)
39+
//nolint:errcheck
40+
defer tc.AddReplicasets(ctx, removed)
41+
42+
type actiont int
43+
44+
const add actiont = 0
45+
const remove actiont = 1
46+
47+
for {
48+
select {
49+
case <-c.done:
50+
return
51+
default:
52+
}
53+
54+
canAdd := len(removed) > 0
55+
canRemove := len(added) > 0
56+
57+
var action actiont
58+
59+
switch {
60+
case canAdd && canRemove:
61+
//nolint:gosec
62+
action = actiont(rand.Int() % 2)
63+
case canAdd:
64+
action = add
65+
case canRemove:
66+
action = remove
67+
default:
68+
require.Failf(c.t, "unreachable case", "%v, %v", added, removed)
69+
}
70+
71+
switch action {
72+
case add:
73+
var keys []vshardrouter.ReplicasetInfo
74+
for k := range removed {
75+
keys = append(keys, k)
76+
}
77+
//nolint:gosec
78+
key := keys[rand.Int()%len(keys)]
79+
80+
added[key] = removed[key]
81+
delete(removed, key)
82+
83+
_ = tc.AddReplicaset(ctx, key, added[key])
84+
case remove:
85+
var keys []vshardrouter.ReplicasetInfo
86+
for k := range added {
87+
keys = append(keys, k)
88+
}
89+
//nolint:gosec
90+
key := keys[rand.Int()%len(keys)]
91+
92+
removed[key] = added[key]
93+
delete(added, key)
94+
95+
_ = tc.RemoveReplicaset(ctx, key.UUID.String())
96+
default:
97+
require.Fail(c.t, "unreachable case")
98+
}
99+
}
100+
}()
101+
102+
return nil
103+
}
104+
105+
func (c *concurrentTopologyProvider) Close() {
106+
close(c.done)
107+
<-c.closed
108+
}
109+
110+
func TestConncurrentTopologyChange(t *testing.T) {
111+
/* What we do:
112+
1) Addreplicaset + Removereplicaset by random in one goroutine
113+
2) Call ReplicaCall, MapRw and etc. in another goroutines
114+
*/
115+
116+
// Don't run this parallel with other tests, because this test is heavy and used to detect data races.
117+
// Therefore this test may impact other ones.
118+
// t.Parallel()
119+
120+
tc := &concurrentTopologyProvider{t: t}
121+
122+
router, err := vshardrouter.NewRouter(context.Background(), vshardrouter.Config{
123+
TopologyProvider: tc,
124+
DiscoveryTimeout: 5 * time.Second,
125+
DiscoveryMode: vshardrouter.DiscoveryModeOn,
126+
TotalBucketCount: totalBucketCount,
127+
User: username,
128+
})
129+
130+
require.Nil(t, err, "NewRouter finished successfully")
131+
132+
wg := sync.WaitGroup{}
133+
134+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
135+
defer cancel()
136+
137+
const concurrentCalls = 100
138+
callCntArr := make([]int, concurrentCalls)
139+
for i := 0; i < 100; i++ {
140+
i := i
141+
wg.Add(1)
142+
go func() {
143+
defer wg.Done()
144+
145+
for {
146+
select {
147+
case <-ctx.Done():
148+
return
149+
default:
150+
}
151+
152+
bucketID := randBucketID(totalBucketCount)
153+
args := []interface{}{"arg1"}
154+
155+
callOpts := vshardrouter.CallOpts{}
156+
157+
_, _ = router.Call(ctx, bucketID, vshardrouter.CallModeBRO, "echo", args, callOpts)
158+
callCntArr[i]++
159+
}
160+
}()
161+
}
162+
163+
var mapCnt int
164+
wg.Add(1)
165+
go func() {
166+
defer wg.Done()
167+
168+
for {
169+
select {
170+
case <-ctx.Done():
171+
return
172+
default:
173+
}
174+
175+
args := []interface{}{"arg1"}
176+
_, _ = vshardrouter.RouterMapCallRW[interface{}](router, ctx, "echo", args, vshardrouter.RouterMapCallRWOptions{})
177+
mapCnt++
178+
}
179+
}()
180+
181+
wg.Wait()
182+
183+
var callCnt int
184+
for _, v := range callCntArr {
185+
callCnt += v
186+
}
187+
188+
t.Logf("Call cnt=%d, map cnt=%d", callCnt, mapCnt)
189+
190+
tc.Close()
191+
}

0 commit comments

Comments
 (0)