From 1df7d316cc62f49822ab182ff39a07d8e7a78015 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 09:53:44 -0700 Subject: [PATCH 1/6] uTP: mark requests as closed instead of deleting from requestMap --- .../wire/utp/PortalNetworkUtp/requestManager.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index ce06d5c87..cbed28e0f 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -15,7 +15,7 @@ const packetComparator: Comparator> = (a: Packet, } export class RequestManager { peerId: string - requestMap: Record + requestMap: Record logger: Debugger packetHeap: Heap> currentPacket: Packet | undefined @@ -34,8 +34,9 @@ export class RequestManager { * @param connectionId connectionId field from incoming packet header * @returns corresponding requestId */ - lookupRequest(connectionId: number): ContentRequest | undefined { - return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] + lookupRequest(connectionId: number): ContentRequest | 'closed' | undefined { + const request = this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] + return request } /** @@ -80,9 +81,13 @@ export class RequestManager { async handlePacket(packetBuffer: Buffer) { const packet = Packet.fromBuffer(packetBuffer) const request = this.lookupRequest(packet.header.connectionId) + if (request === 'closed') { + this.logger.extend('HANDLE_PACKET')(`Request closed - connectionId: ${packet.header.connectionId}. Sending RESET to peer.`) + throw new Error(`REQUEST_CLOSED`) + } if (request === undefined) { - this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`) - return + this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}. Blacklisting peer.`) + throw new Error(`REQUEST_NOT_FOUND`) } if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { await request.handleUtpPacket(packet) @@ -110,7 +115,7 @@ export class RequestManager { } this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.pType === PacketType.ST_STATE ? this.currentPacket.header.ackNr : this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`) const request = this.lookupRequest(this.currentPacket.header.connectionId) - if (request === undefined) { + if (request === undefined || request === 'closed') { this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() From be049073f8bce6bf04a1e3561435f0513ac4ae39 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 09:55:22 -0700 Subject: [PATCH 2/6] uTP: use handlePacket error content to choose response --- packages/portalnetwork/src/client/client.ts | 33 ++++++++--- .../src/wire/utp/PortalNetworkUtp/index.ts | 56 +++++++++++++++---- 2 files changed, 71 insertions(+), 18 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index b3a6e48dd..d3f88bb58 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -391,6 +391,11 @@ export class PortalNetwork extends EventEmitter { ) => { this.metrics?.totalBytesReceived.inc(message.request.length) if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) { + if (!this.discv5.findEnr(nodeAddress.nodeId)) { + this.logger.extend('TalkReq').extend('error')(`Received uTP packet from unknown node: ${nodeAddress.nodeId}. Adding to blacklist.`) + this.addToBlackList(nodeAddress.socketAddr) + return + } await this.handleUTP(nodeAddress, message, message.request) return } @@ -416,13 +421,18 @@ export class PortalNetwork extends EventEmitter { * @param packetBuffer uTP packet encoded to Buffer */ private handleUTP = async (src: INodeAddress, msg: ITalkReqMessage, packetBuffer: Buffer) => { + if (this.uTP.requestManagers[src.nodeId] === undefined) { + this.logger.extend('handleUTP').extend('error')(`Received uTP packet from peer with no uTP stream history: ${src.nodeId}. Blacklisting peer.`) + this.addToBlackList(src.socketAddr) + return + } await this.sendPortalNetworkResponse(src, msg.id, new Uint8Array()) try { - await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) + await this.uTP.handleUtpPacket(packetBuffer, src) } catch (err: any) { this.logger.extend('error')( - - `handleUTP error: ${err.message}. SrcId: ${src.nodeId + `handleUTP error: ${err.message}. SrcId: ${ + src.nodeId } MultiAddr: ${src.socketAddr.toString()}`, ) } @@ -442,7 +452,10 @@ export class PortalNetwork extends EventEmitter { utpMessage?: boolean, ): Promise => { const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId - const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp') + const remote = + enr instanceof ENR + ? enr + : (this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')) try { this.metrics?.totalBytesSent.inc(payload.length) const res = await this.discv5.sendTalkReq( @@ -454,10 +467,14 @@ export class PortalNetwork extends EventEmitter { return res } catch (err: any) { if (networkId === NetworkId.UTPNetwork || utpMessage === true) { - throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`) + throw new Error( + `Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`, + ) } else { const messageType = PortalWireMessageType.deserialize(payload).selector - throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`) + throw new Error( + `Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`, + ) } } } @@ -472,7 +489,9 @@ export class PortalNetwork extends EventEmitter { try { await this.discv5.sendTalkResp(src, requestId, payload) } catch (err: any) { - this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`) + this.logger.extend('error')( + `Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`, + ) } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 57b925fb8..5f2fa2ea0 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -1,4 +1,13 @@ +import type { + ContentRequestType, + INewRequest, + INodeAddress, + PortalNetwork, +} from '../../../index.js' import { + NetworkId, + Packet, + PacketType, RequestCode, UtpSocketType, createContentRequest, @@ -8,13 +17,6 @@ import { createUtpSocket } from '../Socket/index.js' import type { ENR } from '@chainsafe/enr' import type { Debugger } from 'debug' -import type { - ContentRequestType, - INewRequest, - INodeAddress, - NetworkId, - PortalNetwork, - } from '../../../index.js' import type { SocketType } from '../Socket/index.js' import { RequestManager } from './requestManager.js' @@ -118,11 +120,43 @@ export class PortalNetworkUTP { return newRequest } - async handleUtpPacket(packetBuffer: Buffer, srcId: string): Promise { - if (this.requestManagers[srcId] === undefined) { - throw new Error(`No request manager for ${srcId}`) + async handleUtpPacket(packetBuffer: Buffer, srcId: INodeAddress): Promise { + if (this.requestManagers[srcId.nodeId] === undefined) { + throw new Error(`No request manager for ${srcId.nodeId}`) + } + try { + await this.requestManagers[srcId.nodeId].handlePacket(packetBuffer) + } catch (err: any) { + switch (err.message) { + case `REQUEST_CLOSED`: { + // Packet arrived after request was closed. Send RESET packet to peer. + const packet = Packet.fromBuffer(packetBuffer) + const resetPacket = new Packet({ + header: { + connectionId: packet.header.connectionId, + pType: PacketType.ST_RESET, + ackNr: 0, + extension: 0, + version: 0, + timestampMicroseconds: 0, + timestampDifferenceMicroseconds: 0, + seqNr: 0, + wndSize: 0, + }, + }) + await this.send(srcId, resetPacket.encode(), NetworkId.UTPNetwork) + break + } + case `REQUEST_NOT_FOUND`: { + // Packet arrived for non-existent request. Treat as spam and blacklist peer. + this.client.addToBlackList(srcId.socketAddr) + break + } + default: { + throw err + } + } } - await this.requestManagers[srcId].handlePacket(packetBuffer) } async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) { From 32f25cc9ceaf39c7af48fbf79973faf9984a4262 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 11:31:11 -0700 Subject: [PATCH 3/6] uTP: mark closed as 'closed' --- .../src/wire/utp/PortalNetworkUtp/requestManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index cbed28e0f..18302e351 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -155,7 +155,7 @@ export class RequestManager { } this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`) this.removeRequestPackets(connectionId) - delete this.requestMap[connectionId] + this.requestMap[connectionId] = 'closed' } closeAllRequests() { From 3d0980554db31b90f418be99b975e2b41dbe4aa7 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 11:31:31 -0700 Subject: [PATCH 4/6] uTP: do not send RESET on RESET --- .../utp/PortalNetworkUtp/requestManager.ts | 70 ++++++++++++------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index 18302e351..94c2ac23f 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -74,33 +74,55 @@ export class RequestManager { await request.init() } - /** - * Handles an incoming uTP packet. - * @param packetBuffer buffer containing the incoming packet - */ - async handlePacket(packetBuffer: Buffer) { - const packet = Packet.fromBuffer(packetBuffer) - const request = this.lookupRequest(packet.header.connectionId) - if (request === 'closed') { - this.logger.extend('HANDLE_PACKET')(`Request closed - connectionId: ${packet.header.connectionId}. Sending RESET to peer.`) - throw new Error(`REQUEST_CLOSED`) + /** + * Handles an incoming uTP packet. + * @param packetBuffer buffer containing the incoming packet + */ + async handlePacket(packetBuffer: Buffer) { + const packet = Packet.fromBuffer(packetBuffer) + const request = this.lookupRequest(packet.header.connectionId) + switch (request) { + case 'closed': { + if (packet.header.pType === PacketType.ST_RESET) { + this.logger.extend('HANDLE_PACKET')( + `Received RESET for closed request - connectionId: ${packet.header.connectionId}.`, + ) + return } - if (request === undefined) { - this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}. Blacklisting peer.`) - throw new Error(`REQUEST_NOT_FOUND`) - } - if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { - await request.handleUtpPacket(packet) - return + this.logger.extend('HANDLE_PACKET')( + `Request closed - connectionId: ${packet.header.connectionId}. Sending RESET to peer.`, + ) + throw new Error(`REQUEST_CLOSED`) + } + case undefined: { + this.logger.extend('HANDLE_PACKET')( + `Request not found for packet - connectionId: ${packet.header.connectionId}. Blacklisting peer.`, + ) + throw new Error(`REQUEST_NOT_FOUND`) + } + default: { + if ( + packet.header.pType === PacketType.ST_SYN || + packet.header.pType === PacketType.ST_RESET + ) { + this.logger.extend('HANDLE_PACKET')( + `Processing SYN packet for Req:${packet.header.connectionId}`, + ) + await request.handleUtpPacket(packet) + return } else { - this.packetHeap.push(packet) - } - this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.pType === PacketType.ST_STATE ? packet.header.ackNr : packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`) - if (this.currentPacket === undefined) { - this.currentPacket = this.packetHeap.pop() - await this.processCurrentPacket() + this.packetHeap.push(packet) + this.logger.extend('HANDLE_PACKET')( + `Adding ${PacketType[packet.header.pType]} [${packet.header.pType === PacketType.ST_STATE ? packet.header.ackNr : packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`, + ) } + } + } + if (this.currentPacket === undefined) { + this.currentPacket = this.packetHeap.pop() + await this.processCurrentPacket() } + } async processCurrentPacket(): Promise { this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`) @@ -116,7 +138,7 @@ export class RequestManager { this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.pType === PacketType.ST_STATE ? this.currentPacket.header.ackNr : this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`) const request = this.lookupRequest(this.currentPacket.header.connectionId) if (request === undefined || request === 'closed') { - this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) + this.logger.extend('PROCESS_CURRENT_PACKET')(`Request: ${request} for current packet - connectionId: ${this.currentPacket.header.connectionId}. Skipping packet.`) this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return From 4c50c899cdefd66aac936f1a6a144e7d7f7c46b6 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 14:22:40 -0700 Subject: [PATCH 5/6] correct packetType log --- .../src/wire/utp/PortalNetworkUtp/requestManager.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index 94c2ac23f..323324033 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -105,8 +105,9 @@ export class RequestManager { packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET ) { + const packetType = packet.header.pType === PacketType.ST_SYN ? 'SYN' : 'RESET' this.logger.extend('HANDLE_PACKET')( - `Processing SYN packet for Req:${packet.header.connectionId}`, + `Processing ${packetType} packet for Req:${packet.header.connectionId}`, ) await request.handleUtpPacket(packet) return From 100afb06d273640a5542641f5b0d37954e959c57 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 17 Jan 2025 14:23:04 -0700 Subject: [PATCH 6/6] uTP: remove discv5 routing table check --- packages/portalnetwork/src/client/client.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index d3f88bb58..d271a6e4f 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -391,11 +391,6 @@ export class PortalNetwork extends EventEmitter { ) => { this.metrics?.totalBytesReceived.inc(message.request.length) if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) { - if (!this.discv5.findEnr(nodeAddress.nodeId)) { - this.logger.extend('TalkReq').extend('error')(`Received uTP packet from unknown node: ${nodeAddress.nodeId}. Adding to blacklist.`) - this.addToBlackList(nodeAddress.socketAddr) - return - } await this.handleUTP(nodeAddress, message, message.request) return }