Skip to content

Commit 38f6a1a

Browse files
committed
client: queue TalkReq priority 2 and TalkResp priority 1
1 parent 316f058 commit 38f6a1a

File tree

1 file changed

+54
-25
lines changed

1 file changed

+54
-25
lines changed

packages/portalnetwork/src/client/client.ts

+54-25
Original file line numberDiff line numberDiff line change
@@ -429,38 +429,67 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
429429
networkId: NetworkId,
430430
utpMessage?: boolean,
431431
): Promise<Uint8Array> => {
432-
const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId
433-
const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')
434-
try {
435-
this.metrics?.totalBytesSent.inc(payload.length)
436-
const res = await this.discv5.sendTalkReq(
437-
remote,
438-
Buffer.from(payload),
439-
hexToBytes(messageNetwork),
440-
)
441-
this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload))
442-
return res
443-
} catch (err: any) {
444-
if (networkId === NetworkId.UTPNetwork || utpMessage === true) {
445-
throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`)
446-
} else {
447-
const messageType = PortalWireMessageType.deserialize(payload).selector
448-
throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`)
432+
// Queue requests with normal priority (0 is default)
433+
return this.messageQueue.add(async () => {
434+
const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId
435+
const remote =
436+
enr instanceof ENR
437+
? enr
438+
: (this.discv5.findEnr(enr.nodeId) ??
439+
fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp'))
440+
try {
441+
this.metrics?.totalBytesSent.inc(payload.length)
442+
const res = await this.discv5.sendTalkReq(
443+
remote,
444+
Buffer.from(payload),
445+
hexToBytes(messageNetwork),
446+
)
447+
this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload))
448+
return res
449+
} catch (err: any) {
450+
if (networkId === NetworkId.UTPNetwork || utpMessage === true) {
451+
throw new Error(
452+
`Error sending uTP TALKREQ message using ${
453+
enr instanceof ENR ? 'ENR' : 'MultiAddr'
454+
}: ${err.message}`,
455+
)
456+
} else {
457+
const messageType = PortalWireMessageType.deserialize(payload).selector
458+
throw new Error(
459+
`Error sending TALKREQ ${MessageCodes[messageType]} message using ${
460+
enr instanceof ENR ? 'ENR' : 'MultiAddr'
461+
}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${
462+
enr instanceof ENR
463+
? enr.getLocationMultiaddr('udp')?.toString()
464+
: enr.socketAddr.toString()
465+
}`,
466+
)
467+
}
449468
}
450-
}
469+
}) as Promise<Uint8Array>
451470
}
452471

453472
public sendPortalNetworkResponse = async (
454473
src: INodeAddress,
455474
requestId: bigint,
456475
payload: Uint8Array,
457476
) => {
458-
this.eventLog &&
459-
this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload))
460-
try {
461-
await this.discv5.sendTalkResp(src, requestId, payload)
462-
} catch (err: any) {
463-
this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`)
464-
}
477+
// Queue responses with higher priority (1)
478+
return this.messageQueue.add(
479+
async () => {
480+
this.eventLog &&
481+
this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload))
482+
try {
483+
await this.discv5.sendTalkResp(src, requestId, payload)
484+
} catch (err: any) {
485+
this.logger.extend('error')(
486+
`Error sending TALKRESP message: ${err}. SrcId: ${
487+
src.nodeId
488+
} MultiAddr: ${src.socketAddr.toString()}`,
489+
)
490+
}
491+
},
492+
{ priority: 1 },
493+
)
465494
}
466495
}

0 commit comments

Comments
 (0)