Skip to content

Commit 37cd8a1

Browse files
committed
Implement heartbeat with custom ping-pong for event handler
1 parent f827128 commit 37cd8a1

File tree

1 file changed

+55
-36
lines changed

1 file changed

+55
-36
lines changed

event-handler/event-channel-manager.js

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,39 @@ class EventChannelManager {
8282
// TODO(cshcomcom): Handle MAX connections.
8383

8484
logger.info(`[${LOG_HEADER}] New connection (${channelId})`);
85+
8586
webSocket.on('message', (message) => {
86-
this.handleMessage(channel, message);
87+
try {
88+
const parsedMessage = JSON.parse(message);
89+
const messageType = parsedMessage.type;
90+
if (!messageType) {
91+
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG,
92+
`No message type in (${JSON.stringify(message)})`);
93+
}
94+
const messageData = parsedMessage.data;
95+
if (!messageData) {
96+
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG,
97+
`No message data in (${JSON.stringify(message)})`);
98+
}
99+
// NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
100+
if (messageType === BlockchainEventMessageTypes.PONG) {
101+
this.handlePong(webSocket);
102+
} else {
103+
this.handleMessage(channel, messageType, messageData);
104+
}
105+
} catch (err) {
106+
logger.error(`[${LOG_HEADER}] Error while process message ` +
107+
`(message: ${JSON.stringify(message, null, 2)}, ` +
108+
`error message: ${err.message})`);
109+
this.handleEventError(channel, err);
110+
}
87111
});
112+
88113
webSocket.on('close', (_) => {
89114
this.closeChannel(channel);
90115
});
91116

92-
// Heartbeat
93-
webSocket.on('pong', (_) => {
94-
webSocket.isAlive = true;
95-
})
117+
96118
webSocket.isAlive = true;
97119
} catch (err) {
98120
webSocket.terminate();
@@ -222,36 +244,28 @@ class EventChannelManager {
222244
}
223245
}
224246

225-
handleMessage(channel, message) { // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
226-
const LOG_HEADER = 'handleMessage';
227-
try {
228-
const parsedMessage = JSON.parse(message);
229-
const messageType = parsedMessage.type;
230-
if (!messageType) {
231-
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG,
232-
`Can't find type from message (${JSON.stringify(message)})`);
233-
}
234-
const messageData = parsedMessage.data;
235-
if (!messageData) {
236-
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG,
237-
`Can't find data from message (${JSON.stringify(message)})`);
238-
}
239-
switch (messageType) {
240-
case BlockchainEventMessageTypes.REGISTER_FILTER:
241-
this.handleRegisterFilterMessage(channel, messageData);
242-
break;
243-
case BlockchainEventMessageTypes.DEREGISTER_FILTER:
244-
this.handleDeregisterFilterMessage(channel, messageData);
245-
break;
246-
default:
247-
throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE,
248-
`Invalid message type (${messageType})`);
249-
}
250-
} catch (err) {
251-
logger.error(`[${LOG_HEADER}] Error while process message ` +
252-
`(message: ${JSON.stringify(message, null, 2)}, ` +
253-
`error message: ${err.message})`);
254-
this.handleEventError(channel, err);
247+
/**
248+
* Handles a pong message.
249+
*/
250+
handlePong(webSocket) {
251+
webSocket.isAlive = true;
252+
}
253+
254+
/**
255+
* Handles a (non-pong) message from the channel.
256+
*/
257+
// TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
258+
handleMessage(channel, messageType, messageData) {
259+
switch (messageType) {
260+
case BlockchainEventMessageTypes.REGISTER_FILTER:
261+
this.handleRegisterFilterMessage(channel, messageData);
262+
break;
263+
case BlockchainEventMessageTypes.DEREGISTER_FILTER:
264+
this.handleDeregisterFilterMessage(channel, messageData);
265+
break;
266+
default:
267+
throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE,
268+
`Invalid message type (${messageType})`);
255269
}
256270
}
257271

@@ -325,11 +339,16 @@ class EventChannelManager {
325339
return ws.terminate();
326340
}
327341
ws.isAlive = false;
328-
ws.ping();
342+
this.sendPing(ws);
329343
});
330344
}, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000);
331345
}
332346

347+
sendPing(webSocket) {
348+
const pingMessage = this.makeMessage(BlockchainEventMessageTypes.PING, {});
349+
webSocket.send(JSON.stringify(pingMessage));
350+
}
351+
333352
stopHeartbeat() {
334353
clearInterval(this.heartbeatInterval);
335354
}

0 commit comments

Comments
 (0)