-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathapp_test.go
123 lines (113 loc) · 3.07 KB
/
app_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package cloud
import (
"crypto/tls"
"encoding/json"
"net/http"
"net/url"
"sync/atomic"
"testing"
"time"
"github.com/cisco-pxgrid/cloud-sdk-go/internal/pubsub/test"
"github.com/cisco-pxgrid/cloud-sdk-go/internal/rpc"
"github.com/cisco-pxgrid/websocket"
"github.com/stretchr/testify/require"
)
func TestReconnect(t *testing.T) {
// Prepare test
reconnectBackoff = 1 * time.Second
reconnectDelay = 1 * time.Second
message := `{
"readStream":[
{
"msgId": "msg1",
"headers": {
"messageType": "data",
"tenant": "tenant1",
"device": "device1",
"key1": "val1"
},
"payload": "VGhpcyBpcyBhIHRlc3Q="
}
]
}`
var messageChan = make(chan string, 1)
var closeChan = make(chan struct{}, 1)
var connCount atomic.Uint32
s, _ := test.NewRPCServer(t, test.Config{
ConnHandler: func() int {
connCount.Add(1)
return http.StatusOK
},
ConsumeHandler: func(conn *websocket.Conn, req *rpc.Request) *rpc.Response {
var payload string
select {
case <-closeChan:
conn.Close(websocket.StatusNormalClosure, "")
return nil
case payload = <-messageChan:
default:
payload = "{}"
}
result := `{
"consumeContext": "ctx1",
"subscriptionId": "sub1",
"messages":` + payload + `}`
return &rpc.Response{
Version: "1.0",
ID: req.ID,
Result: json.RawMessage(result),
}
},
})
defer s.Close()
// Create New app
var messageCount atomic.Uint32
u, _ := url.Parse(s.URL)
config := Config{
ID: "appId",
RegionalFQDN: u.Host,
GlobalFQDN: u.Host,
WriteStreamID: "writeStream",
ReadStreamID: "readStream",
GetCredentials: func() (*Credentials, error) {
return &Credentials{ApiKey: []byte("dummy")}, nil
},
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
DeviceMessageHandler: func(messageID string, device *Device, stream string, payload []byte) {
messageCount.Add(1)
},
}
app, err := New(config)
require.NoError(t, err)
tenant, err := app.LinkTenant("dummy")
require.NoError(t, err)
// wait until devices are populated
require.Eventually(t, func() bool {
devices, err := tenant.GetDevices()
require.NoError(t, err)
return len(devices) == 1
}, 5*time.Second, 1*time.Second)
// Check received messages
messageChan <- message
require.Eventually(t, func() bool { return messageCount.Load() == 1 }, 5*time.Second, 1*time.Second)
// Check reconnect
require.True(t, connCount.Load() == 1)
closeChan <- struct{}{}
// wait for the connection to be re-established
require.Eventually(t, func() bool { return connCount.Load() == 2 }, 5*time.Second, 1*time.Second)
// wait until devices are populated
require.Eventually(t, func() bool {
devices, err := tenant.GetDevices()
require.NoError(t, err)
return len(devices) == 1
}, 5*time.Second, 1*time.Second)
// Check received messages after reconnect
messageChan <- message
require.Eventually(t, func() bool { return messageCount.Load() == 2 }, 5*time.Second, 1*time.Second)
_ = app.UnlinkTenant(tenant)
app.Close()
}