Skip to content

Commit 432f8df

Browse files
committed
first draft of getBlobs
1 parent 135febc commit 432f8df

File tree

3 files changed

+121
-9
lines changed

3 files changed

+121
-9
lines changed

beacon_chain/el/el_manager.nim

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type
5757
GetPayloadV3Response |
5858
GetPayloadV4Response
5959

60+
6061
contract(DepositContract):
6162
proc deposit(pubkey: PubKeyBytes,
6263
withdrawalCredentials: WithdrawalCredentialsBytes,
@@ -108,6 +109,8 @@ const
108109
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2
109110
GETPAYLOAD_TIMEOUT = 1.seconds
110111

112+
# https://github.com/ethereum/execution-apis/blob/ad9102b11212d51b736a0413c8655a8da93e55fc/src/engine/cancun.md#request-3
113+
GETBLOBS_TIMEOUT = 1.seconds
111114
connectionStateChangeHysteresisThreshold = 15
112115
## How many unsuccesful/successful requests we must see
113116
## before declaring the connection as degraded/restored
@@ -862,6 +865,13 @@ proc sendNewPayloadToSingleEL(
862865
payload, versioned_hashes, Hash32 parent_beacon_block_root,
863866
executionRequests)
864867

868+
proc sendGetBlobsToSingleEL(
869+
connection: ELConnection,
870+
versioned_hashes: seq[engine_api.VersionedHash]
871+
): Future[GetBlobsV1Response] {.async: (raises: [CatchableError]).} =
872+
let rpcClient = await connection.connectedRpcClient()
873+
await rpcClient.engine_getBlobsV1(versioned_hashes)
874+
865875
type
866876
StatusRelation = enum
867877
newStatusIsPreferable
@@ -990,6 +1000,61 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
9901000
if len(pending) > 0:
9911001
await noCancel allFutures(pending)
9921002

1003+
proc sendGetBlobs*(
1004+
m: ELManager,
1005+
blck: electra.SignedBeaconBlock | fulu.SignedBeaconBlock
1006+
): Future[Opt[seq[BlobAndProofV1]]] {.async: (raises: [CancelledError]).} =
1007+
if m.elConnections.len == 0:
1008+
return err()
1009+
let
1010+
timeout = GETBLOBS_TIMEOUT
1011+
deadline = sleepAsync(timeout)
1012+
1013+
var bestResponse = Opt.none(int)
1014+
1015+
while true:
1016+
let
1017+
requests = m.elConnections.mapIt(
1018+
sendGetBlobsToSingleEL(it, mapIt(
1019+
blck.message.body.blob_kzg_commitments,
1020+
engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))))
1021+
timeoutExceeded =
1022+
try:
1023+
await allFutures(requests).wait(deadline)
1024+
false
1025+
except AsyncTimeoutError:
1026+
true
1027+
except CancelledError as exc:
1028+
let pending =
1029+
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
1030+
await noCancel allFutures(pending)
1031+
raise exc
1032+
1033+
for idx, req in requests:
1034+
if not(req.finished()):
1035+
warn "Timeout while getting blob and proof",
1036+
url = m.elConnections[idx].engineUrl.url,
1037+
reason = req.error.msg
1038+
else:
1039+
if bestResponse.isNone:
1040+
bestResponse = Opt.some(idx)
1041+
1042+
let pending =
1043+
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
1044+
await noCancel allFutures(pending)
1045+
1046+
if bestResponse.isSome():
1047+
return ok(requests[bestResponse.get()].value().blobsAndProofs)
1048+
1049+
else:
1050+
# should not reach this case
1051+
discard
1052+
1053+
if timeoutExceeded:
1054+
break
1055+
1056+
err()
1057+
9931058
proc sendNewPayload*(
9941059
m: ELManager,
9951060
blck: SomeForkyBeaconBlock,

beacon_chain/nimbus_beacon_node.nim

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
{.push raises: [].}
99

1010
import
11-
std/[os, random, terminal, times, exitprocs],
11+
std/[os, random, terminal, times, exitprocs, sequtils],
1212
chronos, chronicles,
1313
metrics, metrics/chronos_httpserver,
14+
ssz_serialization/types,
1415
stew/[byteutils, io2],
1516
eth/p2p/discoveryv5/[enr, random2],
1617
./consensus_object_pools/[
@@ -459,21 +460,67 @@ proc initFullNode(
459460
maybeFinalized: bool):
460461
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
461462
withBlck(signedBlock):
462-
when consensusFork >= ConsensusFork.Deneb:
463+
when consensusFork >= ConsensusFork.Electra:
464+
# Pull blobs and proofs from the EL blob pool
465+
let blobsFromElOpt = await node.elManager.sendGetBlobs(forkyBlck)
466+
if blobsFromElOpt.isSome():
467+
let blobsEl = blobsFromElOpt.get()
468+
# check lengths of array[BlobAndProofV1] with blob
469+
# kzg commitments of the signed block
470+
if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len:
471+
# create blob sidecars from EL instead
472+
var
473+
kzgBlbs: deneb.Blobs
474+
kzgPrfs: deneb.KzgProofs
475+
476+
for idx in 0..<blobsEl.len:
477+
kzgBlbs[idx] = blobsEl[idx].blob.data
478+
kzgPrfs[idx].bytes = blobsEl[idx].proof.data
479+
let blob_sidecars_el =
480+
create_blob_sidecars(forkyBlck, kzgPrfs, kzgBlbs)
481+
482+
# populate blob quarantine to tackle blob loop
483+
for blb_el in blob_sidecars_el:
484+
blobQuarantine[].put(newClone blb_el)
485+
486+
# now pop blobQuarantine and make block available for attestation
487+
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
488+
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
489+
Opt.some(blobs),
490+
maybeFinalized = maybeFinalized)
491+
492+
# in case EL does not support `engine_getBlobsV1`
493+
else:
494+
if not blobQuarantine[].hasBlobs(forkyBlck):
495+
# We don't have all the blobs for this block, so we have
496+
# to put it in blobless quarantine.
497+
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
498+
return err(VerifierError.UnviableFork)
499+
else:
500+
return err(VerifierError.MissingParent)
501+
else:
502+
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
503+
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
504+
Opt.some(blobs),
505+
maybeFinalized = maybeFinalized)
506+
507+
elif consensusFork >= ConsensusFork.Deneb and
508+
consensusFork < ConsensusFork.Electra:
463509
if not blobQuarantine[].hasBlobs(forkyBlck):
464510
# We don't have all the blobs for this block, so we have
465511
# to put it in blobless quarantine.
466512
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
467-
err(VerifierError.UnviableFork)
513+
return err(VerifierError.UnviableFork)
468514
else:
469-
err(VerifierError.MissingParent)
515+
return err(VerifierError.MissingParent)
470516
else:
471517
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
472-
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
473-
Opt.some(blobs),
474-
maybeFinalized = maybeFinalized)
518+
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
519+
Opt.some(blobs),
520+
maybeFinalized = maybeFinalized)
521+
475522
else:
476-
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
523+
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
477524
Opt.none(BlobSidecars),
478525
maybeFinalized = maybeFinalized)
479526
rmanBlockLoader = proc(

0 commit comments

Comments
 (0)