Skip to content

Commit 626d09a

Browse files
committed
Merge remote-tracking branch 'origin/master' into post-capella-block-verification
2 parents 7634601 + 208409a commit 626d09a

File tree

9 files changed

+182
-24
lines changed

9 files changed

+182
-24
lines changed

packages/cli/src/rpc/modules/portal.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -804,15 +804,15 @@ export class portal {
804804
}
805805
async statePutContent(params: [string, string]) {
806806
const [contentKey, content] = params.map((param) => hexToBytes(param))
807-
const contentId = this._history.contentKeyToId(contentKey)
807+
const contentId = this._state.contentKeyToId(contentKey)
808808
const d = distance(contentId, this._client.discv5.enr.nodeId)
809809
let storedLocally = false
810810
try {
811-
if (d <= this._history.nodeRadius) {
812-
await this._history.store(contentKey, content)
811+
if (d <= this._state.nodeRadius) {
812+
await this._state.store(contentKey, content)
813813
storedLocally = true
814814
}
815-
const peerCount = await this._history.gossipContent(contentKey, content)
815+
const peerCount = await this._state.gossipContent(contentKey, content)
816816
return {
817817
peerCount,
818818
storedLocally,
@@ -826,14 +826,19 @@ export class portal {
826826
}
827827
async beaconPutContent(params: [string, string]) {
828828
const [contentKey, content] = params.map((param) => hexToBytes(param))
829-
const contentId = this._history.contentKeyToId(contentKey)
829+
const contentId = this._beacon.contentKeyToId(contentKey)
830830
const d = distance(contentId, this._client.discv5.enr.nodeId)
831831
let storedLocally = false
832832
try {
833-
if (d <= this._history.nodeRadius) {
834-
await this._history.store(contentKey, content)
833+
if (d <= this._beacon.nodeRadius) {
834+
await this._beacon.store(contentKey, content)
835835
storedLocally = true
836836
}
837+
const peerCount = await this._beacon.gossipContent(contentKey, content)
838+
return {
839+
peerCount,
840+
storedLocally,
841+
}
837842
} catch {
838843
return {
839844
peerCount: 0,
@@ -969,7 +974,6 @@ export class portal {
969974
this.logger.extend('stateGetContent')(`request received for ${contentKey}`)
970975
const lookup = new ContentLookup(this._state, hexToBytes(contentKey))
971976
const res = await lookup.startLookup()
972-
this.logger.extend('stateGetContent')(`request returned ${JSON.stringify(res)}`)
973977
if (!res) {
974978
this.logger.extend('stateGetContent')(`request returned { enrs: [] }`)
975979
throw new Error('No content found')

packages/portalnetwork/src/client/client.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { EventEmitter } from 'eventemitter3'
2-
import { Discv5 } from '@chainsafe/discv5'
2+
import { Discv5, UDPTransportService } from '@chainsafe/discv5'
33
import { ENR, SignableENR } from '@chainsafe/enr'
44
import { bytesToHex, hexToBytes } from '@ethereumjs/util'
55
import { keys } from '@libp2p/crypto'
6+
import type { Multiaddr } from '@multiformats/multiaddr'
67
import { fromNodeAddress, multiaddr } from '@multiformats/multiaddr'
78
import debug from 'debug'
89

@@ -32,6 +33,7 @@ import type {
3233
PortalNetworkOpts,
3334
} from './types.js'
3435
import { MessageCodes, PortalWireMessageType } from '../wire/types.js'
36+
import { RateLimiter } from '../transports/rateLimiter.js'
3537

3638
export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
3739
eventLog: boolean
@@ -125,13 +127,18 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
125127
switch (opts.transport) {
126128
case TransportLayer.WEB: {
127129
opts.proxyAddress = opts.proxyAddress ?? 'ws://127.0.0.1:5050'
128-
config.transport = new WebSocketTransportService(ma, config.enr.nodeId, opts.proxyAddress)
130+
config.transport = new WebSocketTransportService(ma, config.enr.nodeId, opts.proxyAddress, new RateLimiter())
129131
break
130132
}
131133
case TransportLayer.MOBILE:
132134
config.transport = new CapacitorUDPTransportService(ma, config.enr.nodeId)
133135
break
134136
case TransportLayer.NODE:
137+
config.transport = new UDPTransportService({
138+
bindAddrs: config.bindAddrs,
139+
nodeId: config.enr.nodeId,
140+
rateLimiter: new RateLimiter(),
141+
})
135142
break
136143
}
137144

@@ -468,4 +475,22 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
468475
this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`)
469476
}
470477
}
478+
479+
public addToBlackList = (ma: Multiaddr) => {
480+
(<RateLimiter>(
481+
(<any>this.discv5.sessionService.transport)['rateLimiter']
482+
)).addToBlackList(ma.nodeAddress().address)
483+
}
484+
485+
public isBlackListed = (ma: Multiaddr) => {
486+
return (<RateLimiter>(
487+
(<any>this.discv5.sessionService.transport)['rateLimiter']
488+
)).isBlackListed(ma.nodeAddress().address)
489+
}
490+
491+
public removeFromBlackList = (ma: Multiaddr) => {
492+
(<RateLimiter>(
493+
(<any>this.discv5.sessionService.transport)['rateLimiter']
494+
)).removeFromBlackList(ma.nodeAddress().address)
495+
}
471496
}

packages/portalnetwork/src/networks/beacon/beacon.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,10 @@ export class BeaconLightClientNetwork extends BaseNetwork {
391391
)
392392
if (value !== undefined) {
393393
const decoded = hexToBytes(value)
394-
// const forkhash = decoded.slice(0, 4) as Uint8Array
395-
// const forkname = this.beaconConfig.forkDigest2ForkName(forkhash) as LightClientForkName
394+
const forkHash = decoded.slice(0, 4) as Uint8Array
395+
const forkName = this.beaconConfig.forkDigest2ForkName(forkHash) as LightClientForkName
396396
if (
397-
ssz[ForkName.capella].LightClientFinalityUpdate.deserialize(decoded.slice(4))
397+
ssz[ForkName[forkName]].LightClientFinalityUpdate.deserialize(decoded.slice(4))
398398
.finalizedHeader.beacon.slot < Number(key.finalitySlot)
399399
) {
400400
// If what we have stored locally is older than the finality update requested, don't send it
@@ -732,9 +732,9 @@ export class BeaconLightClientNetwork extends BaseNetwork {
732732
if (typeof input === 'number') {
733733
period = input
734734
} else {
735-
// const forkhash = input.slice(0, 4) as Uint8Array
736-
// const forkname = this.beaconConfig.forkDigest2ForkName(forkhash) as LightClientForkName
737-
const deserializedUpdate = ssz[ForkName.capella].LightClientUpdate.deserialize(
735+
const forkhash = input.slice(0, 4) as Uint8Array
736+
const forkName = this.beaconConfig.forkDigest2ForkName(forkhash) as LightClientForkName
737+
const deserializedUpdate = ssz[ForkName[forkName]].LightClientUpdate.deserialize(
738738
input.slice(4),
739739
) as LightClientUpdate
740740
period = computeSyncPeriodAtSlot(deserializedUpdate.attestedHeader.beacon.slot)

packages/portalnetwork/src/transports/capacitorUdp.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { decodePacket, encodePacket } from '@chainsafe/discv5/packet'
44
import { UDP } from '@frontall/capacitor-udp'
55
import { multiaddr as ma } from '@multiformats/multiaddr'
66

7+
import type { IRateLimiter } from './rateLimiter.js'
78
import type { SocketAddress } from '@chainsafe/discv5'
89
import type { IPacket } from '@chainsafe/discv5/packet'
910
import type {
@@ -33,11 +34,13 @@ export class CapacitorUDPTransportService
3334
ip4: true,
3435
ip6: false,
3536
}
36-
public constructor(multiaddr: Multiaddr, srcId: string) {
37+
private rateLimiter?: IRateLimiter
38+
public constructor(multiaddr: Multiaddr, srcId: string, rateLimiter?: IRateLimiter) {
3739
//eslint-disable-next-line constructor-super
3840
super()
3941
this.bindAddrs = [multiaddr]
4042
this.srcId = srcId
43+
this.rateLimiter = rateLimiter
4144
}
4245

4346
public async start(): Promise<void> {
@@ -74,6 +77,9 @@ export class CapacitorUDPTransportService
7477
}
7578

7679
public handleIncoming = (data: Uint8Array, rinfo: IRemoteInfo): void => {
80+
if (this.rateLimiter && !this.rateLimiter.allowEncodedPacket(rinfo.address)) {
81+
return
82+
}
7783
const multiaddr = ma(
7884
`/${rinfo.family === 'IPv4' ? 'ip4' : 'ip6'}/${rinfo.address}/udp/${rinfo.port}`,
7985
)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
type IPAddress = string
2+
3+
export interface IRateLimiter {
4+
allowEncodedPacket(ip: IPAddress): boolean
5+
addExpectedResponse(ip: IPAddress): void
6+
removeExpectedResponse(ip: IPAddress): void
7+
}
8+
9+
const DEFAULT_BLACKLIST_DURATION = 1000 * 60 * 60 * 24 // 24 hours
10+
11+
export class RateLimiter implements IRateLimiter {
12+
private blackListed: Set<IPAddress> = new Set<IPAddress>()
13+
private blackListTimeouts: Map<IPAddress, ReturnType<typeof setTimeout>> = new Map<
14+
IPAddress,
15+
ReturnType<typeof setTimeout>
16+
>()
17+
constructor() {}
18+
19+
public allowEncodedPacket(ip: IPAddress): boolean {
20+
return !this.blackListed.has(ip)
21+
}
22+
23+
public addExpectedResponse(_ip: IPAddress): void {
24+
return
25+
}
26+
27+
public removeExpectedResponse(_ip: IPAddress): void {
28+
return
29+
}
30+
31+
public addToBlackList(ip: IPAddress): void {
32+
this.blackListed.add(ip)
33+
this.blackListTimeouts.set(
34+
ip,
35+
setTimeout(() => {
36+
this.blackListed.delete(ip)
37+
this.blackListTimeouts.delete(ip)
38+
}, DEFAULT_BLACKLIST_DURATION),
39+
)
40+
}
41+
42+
public removeFromBlackList(ip: IPAddress): void {
43+
clearTimeout(this.blackListTimeouts.get(ip))
44+
this.blackListTimeouts.delete(ip)
45+
this.blackListed.delete(ip)
46+
}
47+
48+
public isBlackListed(ip: IPAddress): boolean {
49+
return this.blackListed.has(ip)
50+
}
51+
}

packages/portalnetwork/src/transports/websockets.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import type { ENR } from '@chainsafe/enr'
1818
import type { Multiaddr } from '@multiformats/multiaddr'
1919
import type { Debugger } from 'debug'
2020
import type StrictEventEmitter from 'strict-event-emitter-types/types/src'
21-
21+
import type { IRateLimiter } from './rateLimiter.js'
2222
const log = debug('discv5:transport')
2323

2424
interface WebSocketTransportEvents extends ITransportEvents {
@@ -40,12 +40,13 @@ export class WebSocketTransportService
4040
private socket: WebSocketAsPromised
4141
private srcId: string
4242
private log: Debugger
43+
private rateLimiter?: IRateLimiter
4344
ipMode: IPMode = {
4445
ip4: true,
4546
ip6: false,
4647
}
4748
bindAddrs: Multiaddr[] = []
48-
public constructor(multiaddr: Multiaddr, srcId: string, proxyAddress: string) {
49+
public constructor(multiaddr: Multiaddr, srcId: string, proxyAddress: string, rateLimiter?: IRateLimiter) {
4950
//eslint-disable-next-line constructor-super
5051
super()
5152
this.log = debug('Portal').extend('WebSocketTransportService')
@@ -58,6 +59,7 @@ export class WebSocketTransportService
5859
createWebSocket: (url) => new WebSocket(url),
5960
extractMessageData: (event) => event,
6061
})
62+
this.rateLimiter = rateLimiter
6163
}
6264

6365
public async start(): Promise<void> {
@@ -108,6 +110,9 @@ export class WebSocketTransportService
108110
const rinfo = JSON.parse(
109111
new TextDecoder().decode(data.slice(2, rinfoLength + 2)),
110112
) as IRemoteInfo
113+
if (this.rateLimiter && !this.rateLimiter.allowEncodedPacket(rinfo.address)) {
114+
return;
115+
}
111116
const multiaddr = ma(
112117
`/${rinfo.family === 'IPv4' ? 'ip4' : 'ip6'}/${rinfo.address}/udp/${rinfo.port}`,
113118
)

packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import { Heap } from "heap-js";
66
import { MAX_IN_FLIGHT_PACKETS, type RequestId } from "./types.js";
77

88
const packetComparator: Comparator<Packet<PacketType>> = (a: Packet<PacketType>, b: Packet<PacketType>) => {
9-
// If packets belong to the same connection, sort by sequence number
9+
// If packets belong to the same connection, sort by sequence number (or ackNr for ST_STATE packets)
1010
if (a.header.connectionId === b.header.connectionId) {
11-
return a.header.seqNr - b.header.seqNr;
11+
return a.header.pType === PacketType.ST_STATE ? a.header.ackNr - b.header.ackNr : a.header.seqNr - b.header.seqNr;
1212
}
1313
// Otherwise, sort by timestamp
1414
return a.header.timestampMicroseconds - b.header.timestampMicroseconds;
@@ -90,7 +90,7 @@ export class RequestManager {
9090
} else {
9191
this.packetHeap.push(packet)
9292
}
93-
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`)
93+
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.pType === PacketType.ST_STATE ? packet.header.ackNr : packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`)
9494
if (this.currentPacket === undefined) {
9595
this.currentPacket = this.packetHeap.pop()
9696
await this.processCurrentPacket()
@@ -108,7 +108,7 @@ export class RequestManager {
108108
await this.processCurrentPacket()
109109
return
110110
}
111-
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`)
111+
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.pType === PacketType.ST_STATE ? this.currentPacket.header.ackNr : this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`)
112112
const request = this.lookupRequest(this.currentPacket.header.connectionId)
113113
if (request === undefined) {
114114
this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`)

packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export class WriteSocket extends UtpSocket {
7171
this._clearTimeout()
7272
}
7373
logProgress() {
74-
const needed = this.writer!.dataChunks.filter((n) => !this.ackNrs.includes(n[0]))
74+
const needed = this.writer!.dataChunks.filter((n) => !this.ackNrs.includes(n[0])).map((n) => n[0])
7575
this.logger(
7676
`AckNr's received (${this.ackNrs.length}/${
7777
this.writer!.sentChunks.length
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { SignableENR } from "@chainsafe/enr"
2+
import { keys } from "@libp2p/crypto"
3+
import { multiaddr } from "@multiformats/multiaddr"
4+
import { hexToBytes } from "ethereum-cryptography/utils"
5+
import { assert, describe, it } from "vitest"
6+
import type { HistoryNetwork } from "../../src";
7+
import { NetworkId, PortalNetwork, TransportLayer } from "../../src/index.js"
8+
const privateKeys = [
9+
'0x0a2700250802122102273097673a2948af93317235d2f02ad9cf3b79a34eeb37720c5f19e09f11783c12250802122102273097673a2948af93317235d2f02ad9cf3b79a34eeb37720c5f19e09f11783c1a2408021220aae0fff4ac28fdcdf14ee8ecb591c7f1bc78651206d86afe16479a63d9cb73bd',
10+
'0x0a27002508021221039909a8a7e81dbdc867480f0eeb7468189d1e7a1dd7ee8a13ee486c8cbd743764122508021221039909a8a7e81dbdc867480f0eeb7468189d1e7a1dd7ee8a13ee486c8cbd7437641a2408021220c6eb3ae347433e8cfe7a0a195cc17fc8afcd478b9fb74be56d13bccc67813130',
11+
]
12+
const pk1 = keys.privateKeyFromProtobuf(hexToBytes(privateKeys[0]).slice(-36))
13+
const enr1 = SignableENR.createFromPrivateKey(pk1)
14+
const pk2 = keys.privateKeyFromProtobuf(hexToBytes(privateKeys[1]).slice(-36))
15+
const enr2 = SignableENR.createFromPrivateKey(pk2)
16+
17+
describe('black list test', async () => {
18+
const initMa: any = multiaddr(`/ip4/127.0.0.1/udp/5030`)
19+
enr1.setLocationMultiaddr(initMa)
20+
const initMa2: any = multiaddr(`/ip4/127.0.0.1/udp/5031`)
21+
enr2.setLocationMultiaddr(initMa2)
22+
const node1 = await PortalNetwork.create({
23+
transport: TransportLayer.NODE,
24+
supportedNetworks: [{ networkId: NetworkId.HistoryNetwork }],
25+
config: {
26+
enr: enr1,
27+
bindAddrs: {
28+
ip4: initMa,
29+
},
30+
privateKey: pk1,
31+
},
32+
})
33+
const node2 = await PortalNetwork.create({
34+
transport: TransportLayer.NODE,
35+
supportedNetworks: [{ networkId: NetworkId.HistoryNetwork }],
36+
config: {
37+
enr: enr2,
38+
bindAddrs: {
39+
ip4: initMa2,
40+
},
41+
privateKey: pk2,
42+
},
43+
})
44+
45+
await node1.start()
46+
await node2.start()
47+
const network1 = node1.networks.get(NetworkId.HistoryNetwork) as HistoryNetwork
48+
const network2 = node2.networks.get(NetworkId.HistoryNetwork) as HistoryNetwork
49+
const pong = await network1?.sendPing(network2?.enr!.toENR())
50+
it('should ping peer', () => {
51+
assert.isDefined(pong)
52+
})
53+
node1.addToBlackList(node2.discv5.enr.getLocationMultiaddr('udp')!)
54+
it('should blacklist peer', () => {
55+
assert.isTrue(node1.isBlackListed(node2.discv5.enr.getLocationMultiaddr('udp')!))
56+
})
57+
const pong2 = await network2?.sendPing(network1?.enr!.toENR())
58+
it('blacklisted peer should not be able to ping', () => {
59+
assert.isUndefined(pong2)
60+
})
61+
it('should remove peer from blacklist', async () => {
62+
node1.removeFromBlackList(node2.discv5.enr.getLocationMultiaddr('udp')!)
63+
assert.isFalse(node1.isBlackListed(node2.discv5.enr.getLocationMultiaddr('udp')!))
64+
const pong3 = await network2?.sendPing(network1?.enr!.toENR())
65+
assert.isDefined(pong3)
66+
})
67+
})

0 commit comments

Comments
 (0)