@@ -232,6 +232,9 @@ class VrpcClient extends EventEmitter {
232
232
this . _agents [ agent ] . status = status
233
233
this . _agents [ agent ] . hostname = hostname
234
234
this . _agents [ agent ] . version = version
235
+ if ( status === 'offline' ) {
236
+ this . _clearCachedSubscriptions ( { lostAgent : agent } )
237
+ }
235
238
this . emit ( 'agent' , { domain, agent, status, hostname, version } )
236
239
// ClassInfo message
237
240
} else if (
@@ -251,6 +254,7 @@ class VrpcClient extends EventEmitter {
251
254
json
252
255
if ( removed . length !== 0 ) {
253
256
this . emit ( 'instanceGone' , removed , { domain, agent, className } )
257
+ this . _clearCachedSubscriptions ( { lostInstance : removed } )
254
258
}
255
259
if ( added . length !== 0 ) {
256
260
this . emit ( 'instanceNew' , added , { domain, agent, className } )
@@ -1179,6 +1183,29 @@ class VrpcClient extends EventEmitter {
1179
1183
return id
1180
1184
}
1181
1185
1186
+ _clearCachedSubscriptions ( { lostAgent, lostInstance } ) {
1187
+ const obsoleteTopics = Object . keys ( this . _cachedSubscriptions ) . filter (
1188
+ topic => {
1189
+ const [ , agent , , instanceEvent ] = topic . split ( '/' )
1190
+ const [ instance ] = instanceEvent . split ( ':' )
1191
+ if ( agent === lostAgent || instance === lostInstance ) {
1192
+ return true
1193
+ }
1194
+ return false
1195
+ }
1196
+ )
1197
+ obsoleteTopics . forEach ( topic => {
1198
+ const id = `__e__${ topic } `
1199
+ this . _log . info ( `Clearing subscriptions for obsolete topic: ${ topic } ` )
1200
+ const subscriptions = this . _cachedSubscriptions [ topic ]
1201
+ subscriptions . forEach ( ( { handler } ) => {
1202
+ this . _eventEmitter . removeListener ( id , handler )
1203
+ } )
1204
+ this . _mqttUnsubscribe ( topic )
1205
+ delete this . _cachedSubscriptions [ topic ]
1206
+ } )
1207
+ }
1208
+
1182
1209
_stripSignature ( method ) {
1183
1210
const pos = method . indexOf ( '-' )
1184
1211
if ( pos > 0 ) return method . substring ( 0 , pos )
0 commit comments