Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return ephemeral headers in FOUNDCONTENT #744

Merged
merged 13 commits into from
Mar 1, 2025
8 changes: 6 additions & 2 deletions packages/portalnetwork/src/networks/beacon/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import debug from 'debug'
import { getENR, shortId } from '../../util/util.js'
import {
FoundContent,
MAX_PACKET_SIZE,
MAX_UDP_PACKET_SIZE,
RequestCode,
encodeWithVariantPrefix,
getTalkReqOverhead,
randUint16,
} from '../../wire/index.js'
import { ContentMessageType, MessageCodes, PortalWireMessageType } from '../../wire/types.js'
Expand Down Expand Up @@ -573,7 +574,10 @@ export class BeaconLightClientNetwork extends BaseNetwork {
const value = await this.findContentLocally(decodedContentMessage.contentKey)
if (!value) {
await this.enrResponse(decodedContentMessage.contentKey, src, requestId)
} else if (value !== undefined && value.length < MAX_PACKET_SIZE) {
} else if (
value !== undefined &&
value.length < MAX_UDP_PACKET_SIZE - getTalkReqOverhead(hexToBytes(this.networkId).byteLength)
) {
this.logger(
'Found value for requested content ' +
bytesToHex(decodedContentMessage.contentKey) +
Expand Down
6 changes: 5 additions & 1 deletion packages/portalnetwork/src/networks/beacon/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ export const HistoricalSummariesKey = new ContainerType({ epoch: new UintBigintT

export const HistoricalSummariesStateProof = new VectorCompositeType(Bytes32Type, 5)

export const HistoricalSummariesWithProof = new ContainerType(
export const HistoricalSummariesWithProof = new ContainerType<{
epoch: UintBigintType
historicalSummaries: typeof ssz.capella.BeaconState.fields.historicalSummaries
proof: typeof HistoricalSummariesStateProof
}>(
{
epoch: new UintBigintType(8),
historicalSummaries: ssz.capella.BeaconState.fields.historicalSummaries,
Expand Down
112 changes: 88 additions & 24 deletions packages/portalnetwork/src/networks/history/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import debug from 'debug'
import type {
BaseNetworkConfig,
ContentLookupResponse,
EphemeralHeaderKeyValues,
FindContentMessage,
INodeAddress,
} from '../../index.js'
import {
BasicRadius,
BiMap,
ClientInfoAndCapabilities,
ContentMessageType,
FoundContent,
HistoricalSummariesBlockProof,
HistoryRadius,
MAX_PACKET_SIZE,
MAX_UDP_PACKET_SIZE,
MessageCodes,
PingPongPayloadExtensions,
PortalWireMessageType,
RequestCode,
decodeHistoryNetworkContentKey,
decodeReceipts,
encodeClientInfo,
getTalkReqOverhead,
randUint16,
reassembleBlock,
saveReceipts,
Expand Down Expand Up @@ -59,7 +62,7 @@ export class HistoryNetwork extends BaseNetwork {
networkId: NetworkId.HistoryNetwork
networkName = 'HistoryNetwork'
logger: Debugger
public ephemeralHeaderIndex: Map<bigint, string> // Map of slot numbers to hashes
public ephemeralHeaderIndex: BiMap<bigint, string> // Map of block number to block hashes
public blockHashIndex: Map<string, string>
constructor({ client, db, radius, maxStorage }: BaseNetworkConfig) {
super({ client, networkId: NetworkId.HistoryNetwork, db, radius, maxStorage })
Expand All @@ -71,7 +74,7 @@ export class HistoryNetwork extends BaseNetwork {
this.logger = debug(this.enr.nodeId.slice(0, 5)).extend('Portal').extend('HistoryNetwork')
this.routingTable.setLogger(this.logger)
this.blockHashIndex = new Map()
this.ephemeralHeaderIndex = new Map()
this.ephemeralHeaderIndex = new BiMap()
}

public blockNumberToHash(blockNumber: bigint): Uint8Array | undefined {
Expand Down Expand Up @@ -237,7 +240,6 @@ export class HistoryNetwork extends BaseNetwork {
let deserializedProof: ReturnType<typeof HistoricalSummariesBlockProof.deserialize>
try {
deserializedProof = HistoricalSummariesBlockProof.deserialize(proof)
console.log(HistoricalSummariesBlockProof.toJson(deserializedProof))
} catch (err: any) {
this.logger(`invalid proof for block ${bytesToHex(header.hash())}`)
throw new Error(`invalid proof for block ${bytesToHex(header.hash())}`)
Expand Down Expand Up @@ -305,18 +307,6 @@ export class HistoryNetwork extends BaseNetwork {
* @returns the value of the FOUNDCONTENT response or undefined
*/
public sendFindContent = async (enr: ENR, key: Uint8Array) => {
if (key[0] === HistoryNetworkContentType.EphemeralHeader) {
const beacon = this.portal.network()['0x500c']
if (
beacon === undefined ||
beacon.lightClient?.status === RunStatusCode.uninitialized ||
beacon.lightClient?.status === RunStatusCode.stopped
) {
const errorMessage = 'Cannot verify ephemeral headers when beacon network is not running'
this.logger.extend('FINDCONTENT')(errorMessage)
throw new Error(errorMessage)
}
}
this.portal.metrics?.findContentMessagesSent.inc()
const findContentMsg: FindContentMessage = { contentKey: key }
const payload = PortalWireMessageType.serialize({
Expand Down Expand Up @@ -394,20 +384,90 @@ export class HistoryNetwork extends BaseNetwork {
)}`,
)

// TODO: Add specific support for retrieving ephemeral headers
const value = await this.findContentLocally(decodedContentMessage.contentKey)
const contentKey = decodeHistoryNetworkContentKey(decodedContentMessage.contentKey)
let value: Uint8Array | undefined
if (contentKey.contentType === HistoryNetworkContentType.EphemeralHeader) {
if (contentKey.keyOpt.ancestorCount < 0 || contentKey.keyOpt.ancestorCount > 255) {
const errorMessage = `received invalid ephemeral headers request with invalid ancestorCount: expected 0 <= 255, got ${contentKey.keyOpt.ancestorCount}`
this.logger.extend('FOUNDCONTENT')(errorMessage)
throw new Error(errorMessage)
}
this.logger.extend('FOUNDCONTENT')(
`Received ephemeral headers request for block ${bytesToHex(contentKey.keyOpt.blockHash)} with ancestorCount ${contentKey.keyOpt.ancestorCount}`,
)
// Retrieve the starting header from the FINDCONTENT request
const headerKey = getEphemeralHeaderDbKey(contentKey.keyOpt.blockHash)
const firstHeader = await this.findContentLocally(headerKey)

if (firstHeader === undefined) {
// If we don't have the requested header, send an empty payload
// We never send an ENRs response for ephemeral headers
value = undefined
const emptyHeaderPayload = EphemeralHeaderPayload.serialize([])
const messagePayload = ContentMessageType.serialize({
selector: FoundContent.CONTENT,
value: emptyHeaderPayload,
})
this.logger.extend('FOUNDCONTENT')(
`Header not found for ${bytesToHex(contentKey.keyOpt.blockHash)}, sending empty ephemeral headers response to ${shortId(src.nodeId)}`,
)
await this.sendResponse(
src,
requestId,
concatBytes(Uint8Array.from([MessageCodes.CONTENT]), messagePayload),
)
return
} else {
this.logger.extend('FOUNDCONTENT')(
`Header found for ${bytesToHex(contentKey.keyOpt.blockHash)}, assembling ephemeral headers response to ${shortId(src.nodeId)}`,
)
// We have the requested header so begin assembling the payload
const headersList = [firstHeader]
const firstHeaderNumber = this.ephemeralHeaderIndex.getByValue(
bytesToHex(contentKey.keyOpt.blockHash),
)
for (let x = 1; x <= contentKey.keyOpt.ancestorCount; x++) {
// Determine if we have the ancestor header at block number `firstHeaderNumber - x`
const ancestorNumber = firstHeaderNumber! - BigInt(x)
const ancestorHash = this.ephemeralHeaderIndex.getByKey(ancestorNumber)
if (ancestorHash === undefined)
break // Stop looking for more ancestors if we don't have the current one in the index
else {
const ancestorKey = getEphemeralHeaderDbKey(hexToBytes(ancestorHash))
const ancestorHeader = await this.findContentLocally(ancestorKey)
if (ancestorHeader === undefined) {
// This would only happen if our index gets out of sync with the DB
// Stop looking for more ancestors if we don't have the current one in the DB
this.ephemeralHeaderIndex.delete(ancestorNumber)
break
} else {
headersList.push(ancestorHeader)
}
}
}
this.logger.extend('FOUNDCONTENT')(
`found ${headersList.length - 1} ancestor headers for ${bytesToHex(contentKey.keyOpt.blockHash)}`,
)
value = EphemeralHeaderPayload.serialize(headersList)
}
} else {
value = await this.findContentLocally(decodedContentMessage.contentKey)
}
if (!value) {
await this.enrResponse(decodedContentMessage.contentKey, src, requestId)
} else if (value instanceof Uint8Array && value.length < MAX_PACKET_SIZE) {
this.logger(
} else if (
value instanceof Uint8Array &&
value.length < MAX_UDP_PACKET_SIZE - getTalkReqOverhead(hexToBytes(this.networkId).byteLength)
) {
this.logger.extend('FOUNDCONTENT')(
'Found value for requested content ' +
bytesToHex(decodedContentMessage.contentKey) +
' ' +
bytesToHex(value.slice(0, 10)) +
`...`,
)
const payload = ContentMessageType.serialize({
selector: 1,
selector: FoundContent.CONTENT,
value,
})
this.logger.extend('CONTENT')(`Sending requested content to ${src.nodeId}`)
Expand Down Expand Up @@ -496,14 +556,18 @@ export class HistoryNetwork extends BaseNetwork {

case HistoryNetworkContentType.EphemeralHeader: {
const payload = EphemeralHeaderPayload.deserialize(value)
if (payload.length === 0) {
this.logger.extend('STORE')('Received empty ephemeral header payload')
return
}
try {
// Verify first header matches requested header
const firstHeader = BlockHeader.fromRLPSerializedHeader(payload[0], { setHardfork: true })
const requestedHeaderHash = decodeHistoryNetworkContentKey(contentKey)
.keyOpt as Uint8Array
if (!equalsBytes(firstHeader.hash(), requestedHeaderHash)) {
.keyOpt as EphemeralHeaderKeyValues
if (!equalsBytes(firstHeader.hash(), requestedHeaderHash.blockHash)) {
// TODO: Should we ban/mark down the score of peers who send junk payload?
const errorMessage = `invalid ephemeral header payload; requested ${bytesToHex(requestedHeaderHash)}, got ${bytesToHex(firstHeader.hash())}`
const errorMessage = `invalid ephemeral header payload; requested ${bytesToHex(requestedHeaderHash.blockHash)}, got ${bytesToHex(firstHeader.hash())}`
this.logger(errorMessage)
throw new Error(errorMessage)
}
Expand Down
5 changes: 5 additions & 0 deletions packages/portalnetwork/src/networks/history/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,8 @@ export const EphemeralHeaderPayload = new ListCompositeType(
BlockHeader,
MAX_EPHEMERAL_HEADERS_PAYLOAD,
)

export type EphemeralHeaderKeyValues = {
blockHash: Uint8Array
ancestorCount: number
}
10 changes: 7 additions & 3 deletions packages/portalnetwork/src/networks/history/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import type { WithdrawalBytes } from '@ethereumjs/util'
import type { ForkConfig } from '@lodestar/config'
import type { HistoryNetwork } from './history.js'
import type { BlockBodyContent, Witnesses } from './types.js'
import type { EphemeralHeaderKeyValues } from '../history/types.js'

export const BlockHeaderByNumberKey = (blockNumber: bigint) => {
return Uint8Array.from([
Expand All @@ -57,7 +58,7 @@ export const BlockHeaderByNumberKey = (blockNumber: bigint) => {
*/
export const getContentKey = (
contentType: HistoryNetworkContentType,
key: Uint8Array | bigint | { blockHash: Uint8Array; ancestorCount: number },
key: Uint8Array | bigint | EphemeralHeaderKeyValues,
): Uint8Array => {
let encodedKey
switch (contentType) {
Expand Down Expand Up @@ -120,12 +121,15 @@ export const decodeHistoryNetworkContentKey = (
| HistoryNetworkContentType.BlockHeader
| HistoryNetworkContentType.BlockBody
| HistoryNetworkContentType.Receipt
| HistoryNetworkContentType.EphemeralHeader
keyOpt: Uint8Array
}
| {
contentType: HistoryNetworkContentType.BlockHeaderByNumber
keyOpt: bigint
}
| {
contentType: HistoryNetworkContentType.EphemeralHeader
keyOpt: EphemeralHeaderKeyValues
} => {
const contentType: HistoryNetworkContentType = contentKey[0]
switch (contentType) {
Expand All @@ -140,7 +144,7 @@ export const decodeHistoryNetworkContentKey = (
const key = EphemeralHeaderKey.deserialize(contentKey.slice(1))
return {
contentType,
keyOpt: key.blockHash,
keyOpt: key,
}
}
default: {
Expand Down
19 changes: 14 additions & 5 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
ContentMessageType,
ErrorPayload,
HistoryRadius,
MAX_PACKET_SIZE,
MAX_UDP_PACKET_SIZE,
MessageCodes,
NodeLookup,
PingPongErrorCodes,
Expand All @@ -48,6 +48,7 @@ import {
encodeClientInfo,
encodeWithVariantPrefix,
generateRandomNodeIdAtDistance,
getTalkReqOverhead,
randUint16,
shortId,
} from '../index.js'
Expand Down Expand Up @@ -211,7 +212,6 @@ export abstract class BaseNetwork extends EventEmitter {

public async handle(message: ITalkReqMessage, src: INodeAddress) {
const id = message.id
const network = message.protocol
const request = message.request
const deserialized = PortalWireMessageType.deserialize(request)
const decoded = deserialized.value
Expand Down Expand Up @@ -378,7 +378,9 @@ export abstract class BaseNetwork extends EventEmitter {
if (this.capabilities.includes(pingMessage.payloadType)) {
switch (pingMessage.payloadType) {
case PingPongPayloadExtensions.CLIENT_INFO_RADIUS_AND_CAPABILITIES: {
const { DataRadius, Capabilities, ClientInfo } = ClientInfoAndCapabilities.deserialize(pingMessage.customPayload)
const { DataRadius, Capabilities, ClientInfo } = ClientInfoAndCapabilities.deserialize(
pingMessage.customPayload,
)
this.routingTable.updateRadius(src.nodeId, DataRadius)
this.portal.enrCache.updateNodeFromPing(src, this.networkId, {
capabilities: Capabilities,
Expand Down Expand Up @@ -761,7 +763,10 @@ export abstract class BaseNetwork extends EventEmitter {
const value = await this.findContentLocally(decodedContentMessage.contentKey)
if (!value) {
await this.enrResponse(decodedContentMessage.contentKey, src, requestId)
} else if (value instanceof Uint8Array && value.length < MAX_PACKET_SIZE) {
} else if (
value instanceof Uint8Array &&
value.length < MAX_UDP_PACKET_SIZE - getTalkReqOverhead(hexToBytes(this.networkId).byteLength)
) {
this.logger(
'Found value for requested content ' +
bytesToHex(decodedContentMessage.contentKey) +
Expand Down Expand Up @@ -817,7 +822,11 @@ export abstract class BaseNetwork extends EventEmitter {
if (encodedEnrs.length > 0) {
this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`)
// TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size
while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) {
while (
encodedEnrs.length > 0 &&
arrayByteLength(encodedEnrs) >
MAX_UDP_PACKET_SIZE - getTalkReqOverhead(hexToBytes(this.networkId).byteLength)
) {
// Remove ENRs until total ENRs less than 1200 bytes
encodedEnrs.pop()
}
Expand Down
Loading