Skip to content

Commit 76d287f

Browse files
authored
Move all event handling to PortalNetwork class (#793)
* update event types * remove EventEmitter inheritance from Network classes * update tests
1 parent 0965f90 commit 76d287f

10 files changed

Lines changed: 66 additions & 51 deletions

File tree

packages/portalnetwork/src/client/types.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,27 @@ export interface INodeAddress {
1717
nodeId: NodeId
1818
}
1919

20-
export interface PortalNetworkEvents {
21-
NodeAdded: (nodeId: NodeId, networkId: NetworkId) => void
22-
NodeRemoved: (nodeId: NodeId, networkId: NetworkId) => void
23-
ContentAdded: (key: Uint8Array, contentType: number, content: string) => void
20+
type ContentAddedEventName = `${NetworkId}:ContentAdded`
21+
type ContentAddedEventType = (key: Uint8Array, content: Uint8Array) => Promise<void | { content: Uint8Array; utp: boolean }>
22+
type ContentAddedEvents = {
23+
[K in ContentAddedEventName]: ContentAddedEventType
24+
}
25+
26+
type NodeAddedEventName = `${NetworkId}:NodeAdded`
27+
type NodeAddedEventType = (nodeId: NodeId) => void
28+
type NodeAddedEvents = {
29+
[K in NodeAddedEventName]: NodeAddedEventType
30+
}
31+
32+
type NodeRemovedEventName = `${NetworkId}:NodeRemoved`
33+
type NodeRemovedEventType = (nodeId: NodeId) => void
34+
type NodeRemovedEvents = {
35+
[K in NodeRemovedEventName]: NodeRemovedEventType
36+
}
37+
38+
type NetworkEvents = ContentAddedEvents & NodeAddedEvents & NodeRemovedEvents
39+
40+
export interface PortalNetworkEvents extends NetworkEvents {
2441
Verified: (key: Uint8Array, verified: boolean) => void
2542
SendTalkReq: (nodeId: string, requestId: string, payload: string) => void
2643
SendTalkResp: (nodeId: string, requestId: string, payload: string) => void

packages/portalnetwork/src/networks/beacon/beacon.ts

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export class BeaconNetwork extends BaseNetwork {
9090
this.logger = debug(this.enr.nodeId.slice(0, 5)).extend('Portal').extend('BeaconNetwork')
9191
this.routingTable.setLogger(this.logger)
9292
this.forkDigest = Uint8Array.from([0, 0, 0, 0])
93-
this.on('ContentAdded', async (contentKey: Uint8Array) => {
93+
this.portal.on(`${this.networkId}:ContentAdded`, async (contentKey: Uint8Array) => {
9494
if (contentKey[0] === BeaconNetworkContentType.LightClientUpdate) {
9595
// don't gossip individual LightClientUpdates since they aren't officially supported
9696
return
@@ -111,14 +111,14 @@ export class BeaconNetwork extends BaseNetwork {
111111
switch (this.syncStrategy) {
112112
case SyncStrategy.PollNetwork:
113113
this.bootstrapFinder = new Map()
114-
this.portal.on('NodeAdded', this.getBootStrapVote)
114+
this.portal.on(`${this.networkId}:NodeAdded`, this.getBootStrapVote)
115115
break
116116
case SyncStrategy.TrustedBlockRoot:
117117
if (trustedBlockRoot === undefined)
118118
throw new Error('must provided trusted block root with SyncStrategy.TrustedBlockRoot')
119119
this.bootstrapFinder = new Map()
120120
this.trustedBlockRoot = trustedBlockRoot
121-
this.portal.on('NodeAdded', this.getBootstrap)
121+
this.portal.on(`${this.networkId}:NodeAdded`, this.getBootstrap)
122122
break
123123
}
124124
}
@@ -130,9 +130,7 @@ export class BeaconNetwork extends BaseNetwork {
130130
* @param nodeId NodeId for a peer that was just discovered by the Portal Network `client`
131131
* @param network the network ID for the node just discovered
132132
*/
133-
private getBootstrap = async (nodeId: string, network: NetworkId) => {
134-
// We check the network ID because NodeAdded is emitted regardless of network
135-
if (network !== this.networkId) return
133+
private getBootstrap = async (nodeId: string) => {
136134
const enr = getENR(this.routingTable, nodeId)
137135
if (enr === undefined) return
138136
const decoded = await this.sendFindContent(
@@ -153,7 +151,7 @@ export class BeaconNetwork extends BaseNetwork {
153151
)
154152
if (headerHash === this.trustedBlockRoot) {
155153
void this.initializeLightClient(headerHash)
156-
this.portal.removeListener('NodeAdded', this.getBootstrap)
154+
this.portal.removeListener(`${this.networkId}:NodeAdded`, this.getBootstrap)
157155
}
158156
}
159157
}
@@ -168,9 +166,9 @@ export class BeaconNetwork extends BaseNetwork {
168166
* @param nodeId NodeId for a peer that was just discovered by the Portal Network `client`
169167
* @param network the network ID for the node just discovered
170168
*/
171-
private getBootStrapVote = async (nodeId: string, network: NetworkId) => {
169+
private getBootStrapVote = async (nodeId: string) => {
172170
try {
173-
if (network === this.networkId) {
171+
174172
// We check the network ID because NodeAdded is emitted regardless of network
175173
if (this.bootstrapFinder.has(nodeId)) {
176174
return
@@ -257,7 +255,7 @@ export class BeaconNetwork extends BaseNetwork {
257255
ssz[fork].LightClientBootstrap.deserialize(res.content.slice(4))
258256
this.logger.extend('BOOTSTRAP')(`found a valid bootstrap - ${results[x][0]}`)
259257
await this.store(bootstrapKey, res.content)
260-
this.portal.removeListener('NodeAdded', this.getBootStrapVote)
258+
this.portal.removeListener(`${this.networkId}:NodeAdded`, this.getBootStrapVote)
261259
this.logger.extend('BOOTSTRAP')('Terminating Light Client bootstrap process')
262260
await this.initializeLightClient(results[x][0])
263261
return
@@ -274,7 +272,7 @@ export class BeaconNetwork extends BaseNetwork {
274272
this.bootstrapFinder.set(peer, {})
275273
}
276274
}
277-
}
275+
278276
} catch (err) {
279277
this.logger.extend('BOOTSTRAP')(err)
280278
}
@@ -287,8 +285,8 @@ export class BeaconNetwork extends BaseNetwork {
287285
*/
288286
public initializeLightClient = async (blockRoot: string) => {
289287
// Ensure bootstrap finder mechanism is disabled if currently running
290-
this.portal.removeListener('NodeAdded', this.getBootStrapVote)
291-
this.portal.removeListener('NodeAdded', this.getBootstrap)
288+
this.portal.removeListener(`${this.networkId}:NodeAdded`, this.getBootStrapVote)
289+
this.portal.removeListener(`${this.networkId}:NodeAdded`, this.getBootstrap)
292290

293291
// Setup the Lodestar light client logger using our debug logger
294292
const lcLogger = this.logger.extend('LightClient')
@@ -473,7 +471,7 @@ export class BeaconNetwork extends BaseNetwork {
473471
this.logger.extend('FOUNDCONTENT')(`received uTP Connection ID ${id}`)
474472
response = await new Promise((resolve, _reject) => {
475473
// TODO: Figure out how to clear this listener
476-
this.on('ContentAdded', (contentKey: Uint8Array, value) => {
474+
this.portal.on(`${this.networkId}:ContentAdded`, (contentKey: Uint8Array, value) => {
477475
if (equalsBytes(contentKey, key) === true) {
478476
this.logger.extend('FOUNDCONTENT')(`received content for uTP Connection ID ${id}`)
479477
resolve({ content: value, utp: true })
@@ -739,7 +737,7 @@ export class BeaconNetwork extends BaseNetwork {
739737
this.logger(
740738
`storing ${BeaconNetworkContentType[contentType]} content corresponding to ${bytesToHex(contentKey)}`,
741739
)
742-
this.emit('ContentAdded', contentKey, value)
740+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, value)
743741
}
744742

745743
/**

packages/portalnetwork/src/networks/beacon/ultralightTransport.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ export class UltralightTransport implements LightClientTransport {
227227
}
228228

229229
onOptimisticUpdate(handler: (optimisticUpdate: LightClientOptimisticUpdate) => void): void {
230-
this.network.on('ContentAdded', (contentKey: Uint8Array, content: Uint8Array) => {
230+
this.network.portal.on(`${this.network.networkId}:ContentAdded`, (contentKey: Uint8Array, content: Uint8Array) => {
231231
const contentType = contentKey[0]
232232
if (contentType === BeaconNetworkContentType.LightClientOptimisticUpdate) {
233233
const forkhash = content.slice(0, 4)
@@ -240,11 +240,14 @@ export class UltralightTransport implements LightClientTransport {
240240
this.logger('something went wrong trying to process Optimistic Update')
241241
this.logger(err)
242242
}
243-
}
244-
})
243+
}
244+
},
245+
)
245246
}
246247
onFinalityUpdate(handler: (finalityUpdate: LightClientFinalityUpdate) => void): void {
247-
this.network.on('ContentAdded', (contentKey: Uint8Array, content: Uint8Array) => {
248+
this.network.portal.on(
249+
`${this.network.networkId}:ContentAdded`,
250+
(contentKey: Uint8Array, content: Uint8Array) => {
248251
const contentType = contentKey[0]
249252
if (contentType === BeaconNetworkContentType.LightClientFinalityUpdate) {
250253
const forkhash = content.slice(0, 4)

packages/portalnetwork/src/networks/history/history.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ export class HistoryNetwork extends BaseNetwork {
355355
this.logger.extend('FOUNDCONTENT')(`received uTP Connection ID ${id}`)
356356
response = await new Promise((resolve, _reject) => {
357357
// TODO: Figure out how to clear this listener
358-
this.on('ContentAdded', (contentKey: Uint8Array, value) => {
358+
this.portal.on(`${this.networkId}:ContentAdded`, (contentKey: Uint8Array, value: Uint8Array) => {
359359
if (equalsBytes(contentKey, key) === true) {
360360
this.logger.extend('FOUNDCONTENT')(`received content for uTP Connection ID ${id}`)
361361
resolve({ content: value, utp: true })
@@ -646,7 +646,7 @@ export class HistoryNetwork extends BaseNetwork {
646646
}
647647
}
648648

649-
this.emit('ContentAdded', contentKey, value)
649+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, value)
650650
if (this.routingTable.values().length > 0) {
651651
if (
652652
contentType !== HistoryNetworkContentType.EphemeralHeader &&
@@ -692,7 +692,7 @@ export class HistoryNetwork extends BaseNetwork {
692692
const bodyContentKey = getContentKey(HistoryNetworkContentType.BlockBody, hashKey)
693693
if (block instanceof Block) {
694694
await this.put(bodyContentKey, bytesToHex(bodyBytes))
695-
this.emit('ContentAdded', bodyContentKey, bodyBytes)
695+
this.portal.emit(`${this.networkId}:ContentAdded`, bodyContentKey, bodyBytes)
696696

697697
// TODO: Decide when and if to build and store receipts.
698698
// Doing this here caused a bottleneck when same receipt is gossiped via uTP at the same time.
@@ -703,7 +703,7 @@ export class HistoryNetwork extends BaseNetwork {
703703
this.logger('Could not verify block content')
704704
this.logger('Adding anyway for testing...')
705705
await this.put(bodyContentKey, bytesToHex(bodyBytes))
706-
this.emit('ContentAdded', bodyContentKey, bodyBytes)
706+
this.portal.emit(`${this.networkId}:ContentAdded`, bodyContentKey, bodyBytes)
707707
// TODO: Decide what to do here. We shouldn't be storing block bodies without a corresponding header
708708
// as it's against spec
709709
return

packages/portalnetwork/src/networks/network.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import {
1212
hexToBytes,
1313
randomBytes,
1414
} from '@ethereumjs/util'
15-
import { EventEmitter } from 'eventemitter3'
1615

1716
import type { ITalkReqMessage } from '@chainsafe/discv5/message'
1817
import type { SignableENR } from '@chainsafe/enr'
@@ -61,7 +60,8 @@ import { PingPongPayloadExtensions } from '../wire/payloadExtensions.js'
6160
import { AcceptCode, FoundContent } from '../wire/types.js'
6261
import { GossipManager } from './gossip.js'
6362
import { NetworkDB } from './networkDB.js'
64-
export abstract class BaseNetwork extends EventEmitter {
63+
64+
export abstract class BaseNetwork {
6565
public capabilities: number[] = [
6666
PingPongPayloadExtensions.CLIENT_INFO_RADIUS_AND_CAPABILITIES,
6767
PingPongPayloadExtensions.BASIC_RADIUS_PAYLOAD,
@@ -94,7 +94,6 @@ export abstract class BaseNetwork extends EventEmitter {
9494
gossipCount,
9595
dbSize,
9696
}: BaseNetworkConfig) {
97-
super()
9897
this.bridge = bridge ?? false
9998
this.networkId = networkId
10099
this.logger = client.logger.extend(this.constructor.name)
@@ -118,7 +117,7 @@ export abstract class BaseNetwork extends EventEmitter {
118117
}
119118
}
120119
this.gossipManager = new GossipManager(this, gossipCount)
121-
this.on('ContentAdded', () => {
120+
this.portal.on(`${this.networkId}:ContentAdded`, () => {
122121
if (this.db.approximateSize / MEGABYTE > this.maxStorage) {
123122
this.logger(
124123
`Pruning due to size limit ${this.db.approximateSize / MEGABYTE} > ${this.maxStorage}`,
@@ -1045,14 +1044,14 @@ export abstract class BaseNetwork extends EventEmitter {
10451044
this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`)
10461045
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
10471046

1048-
this.portal.emit('NodeAdded', enr.nodeId, this.networkId)
1047+
this.portal.emit(`${this.networkId}:NodeAdded`, enr.nodeId)
10491048
} catch (err) {
10501049
this.logger(`Something went wrong: ${(err as any).message}`)
10511050
try {
10521051
this.routingTable.getWithPending(enr as any)?.value === undefined &&
10531052
this.logger(`adding ${enr as any} to ${this.networkName} routing table`)
10541053
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
1055-
this.portal.emit('NodeAdded', enr.nodeId, this.networkId)
1054+
this.portal.emit(`${this.networkId}:NodeAdded`, enr.nodeId)
10561055
} catch (e) {
10571056
this.logger(`Something went wrong : ${(e as any).message}`)
10581057
}

packages/portalnetwork/src/networks/state/state.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ export class StateNetwork extends BaseNetwork {
111111
this.logger.extend('FOUNDCONTENT')(`received uTP Connection ID ${id}`)
112112
response = await new Promise((resolve, _reject) => {
113113
// TODO: Figure out how to clear this listener
114-
this.on('ContentAdded', (contentKey: Uint8Array, value) => {
114+
this.portal.on(`${this.networkId}:ContentAdded`, (contentKey, value) => {
115115
if (equalsBytes(contentKey, key) === true) {
116116
this.logger.extend('FOUNDCONTENT')(`received content for uTP Connection ID ${id}`)
117117
resolve({ content: value, utp: true })
@@ -192,7 +192,7 @@ export class StateNetwork extends BaseNetwork {
192192
await this.db.put(contentKey, content)
193193
}
194194
this.logger(`content added for: ${bytesToHex(contentKey)}`)
195-
this.emit('ContentAdded', contentKey, content)
195+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, content)
196196
this.gossipManager.add(contentKey)
197197
} catch (err: any) {
198198
this.logger(`Error storing content: ${err.message}`)
@@ -257,7 +257,7 @@ export class StateNetwork extends BaseNetwork {
257257
}
258258
for (const { contentKey, dbContent } of interested) {
259259
await this.db.put(contentKey, dbContent)
260-
this.emit('ContentAdded', contentKey, dbContent)
260+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, dbContent)
261261
}
262262
return { interested, notInterested }
263263
}
@@ -281,7 +281,7 @@ export class StateNetwork extends BaseNetwork {
281281
node: curRlp,
282282
})
283283
await this.db.put(contentKey, dbContent)
284-
this.emit('ContentAdded', contentKey, dbContent)
284+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, dbContent)
285285
}
286286

287287
async storeStorageTrieNode(contentKey: Uint8Array, content: Uint8Array) {
@@ -291,7 +291,7 @@ export class StateNetwork extends BaseNetwork {
291291
node: curRlp,
292292
})
293293
await this.db.put(contentKey, dbContent)
294-
this.emit('ContentAdded', contentKey, dbContent)
294+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, dbContent)
295295
}
296296

297297
async receiveStorageTrieNodeOffer(
@@ -362,7 +362,7 @@ export class StateNetwork extends BaseNetwork {
362362
}
363363
for (const { contentKey, dbContent } of interested) {
364364
await this.db.put(contentKey, dbContent)
365-
this.emit('ContentAdded', contentKey, dbContent)
365+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, dbContent)
366366
}
367367
return { interested, notInterested }
368368
}
@@ -394,7 +394,7 @@ export class StateNetwork extends BaseNetwork {
394394
const codeContent = ContractRetrieval.serialize({ code })
395395
this.manager.trie.db.local.set(bytesToUnprefixedHex(codeHash), bytesToHex(contentKey))
396396
await this.db.put(contentKey, codeContent)
397-
this.emit('ContentAdded', contentKey, codeContent)
397+
this.portal.emit(`${this.networkId}:ContentAdded`, contentKey, codeContent)
398398
await this.receiveAccountTrieNodeOffer(
399399
...extractAccountProof(addressHash, accountProof, blockHash),
400400
)

packages/portalnetwork/test/integration/beacon.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ describe('OFFER/ACCEPT tests', () => {
343343
)
344344

345345
await new Promise((resolve) => {
346-
network2.on('ContentAdded', (contentKey: Uint8Array) => {
346+
node2.on(`${network2.networkId}:ContentAdded`, (contentKey: Uint8Array) => {
347347
const contentType = contentKey[0]
348348
if (contentType === BeaconNetworkContentType.LightClientOptimisticUpdate)
349349
// Update the light client stub to report the new "optimistic head"
@@ -522,7 +522,7 @@ describe('OFFER/ACCEPT tests', () => {
522522
await network1.sendOffer(network2.enr.toENR(), [bootstrapKey])
523523

524524
await new Promise((resolve) => {
525-
network2.on('ContentAdded', (key: Uint8Array) => {
525+
node2.on(`${network2.networkId}:ContentAdded`, (key: Uint8Array) => {
526526
assert.deepEqual(key, bootstrapKey, 'successfully gossipped bootstrap')
527527
resolve(undefined)
528528
})
@@ -753,7 +753,7 @@ describe('beacon light client sync tests', () => {
753753
)
754754

755755
await new Promise((resolve) => {
756-
network2.portal.on('NodeAdded', (_nodeId) => {
756+
node2.on(`${network2.networkId}:NodeAdded`, (_nodeId) => {
757757
if (network2['bootstrapFinder'].values.length > 0) {
758758
resolve('undefined)')
759759
}

packages/portalnetwork/test/integration/ephemeralHeaders.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ describe('should offer headers to peers', () => {
232232
await node1.start()
233233
await node2.start()
234234
const history1 = node1.network()['0x500b']
235-
const history2 = node2.network()['0x500b']
235+
const history2 = node2.network()['0x500b']!
236236
await history2?.sendPing(node1.discv5.enr.toENR())
237237
await node1
238238
.network()
@@ -263,7 +263,7 @@ describe('should offer headers to peers', () => {
263263
)
264264
await new Promise((resolve) => {
265265
let count = 0
266-
history2?.on('ContentAdded', async (key, value) => {
266+
node2.on(`${history2.networkId}:ContentAdded`, async (key, value) => {
267267
count++
268268
if (count === 3) {
269269
const header = await history2.get(getEphemeralHeaderDbKey(headers[2].hash()))

packages/portalnetwork/test/integration/integration.spec.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,10 @@ describe('gossip test', async () => {
9494
// Fancy workaround to allow us to "await" an event firing as expected following this - https://github.com/ljharb/tape/pull/503#issuecomment-619358911
9595
const end = new EventEmitter()
9696
const addedHeaders: [string, string][] = []
97-
network2.on('ContentAdded', async (key: Uint8Array, content: Uint8Array) => {
97+
node2.on(`${network2.networkId}:ContentAdded`, async (key: Uint8Array, content: Uint8Array) => {
9898
network2.logger.extend('ContentAdded')(`Added Content for ${bytesToHex(key)}`)
9999
addedHeaders.push([bytesToHex(key), bytesToHex(content)])
100100
if (addedHeaders.length === headersWithProofs.length) {
101-
node2.removeAllListeners()
102101
void node1.stop()
103102
void node2.stop()
104103
end.emit('end()')
@@ -218,7 +217,6 @@ describe('FindContent', async () => {
218217
setHardfork: true,
219218
})
220219

221-
node2.removeAllListeners()
222220
void node1.stop()
223221
void node2.stop()
224222
it('should find content', () => {

0 commit comments

Comments
 (0)