-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwebsocket.go
301 lines (251 loc) · 7.73 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// Package rewebsocket adds autoreconnect to Gorilla WebSocket.
package rewebsocket
import (
"fmt"
"io"
"sync"
"time"
"github.com/gorilla/websocket"
)
// state is the lifecycle stage of a TextClient (see the state* constants).
type state int
// Compile-time check that *TextClient implements io.Closer. A typed nil
// pointer is used so that no TextClient value is allocated just for the check.
var (
	_ io.Closer = (*TextClient)(nil)
)
// Lifecycle states of a TextClient. stateFresh is the zero value.
const (
	// stateFresh: Open has not succeeded yet.
	stateFresh state = iota
	// stateOpen: Open succeeded and Close has not been called.
	stateOpen
	// stateClosed: Close has been called; Open refuses to run again.
	stateClosed
)
// Dialer is called to establish the WebSocket connection. When reconnecting,
// the call may be retried using the Retry function. If this function blocks it
// will halt (re)connect and close progress. The given cancel channel is closed
// when the TextClient is closed, allowing the user to cancel long running
// operations.
type Dialer func(cancel chan struct{}) (*websocket.Conn, error)
// TextClient is a WebSocket text client that automatically reconnects
// to the remote service. All fields that you decide to set must be set before
// calling Open and are not safe to be modified after. When a connection
// problem occurs, writes will fail and some incoming messages may be lost. The
// client can only be opened and closed once.
type TextClient struct {
	// OnReadMessage is called for each incoming text message. Messages are not
	// processed concurrently unless the implementing function runs its logic
	// in a different goroutine. If this function blocks it blocks the read
	// loop (which allows for flow control) and delays closing until it is
	// unblocked. If not set, messages are still read but ignored. The given
	// cancel channel is closed when the TextClient is closed, allowing the
	// user to cancel long running operations.
	OnReadMessage func(cancel chan struct{}, msg []byte)

	// logln receives debug log output; Open installs a no-op when it is nil.
	logln func(...interface{})

	// OnError is called when there is a non-fatal error (typically failing to
	// read a message). This function runs in its own goroutine. If not set
	// the event is lost.
	OnError func(error)

	// OnFatal is called when there is a fatal error (we can't reconnect after
	// retrying using the Retry function). When there is a fatal error the
	// client is closed automatically, right before calling OnFatal. If not set
	// the client is still closed automatically but the event is lost. This
	// function runs in its own goroutine.
	OnFatal func(error)

	// Retry runs the given function, retrying it until it succeeds or Retry
	// decides to give up; it should also give up and return an error once the
	// given channel is closed. It is used when reconnecting. When Retry
	// returns an error, OnFatal is called and the client is closed
	// automatically. If not set, a reconnect is attempted only once. If this
	// function blocks it halts reconnect progress. It is called from a single
	// goroutine.
	Retry func(chan struct{}, func() error) error

	// dialer is set by Open and afterwards only used by the reconnect loop.
	// It establishes the WebSocket connection; see the Dialer type for its
	// contract.
	dialer Dialer

	// never reassigned after Open
	close chan struct{}
	closeWG sync.WaitGroup
	reconnectCh chan bool

	// guarded by connMutex
	conn *websocket.Conn
	connMutex sync.RWMutex

	// guarded by stateMutex
	state state
	stateMutex sync.Mutex
}
// Open dials the WebSocket connection with dialer and, on success, starts the
// background reconnect and read loops. Callback fields that are still nil are
// replaced with no-op defaults; a nil Retry is replaced by a single attempt.
//
// It returns an error when the client is already open or already closed, when
// dialer is nil or yields no connection, or when the initial dial fails. In
// the last three cases the state is left untouched, so Open may be called
// again.
func (c *TextClient) Open(dialer Dialer) error {
	c.stateMutex.Lock()
	defer c.stateMutex.Unlock()

	switch c.state {
	case stateClosed:
		return fmt.Errorf("already closed")
	case stateOpen:
		return fmt.Errorf("already open")
	}
	if dialer == nil {
		return fmt.Errorf("dialer must not be nil")
	}

	if c.logln == nil {
		c.logln = func(...interface{}) {}
	}
	if c.Retry == nil {
		c.Retry = func(done chan struct{}, f func() error) error {
			return f()
		}
	}
	if c.OnReadMessage == nil {
		c.OnReadMessage = func(chan struct{}, []byte) {}
	}
	if c.OnError == nil {
		c.OnError = func(error) {}
	}

	c.dialer = dialer
	c.close = make(chan struct{})

	conn, err := dialer(c.close)
	if err != nil {
		return err
	}
	if conn == nil {
		// A nil connection would only surface later as a nil dereference in
		// the read loop, so reject it here.
		return fmt.Errorf("dialer returned no connection")
	}
	c.connMutex.Lock()
	c.conn = conn
	c.connMutex.Unlock()

	// we rely on this being not buffered so only one reconnect can happen at a time
	c.reconnectCh = make(chan bool)

	// Register both goroutines before starting them: a WaitGroup.Add that
	// only runs inside the new goroutine can lose the race against the Wait
	// in Close, letting Close return while the loops are still starting.
	c.closeWG.Add(2)
	go func() {
		defer c.closeWG.Done()
		c.reconnectLoop()
	}()
	c.state = stateOpen
	go func() {
		defer c.closeWG.Done()
		c.readLoop()
	}()
	return nil
}
// Close marks the client closed, signals cancellation through the close
// channel, sends a normal-closure close frame (with a one second write
// deadline) and closes the underlying connection. It blocks until the
// background loops have finished. When the client is not open, the error from
// checkOpen is returned instead. A failure to send the close frame takes
// precedence over a failure to close the connection.
func (c *TextClient) Close() error {
	// Deferred first so that it runs last, after both locks taken below have
	// been released: the loops use the same locks and could deadlock if we
	// waited for them while still holding the locks.
	defer c.closeWG.Wait()

	c.stateMutex.Lock()
	defer c.stateMutex.Unlock()

	if notOpen := c.checkOpen(); notOpen != nil {
		return notOpen
	}
	c.state = stateClosed
	close(c.close)

	c.connMutex.RLock()
	defer c.connMutex.RUnlock()

	farewell := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
	deadline := time.Now().Add(1 * time.Second)
	writeErr := c.conn.WriteControl(websocket.CloseMessage, farewell, deadline)

	// The connection is closed whether or not the close frame went out.
	closeErr := c.conn.Close()
	if writeErr != nil {
		// TODO(robbiev): preserve both errors?
		return writeErr
	}
	return closeErr
}
// WriteTextMessage writes msg as a text message to the current WebSocket
// connection. It returns an error when no connection has been established yet
// (Open was not called or did not succeed). When the write itself fails, a
// reconnect is requested and the write error is returned; the message is not
// resent.
//
// NOTE(review): Gorilla connections support only one concurrent writer and
// this method does not serialize its callers — confirm callers do.
func (c *TextClient) WriteTextMessage(msg []byte) error {
	c.connMutex.RLock()
	conn := c.conn
	c.connMutex.RUnlock()

	// conn stays nil until a dial has succeeded; calling through a nil
	// *websocket.Conn would panic instead of reporting an error.
	if conn == nil {
		return fmt.Errorf("not open")
	}

	if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
		c.tryReconnect()
		return err
	}
	return nil
}
// tryReconnect asks the reconnect loop to reconnect. It does nothing when the
// client is not open, and the request is dropped when the reconnect loop is
// not currently waiting for one (for instance because a reconnect is already
// in progress).
func (c *TextClient) tryReconnect() {
	c.stateMutex.Lock()
	defer c.stateMutex.Unlock()

	if c.checkOpen() == nil {
		// Non-blocking send: reconnectCh is unbuffered, so this only succeeds
		// while the reconnect loop is ready to receive.
		select {
		case c.reconnectCh <- true:
		default:
		}
	}
}
// reconnectLoop waits for reconnect requests and, for each one, closes the
// current connection, dials a new one through Retry and starts a fresh read
// loop on it. It exits when the client is closed or when reconnecting fails;
// in the latter case the client is closed and OnFatal (if set) is called, both
// from a separate goroutine. It should run in its own goroutine.
func (c *TextClient) reconnectLoop() {
	c.closeWG.Add(1)
	defer c.closeWG.Done()
	defer c.logln("exit reconnect")

	for {
		select {
		case <-c.close:
			return
		case <-c.reconnectCh:
		}

		c.logln("reconnect")

		c.connMutex.RLock()
		// Best effort: the old connection is being replaced, so a failure to
		// close it is not actionable.
		_ = c.conn.Close()
		c.connMutex.RUnlock()

		var conn *websocket.Conn
		err := c.Retry(c.close, func() error {
			var dialErr error
			conn, dialErr = c.dialer(c.close)
			return dialErr
		})
		if err == nil && conn == nil {
			// Without a connection the read loop would dereference nil;
			// treat this like any other failed reconnect.
			err = fmt.Errorf("dialer returned no connection")
		}
		if err != nil {
			// Close waits for this loop to finish, so it has to run in
			// another goroutine.
			go func() {
				// Close only fails here when the client was closed already.
				_ = c.Close()
				if c.OnFatal != nil {
					c.OnFatal(err)
				}
			}()
			return
		}

		c.connMutex.Lock()
		c.conn = conn
		c.connMutex.Unlock()

		c.stateMutex.Lock()
		if c.state == stateClosed {
			c.stateMutex.Unlock()
			// The client was closed while we were dialing. Close may have run
			// before the new connection was stored, so close it here or it
			// would leak; closing it a second time is harmless.
			_ = conn.Close()
			return
		}
		// Register the read loop before starting it so that the Wait in Close
		// cannot miss it.
		c.closeWG.Add(1)
		go func() {
			defer c.closeWG.Done()
			c.readLoop()
		}()
		c.stateMutex.Unlock()
	}
}
// readLoop reads messages from the connection that is current when the loop
// starts and hands every text message to OnReadMessage; other message types
// are skipped. It returns when the client is closed or when a read fails. A
// read failure is reported through OnError (in its own goroutine) and a
// reconnect is requested unless the connection has already been replaced. It
// should run in its own goroutine.
func (c *TextClient) readLoop() {
	c.closeWG.Add(1)
	defer c.closeWG.Done()

	// Bind to a single connection for the lifetime of this loop. Re-reading
	// c.conn on every iteration would let a loop that outlived a reconnect
	// (for example while blocked in OnReadMessage) start reading from the new
	// connection alongside the read loop started for it, and a Gorilla
	// connection supports only one concurrent reader.
	c.connMutex.RLock()
	conn := c.conn
	c.connMutex.RUnlock()

	for {
		select {
		case <-c.close:
			return
		default:
		}

		msgType, msg, err := conn.ReadMessage()
		if err != nil {
			go c.OnError(err)

			// If the connection was swapped in the meantime, a reconnect has
			// already happened; asking for another one would tear down the
			// new connection.
			c.connMutex.RLock()
			replaced := c.conn != conn
			c.connMutex.RUnlock()
			if !replaced {
				c.tryReconnect()
			}
			return
		}
		if msgType != websocket.TextMessage {
			continue
		}

		// Note that we don't hand out a reader (using gorilla's NextReader)
		// because it complicates processing; the reader would have to be
		// processed synchronously in the read loop because calling NextReader
		// again invalidates the previous reader. So the first part of the
		// user's code would have to be synchronous while the rest could run
		// in a goroutine, which is easy to forget. Of course this costs us an
		// extra allocation; if this ever becomes a problem we can add an
		// additional callback that does take a reader and, if set, only call
		// that one.
		c.OnReadMessage(c.close, msg)
	}
}
// checkOpen reports whether the client is currently open: it returns nil when
// it is, and an error describing why not otherwise. The caller must hold
// stateMutex.
func (c *TextClient) checkOpen() error {
	if c.state == stateClosed {
		return fmt.Errorf("already closed")
	}
	if c.state == stateFresh {
		return fmt.Errorf("not open")
	}
	return nil
}