-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintegration_test.go
90 lines (77 loc) · 3.25 KB
/
integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"testing"
"time"
"github.com/eagraf/synchronizer/aggregator"
"github.com/eagraf/synchronizer/coordinator"
"github.com/eagraf/synchronizer/dataserver"
"github.com/eagraf/synchronizer/messenger"
"github.com/eagraf/synchronizer/selector"
"github.com/eagraf/synchronizer/service"
)
func TestCoordinatorGetWorkersFromSelectors(t *testing.T) {
sp := service.NewServicePool(5000, service.DefaultTopology)
selector.NewSelector(sp)
selector.NewSelector(sp)
selector.NewSelector(sp)
messenger.NewTestClient("http://localhost:5000/websocket/", "client1")
messenger.NewTestClient("http://localhost:5002/websocket/", "client2")
messenger.NewTestClient("http://localhost:5004/websocket/", "client3")
coordinator.NewCoordinator(sp)
time.Sleep(time.Second)
if count := service.CountTags(sp, "GetWorkersSend"); count != 3 {
t.Errorf("Incorrect number of worker sends: %d", count)
}
if count := service.CountTags(sp, "GetWorkersRecv"); count != 1 {
t.Errorf("Incorrect number of worker recvs: %d", count)
}
if exists := service.LogExists(sp, "GetWorkersRecv", "Receiving 3 workers from 3 selectors"); !exists {
t.Errorf("Correct receiving log not found")
}
}
// TODO func TestCoordinatorSchedule()
func TestCoordinatorSchedule(t *testing.T) {
sp := service.NewServicePool(5100, service.DefaultTopology)
// Three selectors, aggregators and data servers
selector.NewSelector(sp)
selector.NewSelector(sp)
selector.NewSelector(sp)
aggregator.NewAggregator(sp)
aggregator.NewAggregator(sp)
aggregator.NewAggregator(sp)
dataserver.NewDataServer(sp)
dataserver.NewDataServer(sp)
dataserver.NewDataServer(sp)
// Nine workers evenly distributed across selectors
messenger.NewTestClient("http://localhost:5100/websocket/", "client1")
messenger.NewTestClient("http://localhost:5102/websocket/", "client2")
messenger.NewTestClient("http://localhost:5104/websocket/", "client3")
messenger.NewTestClient("http://localhost:5100/websocket/", "client4")
messenger.NewTestClient("http://localhost:5102/websocket/", "client5")
messenger.NewTestClient("http://localhost:5104/websocket/", "client6")
messenger.NewTestClient("http://localhost:5100/websocket/", "client7")
messenger.NewTestClient("http://localhost:5102/websocket/", "client8")
messenger.NewTestClient("http://localhost:5104/websocket/", "client9")
// One coordinator
coordinator.NewCoordinator(sp)
time.Sleep(time.Second)
if count := service.CountTags(sp, "GetWorkersSend"); count != 3 {
t.Errorf("Incorrect number of worker sends: %d", count)
}
if count := service.CountTags(sp, "GetWorkersRecv"); count != 1 {
t.Errorf("Incorrect number of worker recvs: %d", count)
}
if exists := service.LogExists(sp, "GetWorkersRecv", "Receiving 9 workers from 3 selectors"); !exists {
t.Errorf("Correct receiving log not found")
}
// Test scheduling receives
if count := service.CountTags(sp, "DataServerReceiveSchedule"); count != 3 {
t.Errorf("Incorrect number of DataServerReceiveSchedule logs: %d", count)
}
if count := service.CountTags(sp, "AggregatorReceiveSchedule"); count != 3 {
t.Errorf("Incorrect number of AggregatorReceiveSchedule logs: %d", count)
}
if count := service.CountTags(sp, "SchedulingError"); count != 0 {
t.Errorf("Incorrect number of SchedulingError logs: %d", count)
}
}