@@ -82,17 +82,39 @@ class EventChannelManager {
82
82
// TODO(cshcomcom): Handle MAX connections.
83
83
84
84
logger . info ( `[${ LOG_HEADER } ] New connection (${ channelId } )` ) ;
85
+
85
86
webSocket . on ( 'message' , ( message ) => {
86
- this . handleMessage ( channel , message ) ;
87
+ try {
88
+ const parsedMessage = JSON . parse ( message ) ;
89
+ const messageType = parsedMessage . type ;
90
+ if ( ! messageType ) {
91
+ throw new EventHandlerError ( EventHandlerErrorCode . MISSING_MESSAGE_TYPE_IN_MSG ,
92
+ `No message type in (${ JSON . stringify ( message ) } )` ) ;
93
+ }
94
+ const messageData = parsedMessage . data ;
95
+ if ( ! messageData ) {
96
+ throw new EventHandlerError ( EventHandlerErrorCode . MISSING_MESSAGE_DATA_IN_MSG ,
97
+ `No message data in (${ JSON . stringify ( message ) } )` ) ;
98
+ }
99
+ // NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
100
+ if ( messageType === BlockchainEventMessageTypes . PONG ) {
101
+ this . handlePong ( webSocket ) ;
102
+ } else {
103
+ this . handleMessage ( channel , messageType , messageData ) ;
104
+ }
105
+ } catch ( err ) {
106
+ logger . error ( `[${ LOG_HEADER } ] Error while process message ` +
107
+ `(message: ${ JSON . stringify ( message , null , 2 ) } , ` +
108
+ `error message: ${ err . message } )` ) ;
109
+ this . handleEventError ( channel , err ) ;
110
+ }
87
111
} ) ;
112
+
88
113
webSocket . on ( 'close' , ( _ ) => {
89
114
this . closeChannel ( channel ) ;
90
115
} ) ;
91
116
92
- // Heartbeat
93
- webSocket . on ( 'pong' , ( _ ) => {
94
- webSocket . isAlive = true ;
95
- } )
117
+
96
118
webSocket . isAlive = true ;
97
119
} catch ( err ) {
98
120
webSocket . terminate ( ) ;
@@ -222,36 +244,28 @@ class EventChannelManager {
222
244
}
223
245
}
224
246
225
- handleMessage ( channel , message ) { // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
226
- const LOG_HEADER = 'handleMessage' ;
227
- try {
228
- const parsedMessage = JSON . parse ( message ) ;
229
- const messageType = parsedMessage . type ;
230
- if ( ! messageType ) {
231
- throw new EventHandlerError ( EventHandlerErrorCode . MISSING_MESSAGE_TYPE_IN_MSG ,
232
- `Can't find type from message (${ JSON . stringify ( message ) } )` ) ;
233
- }
234
- const messageData = parsedMessage . data ;
235
- if ( ! messageData ) {
236
- throw new EventHandlerError ( EventHandlerErrorCode . MISSING_MESSAGE_DATA_IN_MSG ,
237
- `Can't find data from message (${ JSON . stringify ( message ) } )` ) ;
238
- }
239
- switch ( messageType ) {
240
- case BlockchainEventMessageTypes . REGISTER_FILTER :
241
- this . handleRegisterFilterMessage ( channel , messageData ) ;
242
- break ;
243
- case BlockchainEventMessageTypes . DEREGISTER_FILTER :
244
- this . handleDeregisterFilterMessage ( channel , messageData ) ;
245
- break ;
246
- default :
247
- throw new EventHandlerError ( EventHandlerErrorCode . INVALID_MESSAGE_TYPE ,
248
- `Invalid message type (${ messageType } )` ) ;
249
- }
250
- } catch ( err ) {
251
- logger . error ( `[${ LOG_HEADER } ] Error while process message ` +
252
- `(message: ${ JSON . stringify ( message , null , 2 ) } , ` +
253
- `error message: ${ err . message } )` ) ;
254
- this . handleEventError ( channel , err ) ;
247
+ /**
248
+ * Handles a pong message.
249
+ */
250
+ handlePong ( webSocket ) {
251
+ webSocket . isAlive = true ;
252
+ }
253
+
254
+ /**
255
+ * Handles a (non-pong) message from the channel.
256
+ */
257
+ // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
258
+ handleMessage ( channel , messageType , messageData ) {
259
+ switch ( messageType ) {
260
+ case BlockchainEventMessageTypes . REGISTER_FILTER :
261
+ this . handleRegisterFilterMessage ( channel , messageData ) ;
262
+ break ;
263
+ case BlockchainEventMessageTypes . DEREGISTER_FILTER :
264
+ this . handleDeregisterFilterMessage ( channel , messageData ) ;
265
+ break ;
266
+ default :
267
+ throw new EventHandlerError ( EventHandlerErrorCode . INVALID_MESSAGE_TYPE ,
268
+ `Invalid message type (${ messageType } )` ) ;
255
269
}
256
270
}
257
271
@@ -325,11 +339,16 @@ class EventChannelManager {
325
339
return ws . terminate ( ) ;
326
340
}
327
341
ws . isAlive = false ;
328
- ws . ping ( ) ;
342
+ this . sendPing ( ws ) ;
329
343
} ) ;
330
344
} , NodeConfigs . EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000 ) ;
331
345
}
332
346
347
+ sendPing ( webSocket ) {
348
+ const pingMessage = this . makeMessage ( BlockchainEventMessageTypes . PING , { } ) ;
349
+ webSocket . send ( JSON . stringify ( pingMessage ) ) ;
350
+ }
351
+
333
352
stopHeartbeat ( ) {
334
353
clearInterval ( this . heartbeatInterval ) ;
335
354
}
0 commit comments