Skip to content

Commit

Permalink
uTP: use handlePacket error content to choose response
Browse files Browse the repository at this point in the history
  • Loading branch information
ScottyPoi committed Jan 17, 2025
1 parent 1df7d31 commit be04907
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 18 deletions.
33 changes: 26 additions & 7 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
) => {
this.metrics?.totalBytesReceived.inc(message.request.length)
if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) {
if (!this.discv5.findEnr(nodeAddress.nodeId)) {
this.logger.extend('TalkReq').extend('error')(`Received uTP packet from unknown node: ${nodeAddress.nodeId}. Adding to blacklist.`)
this.addToBlackList(nodeAddress.socketAddr)
return
}
await this.handleUTP(nodeAddress, message, message.request)
return
}
Expand All @@ -416,13 +421,18 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
* @param packetBuffer uTP packet encoded to Buffer
*/
private handleUTP = async (src: INodeAddress, msg: ITalkReqMessage, packetBuffer: Buffer) => {
if (this.uTP.requestManagers[src.nodeId] === undefined) {
this.logger.extend('handleUTP').extend('error')(`Received uTP packet from peer with no uTP stream history: ${src.nodeId}. Blacklisting peer.`)
this.addToBlackList(src.socketAddr)
return
}
await this.sendPortalNetworkResponse(src, msg.id, new Uint8Array())
try {
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
await this.uTP.handleUtpPacket(packetBuffer, src)
} catch (err: any) {
this.logger.extend('error')(

`handleUTP error: ${err.message}. SrcId: ${src.nodeId
`handleUTP error: ${err.message}. SrcId: ${
src.nodeId
} MultiAddr: ${src.socketAddr.toString()}`,
)
}
Expand All @@ -442,7 +452,10 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
utpMessage?: boolean,
): Promise<Uint8Array> => {
const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId
const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')
const remote =
enr instanceof ENR
? enr
: (this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp'))
try {
this.metrics?.totalBytesSent.inc(payload.length)
const res = await this.discv5.sendTalkReq(
Expand All @@ -454,10 +467,14 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
return res
} catch (err: any) {
if (networkId === NetworkId.UTPNetwork || utpMessage === true) {
throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`)
throw new Error(
`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`,
)
} else {
const messageType = PortalWireMessageType.deserialize(payload).selector
throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`)
throw new Error(
`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`,
)
}
}
}
Expand All @@ -472,7 +489,9 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
try {
await this.discv5.sendTalkResp(src, requestId, payload)
} catch (err: any) {
this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`)
this.logger.extend('error')(
`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`,
)
}
}

Expand Down
56 changes: 45 additions & 11 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
import type {
ContentRequestType,
INewRequest,
INodeAddress,
PortalNetwork,
} from '../../../index.js'
import {
NetworkId,
Packet,
PacketType,
RequestCode,
UtpSocketType,
createContentRequest,
Expand All @@ -8,13 +17,6 @@ import { createUtpSocket } from '../Socket/index.js'

import type { ENR } from '@chainsafe/enr'
import type { Debugger } from 'debug'
import type {
ContentRequestType,
INewRequest,
INodeAddress,
NetworkId,
PortalNetwork,
} from '../../../index.js'
import type { SocketType } from '../Socket/index.js'
import { RequestManager } from './requestManager.js'

Expand Down Expand Up @@ -118,11 +120,43 @@ export class PortalNetworkUTP {
return newRequest
}

async handleUtpPacket(packetBuffer: Buffer, srcId: string): Promise<void> {
if (this.requestManagers[srcId] === undefined) {
throw new Error(`No request manager for ${srcId}`)
async handleUtpPacket(packetBuffer: Buffer, srcId: INodeAddress): Promise<void> {
if (this.requestManagers[srcId.nodeId] === undefined) {
throw new Error(`No request manager for ${srcId.nodeId}`)
}
try {
await this.requestManagers[srcId.nodeId].handlePacket(packetBuffer)
} catch (err: any) {
switch (err.message) {
case `REQUEST_CLOSED`: {
// Packet arrived after request was closed. Send RESET packet to peer.
const packet = Packet.fromBuffer(packetBuffer)
const resetPacket = new Packet({
header: {
connectionId: packet.header.connectionId,
pType: PacketType.ST_RESET,
ackNr: 0,
extension: 0,
version: 0,
timestampMicroseconds: 0,
timestampDifferenceMicroseconds: 0,
seqNr: 0,
wndSize: 0,
},
})
await this.send(srcId, resetPacket.encode(), NetworkId.UTPNetwork)
break
}
case `REQUEST_NOT_FOUND`: {
// Packet arrived for non-existent request. Treat as spam and blacklist peer.
this.client.addToBlackList(srcId.socketAddr)
break
}
default: {
throw err
}
}
}
await this.requestManagers[srcId].handlePacket(packetBuffer)
}

async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) {
Expand Down

0 comments on commit be04907

Please sign in to comment.