Skip to content

Commit 0a3a8ca

Browse files
authored
Implement handleOffer for ephemeral headers (#784)
* WIP * wip * wip * handling offers implemented * comment out non-working gossip handler * fix doc link
1 parent 572c047 commit 0a3a8ca

9 files changed

Lines changed: 1996 additions & 37 deletions

File tree

packages/portalnetwork/src/networks/beacon/beacon.ts

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,7 @@ export class BeaconNetwork extends BaseNetwork {
360360
hexToBytes(intToHex(BeaconNetworkContentType.LightClientOptimisticUpdate)),
361361
)
362362
this.logger.extend('FINDLOCALLY')(
363-
`light client is not running, retrieving whatever we have - ${
364-
value !== undefined ? short(value) : 'nothing found'
363+
`light client is not running, retrieving whatever we have - ${value !== undefined ? short(value) : 'nothing found'
365364
}`,
366365
)
367366
} else {
@@ -371,10 +370,8 @@ export class BeaconNetwork extends BaseNetwork {
371370
case BeaconNetworkContentType.LightClientFinalityUpdate:
372371
key = LightClientFinalityUpdateKey.deserialize(contentKey.slice(1))
373372
this.logger.extend('FINDLOCALLY')(
374-
`looking for finality update for slot - ${
375-
key.finalitySlot
376-
} and local finalized update is for slot - ${
377-
this.lightClient?.getFinalized().beacon.slot ?? 'unavailable'
373+
`looking for finality update for slot - ${key.finalitySlot
374+
} and local finalized update is for slot - ${this.lightClient?.getFinalized().beacon.slot ?? 'unavailable'
378375
}`,
379376
)
380377
if (
@@ -589,10 +586,10 @@ export class BeaconNetwork extends BaseNetwork {
589586
) {
590587
this.logger(
591588
'Found value for requested content ' +
592-
bytesToHex(decodedContentMessage.contentKey) +
593-
' ' +
594-
bytesToHex(value.slice(0, 10)) +
595-
'...',
589+
bytesToHex(decodedContentMessage.contentKey) +
590+
' ' +
591+
bytesToHex(value.slice(0, 10)) +
592+
'...',
596593
)
597594
const payload = ContentMessageType.serialize({
598595
selector: 1,
@@ -650,7 +647,7 @@ export class BeaconNetwork extends BaseNetwork {
650647

651648
/**
652649
* The generalized `store` method used to put data into the DB
653-
* @param contentType the content type being stored (defined in @link { BeaconNetworkContentType })
650+
* @param contentType the content type being stored (defined in {@link BeaconNetworkContentType }
654651
* @param contentKey the network level content key formatted as a prefixed hex string
655652
* @param value the Uint8Array corresponding to the SSZ serialized value being stored
656653
*/
@@ -845,11 +842,11 @@ export class BeaconNetwork extends BaseNetwork {
845842
const requestedKeys: Uint8Array[] =
846843
version === 0
847844
? contentKeys.filter(
848-
(n, idx) => (<AcceptMessage<0>>msg).contentKeys.get(idx) === true,
849-
)
845+
(n, idx) => (<AcceptMessage<0>>msg).contentKeys.get(idx) === true,
846+
)
850847
: contentKeys.filter(
851-
(n, idx) => (<AcceptMessage<1>>msg).contentKeys[idx] === AcceptCode.ACCEPT,
852-
)
848+
(n, idx) => (<AcceptMessage<1>>msg).contentKeys[idx] === AcceptCode.ACCEPT,
849+
)
853850
if (requestedKeys.length === 0) {
854851
// Don't start uTP stream if no content ACCEPTed
855852
this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr.nodeId)}`)
@@ -911,8 +908,7 @@ export class BeaconNetwork extends BaseNetwork {
911908
*/
912909
override handleOffer = async (src: INodeAddress, requestId: Uint8Array, msg: OfferMessage) => {
913910
this.logger.extend('OFFER')(
914-
`Received from ${shortId(src.nodeId, this.routingTable)} with ${
915-
msg.contentKeys.length
911+
`Received from ${shortId(src.nodeId, this.routingTable)} with ${msg.contentKeys.length
916912
} pieces of content.`,
917913
)
918914
try {

packages/portalnetwork/src/networks/history/history.ts

Lines changed: 121 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ import debug from 'debug'
55

66
import type {
77
BaseNetworkConfig,
8+
BeaconNetwork,
89
ContentLookupResponse,
910
EphemeralHeaderKeyValues,
1011
FindContentMessage,
1112
INodeAddress,
13+
OfferMessage,
14+
Version,
1215
} from '../../index.js'
1316
import {
17+
AcceptCode,
1418
BiMap,
1519
ClientInfoAndCapabilities,
1620
ContentMessageType,
@@ -40,6 +44,7 @@ import {
4044
BlockHeaderWithProof,
4145
BlockNumberKey,
4246
CANCUN_BLOCK,
47+
EphemeralHeaderOfferPayload,
4348
EphemeralHeaderPayload,
4449
HistoricalRootsBlockProof,
4550
HistoryNetworkContentType,
@@ -59,6 +64,7 @@ import type { ENR } from '@chainsafe/enr'
5964

6065
import { RunStatusCode } from '@lodestar/light-client'
6166
import type { Debugger } from 'debug'
67+
import type { LightClientHeader } from '@lodestar/types/lib/deneb/types.js'
6268

6369
export class HistoryNetwork extends BaseNetwork {
6470
networkId: NetworkId.HistoryNetwork
@@ -519,8 +525,7 @@ export class HistoryNetwork extends BaseNetwork {
519525

520526
/**
521527
* Convenience method to add content for the History Network to the DB
522-
* @param contentType - content type of the data item being stored
523-
* @param hashKey - hex string representation of blockHash or epochHash
528+
* @param contentKey - content key of the data item being stored
524529
* @param value - hex string representing RLP encoded blockheader, block body, or block receipt
525530
* @throws if `blockHash` or `value` is not hex string
526531
*/
@@ -617,27 +622,23 @@ export class HistoryNetwork extends BaseNetwork {
617622
}
618623
}
619624
case HistoryNetworkContentType.EphemeralHeaderOffer: {
620-
const payload = EphemeralHeaderPayload.deserialize(value)
621-
if (payload.length === 0) {
622-
this.logger.extend('STORE')('Received empty ephemeral header payload')
623-
return
624-
}
625-
const header = createBlockHeaderFromRLP(payload[0], { setHardfork: true })
625+
const payload = EphemeralHeaderOfferPayload.deserialize(value)
626+
const header = createBlockHeaderFromRLP(payload.header, { setHardfork: true })
626627
// Check if we already have this header
627628
if (this.ephemeralHeaderIndex.getByValue(bytesToHex(header.hash())) !== undefined) {
628629
this.logger.extend('STORE')(`Ephemeral header ${bytesToHex(header.hash())} already exists`)
629630
return
630631
}
631632
const hashKey = getEphemeralHeaderDbKey(header.hash())
632-
await this.put(hashKey, bytesToHex(payload[0]))
633+
await this.put(hashKey, bytesToHex(header.serialize()))
633634
this.ephemeralHeaderIndex.set(header.number, bytesToHex(header.hash()))
634635
break
635636
}
636637
}
637638

638639
this.emit('ContentAdded', contentKey, value)
639640
if (this.routingTable.values().length > 0) {
640-
if (contentType !== HistoryNetworkContentType.EphemeralHeader) {
641+
if (contentType !== HistoryNetworkContentType.EphemeralHeader && contentType !== HistoryNetworkContentType.EphemeralHeaderOffer && contentType !== HistoryNetworkContentType.EphemeralHeaderFindContent) {
641642
// Gossip new content to network except for ephemeral headers
642643
this.gossipManager.add(contentKey)
643644
}
@@ -738,4 +739,114 @@ export class HistoryNetwork extends BaseNetwork {
738739
this.logger.extend('FOUNDCONTENT').extend('EPHEMERALHEADERS')(`Found ${headers.length - 1} ancestor headers out of ${ancestorCount} requested for ${bytesToHex(blockHash)}`)
739740
return EphemeralHeaderPayload.serialize(headers)
740741
}
742+
743+
protected async handleOffer(
744+
src: INodeAddress,
745+
requestId: Uint8Array,
746+
msg: OfferMessage,
747+
version: Version,
748+
) {
749+
this.logger.extend('ACCEPT')(
750+
`Received from ${shortId(src.nodeId, this.routingTable)} with ${msg.contentKeys.length
751+
} pieces of content.`,
752+
)
753+
const decodedContentKeys = msg.contentKeys.map(key => decodeHistoryNetworkContentKey(key))
754+
// Check to see if the first content key is for ephemeral headers. If so, we expect all
755+
// content keys to be for ephemeral headers.
756+
if (decodedContentKeys[0].contentType === HistoryNetworkContentType.EphemeralHeaderOffer) {
757+
this.logger.extend('OFFER').extend('EPHEMERALHEADERS')('Received offer for ephemeral headers starting with block hash: ' + bytesToHex(decodedContentKeys[0].keyOpt as Uint8Array))
758+
const contentIds: number[] = Array(msg.contentKeys.length).fill(AcceptCode.GENERIC_DECLINE)
759+
const desiredContentKeys: Uint8Array[] = []
760+
let headHash: Uint8Array | undefined
761+
let headHashIndex = -1
762+
do {
763+
for (const key of decodedContentKeys) {
764+
if (key.contentType !== HistoryNetworkContentType.EphemeralHeaderOffer) {
765+
this.logger.extend('ACCEPT').extend('EPHEMERALHEADERS')('Received non-ephemeral header in offer for ephemeral headers. Declining offer.')
766+
// If we get an offer for ephemeral headers, all offered content keys should be for ephemeral headers
767+
// TODO: Ban/descore peers who send spec-noncompliant offers
768+
break
769+
}
770+
}
771+
const beacon = this.portal.networks.get(NetworkId.BeaconChainNetwork) as BeaconNetwork
772+
if (beacon === undefined || (beacon.lightClient?.status !== RunStatusCode.started && beacon.lightClient?.status !== RunStatusCode.syncing)) {
773+
// We can't validate ephemeral headers if our light client is not active and/or syncing
774+
this.logger.extend('ACCEPT').extend('EPHEMERALHEADERS')('Light client is not active and/or syncing. Declining offer.')
775+
break
776+
}
777+
// TODO: Make this fork safe (and not assume deneb)
778+
headHash = (beacon.lightClient.getHead() as LightClientHeader).execution.blockHash
779+
headHashIndex = decodedContentKeys.findIndex((key) => equalsBytes(key.keyOpt as Uint8Array, headHash!))
780+
if (headHashIndex === -1) {
781+
// If our known head hash isn't in the request, we can't validate other ephemeral headers so decline
782+
this.logger.extend('ACCEPT').extend('EPHEMERALHEADERS')('Known head hash not found in offer. Declining offer.')
783+
break
784+
}
785+
for (let i = headHashIndex; i < decodedContentKeys.length; i++) {
786+
const key = decodedContentKeys[i]
787+
if (this.ephemeralHeaderIndex.getByValue(bytesToHex(key.keyOpt as Uint8Array)) === undefined) {
788+
contentIds[i] = AcceptCode.ACCEPT
789+
desiredContentKeys.push(msg.contentKeys[i])
790+
}
791+
}
792+
this.logger.extend('ACCEPT').extend('EPHEMERALHEADERS')(`Sending accept for ${desiredContentKeys.length} desired headers`)
793+
// biome-ignore lint/correctness/noConstantCondition: We only want to do `sendAccept` once
794+
} while (false)
795+
796+
await this.sendAccept(src, requestId, contentIds, desiredContentKeys, version)
797+
798+
// Set up gossip parameters
799+
// 1) Define our gossip function (gossip ephemeral headers to random peers)
800+
// 2) Set up listener
801+
// 3) Send accept
802+
// await new Promise(resolve => {
803+
// // We use a custom gossip function here because ephemeral headers cannot be gossiped with other content types
804+
// const gossipEphemeralHeaders = async (contentKey: Uint8Array) => {
805+
// if (desiredContentKeys.length < 1) {
806+
// this.removeListener('ContentAdded', gossipEphemeralHeaders)
807+
// resolve(true)
808+
// return
809+
// }
810+
811+
// if (equalsBytes(desiredContentKeys[desiredContentKeys.length - 1], contentKey)) {
812+
// // Once we've received the last desired header, gossip all offered ephemeral headers starting with the head hash
813+
// // We either already have all of the headers or have received them from this gossip message and the spec calls for
814+
// // us to neighborhood gossip all of these
815+
// this.removeListener('ContentAdded', gossipEphemeralHeaders)
816+
// const content = []
817+
// for (const key of decodedContentKeys.slice(headHashIndex)) {
818+
// const value = await this.get(getEphemeralHeaderDbKey(key.keyOpt as Uint8Array))
819+
// if (value === undefined) {
820+
// // This shouldn't happen but short circuit here to avoid trying to gossip content we don't have
821+
// this.logger.extend('GOSSIP').extend('EPHEMERALHEADERS')(`Expected header ${bytesToHex(key.keyOpt as Uint8Array)} not found`)
822+
// resolve(false)
823+
// }
824+
// content.push(hexToBytes(value as PrefixedHexString))
825+
// }
826+
// const gossipPromises = []
827+
// // TODO: Replace 5 with a constant defined in History types once the proper number of peers is defined
828+
// for (let i = 0; i < 5; i++) {
829+
// let enr: ENR | undefined = this.routingTable.random()
830+
// while (enr === undefined) {
831+
// enr = this.routingTable.random()
832+
// }
833+
// const offerKeys = msg.contentKeys.slice(headHashIndex)
834+
// gossipPromises.push(this.sendOffer(enr, offerKeys, content))
835+
// }
836+
837+
// await Promise.allSettled(gossipPromises)
838+
// resolve(true)
839+
// }
840+
// }
841+
// this.addListener('ContentAdded', gossipEphemeralHeaders)
842+
843+
// if (desiredContentKeys.length < 1) {
844+
// // Clean up listener and return early if we don't accept any headers
845+
// this.removeListener('ContentAdded', gossipEphemeralHeaders)
846+
// return
847+
// }
848+
// })
849+
} else
850+
await super.handleOffer(src, requestId, msg, version)
851+
}
741852
}

packages/portalnetwork/src/networks/history/util.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ export const decodeHistoryNetworkContentKey = (
148148
}
149149
| {
150150
contentType: HistoryNetworkContentType.EphemeralHeaderFindContent
151-
keyOpt: EphemeralHeaderKeyValues
151+
keyOpt:
152+
| EphemeralHeaderKeyValues
152153
| {
153154
contentType: HistoryNetworkContentType.EphemeralHeaderOffer
154155
keyOpt: Uint8Array
@@ -170,7 +171,8 @@ export const decodeHistoryNetworkContentKey = (
170171
keyOpt: key,
171172
}
172173
}
173-
case HistoryNetworkContentType.EphemeralHeader: throw new Error('EphemeralHeader is only for internal use')
174+
case HistoryNetworkContentType.EphemeralHeader:
175+
throw new Error('EphemeralHeader is only for internal use')
174176
default: {
175177
const blockHash = contentKey.slice(1)
176178
return {
@@ -278,7 +280,10 @@ export const addRLPSerializedBlock = async (
278280
setHardfork: true,
279281
})
280282
const header = block.header
281-
const headerKey = getContentKey(HistoryNetworkContentType.BlockHeader, hexToBytes(blockHash as PrefixedHexString))
283+
const headerKey = getContentKey(
284+
HistoryNetworkContentType.BlockHeader,
285+
hexToBytes(blockHash as PrefixedHexString),
286+
)
282287
const headerProof = BlockHeaderWithProof.serialize({
283288
header: header.serialize(),
284289
proof,
@@ -303,7 +308,9 @@ export const blockNumberToLeafIndex = (blockNumber: bigint) => {
303308
return (Number(blockNumber) % 8192) * 2
304309
}
305310
export const epochRootByIndex = (index: number) => {
306-
return historicalEpochs[index] ? hexToBytes(historicalEpochs[index] as PrefixedHexString) : undefined
311+
return historicalEpochs[index]
312+
? hexToBytes(historicalEpochs[index] as PrefixedHexString)
313+
: undefined
307314
}
308315
export const epochRootByBlocknumber = (blockNumber: bigint) => {
309316
return epochRootByIndex(epochIndexByBlocknumber(blockNumber))

packages/portalnetwork/src/networks/network.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ export abstract class BaseNetwork extends EventEmitter {
599599
* Offers content corresponding to `contentKeys` to peer corresponding to `dstId`
600600
* @param dstId node ID of a peer
601601
* @param contentKeys content keys being offered as specified by the subnetwork
602-
* @param networkId network ID of subnetwork being used
602+
* @param content content being offered
603603
*/
604604
public sendOffer = async (enr: ENR, contentKeys: Uint8Array[], content?: Uint8Array[]) => {
605605
let version
@@ -698,12 +698,12 @@ export abstract class BaseNetwork extends EventEmitter {
698698
}
699699
}
700700

701-
protected handleOffer = async (
701+
protected async handleOffer(
702702
src: INodeAddress,
703703
requestId: Uint8Array,
704704
msg: OfferMessage,
705705
version: Version,
706-
) => {
706+
) {
707707
this.logger.extend('OFFER')(
708708
`Received from ${shortId(src.nodeId, this.routingTable)} with ${msg.contentKeys.length
709709
} pieces of content.`,
@@ -884,7 +884,7 @@ export abstract class BaseNetwork extends EventEmitter {
884884
} pieces of content. connectionId: ${id}`,
885885
)
886886
const enr = this.findEnr(src.nodeId) ?? src
887-
await this.handleNewRequest({
887+
const req = await this.handleNewRequest({
888888
networkId: this.networkId,
889889
contentKeys: desiredContentKeys,
890890
enr,

0 commit comments

Comments
 (0)