diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index b3a6e48dd..d271a6e4f 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -416,13 +416,18 @@ export class PortalNetwork extends EventEmitter { * @param packetBuffer uTP packet encoded to Buffer */ private handleUTP = async (src: INodeAddress, msg: ITalkReqMessage, packetBuffer: Buffer) => { + if (this.uTP.requestManagers[src.nodeId] === undefined) { + this.logger.extend('handleUTP').extend('error')(`Received uTP packet from peer with no uTP stream history: ${src.nodeId}. Blacklisting peer.`) + this.addToBlackList(src.socketAddr) + return + } await this.sendPortalNetworkResponse(src, msg.id, new Uint8Array()) try { - await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) + await this.uTP.handleUtpPacket(packetBuffer, src) } catch (err: any) { this.logger.extend('error')( - - `handleUTP error: ${err.message}. SrcId: ${src.nodeId + `handleUTP error: ${err.message}. SrcId: ${ + src.nodeId } MultiAddr: ${src.socketAddr.toString()}`, ) } @@ -442,7 +447,10 @@ export class PortalNetwork extends EventEmitter { utpMessage?: boolean, ): Promise => { const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId - const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp') + const remote = + enr instanceof ENR + ? enr + : (this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')) try { this.metrics?.totalBytesSent.inc(payload.length) const res = await this.discv5.sendTalkReq( @@ -454,10 +462,14 @@ export class PortalNetwork extends EventEmitter { return res } catch (err: any) { if (networkId === NetworkId.UTPNetwork || utpMessage === true) { - throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`) + throw new Error( + `Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`, + ) } else { const messageType = PortalWireMessageType.deserialize(payload).selector - throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`) + throw new Error( + `Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`, + ) } } } @@ -472,7 +484,9 @@ export class PortalNetwork extends EventEmitter { try { await this.discv5.sendTalkResp(src, requestId, payload) } catch (err: any) { - this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`) + this.logger.extend('error')( + `Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`, + ) } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 57b925fb8..5f2fa2ea0 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -1,4 +1,13 @@ +import type { + ContentRequestType, + INewRequest, + INodeAddress, + PortalNetwork, +} from '../../../index.js' import { + NetworkId, + Packet, + PacketType, RequestCode, UtpSocketType, createContentRequest, @@ -8,13 +17,6 @@ import { createUtpSocket } from '../Socket/index.js' import type { ENR } from '@chainsafe/enr' import type { Debugger } from 'debug' -import type { - ContentRequestType, - INewRequest, - INodeAddress, - NetworkId, - PortalNetwork, - } from '../../../index.js' import type { SocketType } from '../Socket/index.js' import { RequestManager } from './requestManager.js' @@ -118,11 +120,43 @@ export class PortalNetworkUTP { return newRequest } - async handleUtpPacket(packetBuffer: Buffer, srcId: string): Promise { - if (this.requestManagers[srcId] === undefined) { - throw new Error(`No request manager for ${srcId}`) + async handleUtpPacket(packetBuffer: Buffer, srcId: INodeAddress): Promise { + if (this.requestManagers[srcId.nodeId] === undefined) { + throw new Error(`No request manager for ${srcId.nodeId}`) + } + try { + await this.requestManagers[srcId.nodeId].handlePacket(packetBuffer) + } catch (err: any) { + switch (err.message) { + case `REQUEST_CLOSED`: { + // Packet arrived after request was closed. Send RESET packet to peer. + const packet = Packet.fromBuffer(packetBuffer) + const resetPacket = new Packet({ + header: { + connectionId: packet.header.connectionId, + pType: PacketType.ST_RESET, + ackNr: 0, + extension: 0, + version: 0, + timestampMicroseconds: 0, + timestampDifferenceMicroseconds: 0, + seqNr: 0, + wndSize: 0, + }, + }) + await this.send(srcId, resetPacket.encode(), NetworkId.UTPNetwork) + break + } + case `REQUEST_NOT_FOUND`: { + // Packet arrived for non-existent request. Treat as spam and blacklist peer. + this.client.addToBlackList(srcId.socketAddr) + break + } + default: { + throw err + } + } } - await this.requestManagers[srcId].handlePacket(packetBuffer) } async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) { diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index ce06d5c87..323324033 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -15,7 +15,7 @@ const packetComparator: Comparator> = (a: Packet, } export class RequestManager { peerId: string - requestMap: Record + requestMap: Record logger: Debugger packetHeap: Heap> currentPacket: Packet | undefined @@ -34,8 +34,9 @@ export class RequestManager { * @param connectionId connectionId field from incoming packet header * @returns corresponding requestId */ - lookupRequest(connectionId: number): ContentRequest | undefined { - return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] + lookupRequest(connectionId: number): ContentRequest | 'closed' | undefined { + const request = this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] + return request } /** @@ -73,29 +74,56 @@ export class RequestManager { await request.init() } - /** - * Handles an incoming uTP packet. - * @param packetBuffer buffer containing the incoming packet - */ - async handlePacket(packetBuffer: Buffer) { - const packet = Packet.fromBuffer(packetBuffer) - const request = this.lookupRequest(packet.header.connectionId) - if (request === undefined) { - this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`) - return + /** + * Handles an incoming uTP packet. + * @param packetBuffer buffer containing the incoming packet + */ + async handlePacket(packetBuffer: Buffer) { + const packet = Packet.fromBuffer(packetBuffer) + const request = this.lookupRequest(packet.header.connectionId) + switch (request) { + case 'closed': { + if (packet.header.pType === PacketType.ST_RESET) { + this.logger.extend('HANDLE_PACKET')( + `Received RESET for closed request - connectionId: ${packet.header.connectionId}.`, + ) + return } - if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { - await request.handleUtpPacket(packet) - return + this.logger.extend('HANDLE_PACKET')( + `Request closed - connectionId: ${packet.header.connectionId}. Sending RESET to peer.`, + ) + throw new Error(`REQUEST_CLOSED`) + } + case undefined: { + this.logger.extend('HANDLE_PACKET')( + `Request not found for packet - connectionId: ${packet.header.connectionId}. Blacklisting peer.`, + ) + throw new Error(`REQUEST_NOT_FOUND`) + } + default: { + if ( + packet.header.pType === PacketType.ST_SYN || + packet.header.pType === PacketType.ST_RESET + ) { + const packetType = packet.header.pType === PacketType.ST_SYN ? 'SYN' : 'RESET' + this.logger.extend('HANDLE_PACKET')( + `Processing ${packetType} packet for Req:${packet.header.connectionId}`, + ) + await request.handleUtpPacket(packet) + return } else { - this.packetHeap.push(packet) - } - this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.pType === PacketType.ST_STATE ? packet.header.ackNr : packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`) - if (this.currentPacket === undefined) { - this.currentPacket = this.packetHeap.pop() - await this.processCurrentPacket() + this.packetHeap.push(packet) + this.logger.extend('HANDLE_PACKET')( + `Adding ${PacketType[packet.header.pType]} [${packet.header.pType === PacketType.ST_STATE ? packet.header.ackNr : packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`, + ) } + } + } + if (this.currentPacket === undefined) { + this.currentPacket = this.packetHeap.pop() + await this.processCurrentPacket() } + } async processCurrentPacket(): Promise { this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`) @@ -110,8 +138,8 @@ export class RequestManager { } this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.pType === PacketType.ST_STATE ? this.currentPacket.header.ackNr : this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`) const request = this.lookupRequest(this.currentPacket.header.connectionId) - if (request === undefined) { - this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) + if (request === undefined || request === 'closed') { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Request: ${request} for current packet - connectionId: ${this.currentPacket.header.connectionId}. Skipping packet.`) this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return @@ -150,7 +178,7 @@ export class RequestManager { } this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`) this.removeRequestPackets(connectionId) - delete this.requestMap[connectionId] + this.requestMap[connectionId] = 'closed' } closeAllRequests() {