diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 04d67e1505..8947e3a264 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -28,7 +28,7 @@ import ./spec/datatypes/[base, altair], ./spec/eth2_apis/dynamic_fee_recipients, ./spec/signatures_batch, - ./sync/[sync_manager, request_manager, sync_types], + ./sync/[sync_manager, request_manager, sync_types, column_syncer], ./validators/[ action_tracker, message_router, validator_monitor, validator_pool, keystore_management], @@ -96,6 +96,7 @@ type vcProcess*: Process requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] + columnManager*: ColumnManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] untrustedManager*: SyncManager[Peer, PeerId] syncOverseer*: SyncOverseerRef diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index d12ba63bb8..ab9fc65ff2 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -133,6 +133,10 @@ type Light = "light", Lenient = "lenient" + ColumnSyncerStrategy* {.pure.} = enum + Greedy = "greedy" + Impartial = "impartial" + BeaconNodeConf* = object configFile* {. desc: "Loads the configuration from a TOML file" @@ -559,6 +563,14 @@ type desc: "Maximum number of sync committee periods to retain light client data" name: "light-client-data-max-periods" .}: Option[uint64] + columnSyncerStrategy* {. + hidden + desc: "Choose how to sync columns, " & + " 1) Greedy would make the BN actively filter peers based on higher column custody. " & + " 2) Impartial would rely on every peer for columns till requisite column count is reached. " + defaultValue: ColumnSyncerStrategy.Impartial + name: "debug-column-syncer".}: ColumnSyncerStrategy + longRangeSync* {. hidden desc: "Enable long-range syncing (genesis sync)", diff --git a/beacon_chain/networking/peer_scores.nim b/beacon_chain/networking/peer_scores.nim index c6f2c10bd4..e95586306f 100644 --- a/beacon_chain/networking/peer_scores.nim +++ b/beacon_chain/networking/peer_scores.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). 
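The `greedy`/`impartial` strategies introduced in `conf.nim` map directly onto `ColumnSyncerFlag` values when the node is initialised. As a minimal usage sketch (hypothetical snippet mirroring the mapping done in `initFullNode` further down, using only names defined in this diff):

    # Select the column syncer strategy from the hidden debug option,
    # e.g. `--debug-column-syncer=greedy` on the command line.
    let columnSyncerFlags =
      if config.columnSyncerStrategy == ColumnSyncerStrategy.Impartial:
        {ColumnSyncerFlag.Impartial}  # treat every peer as an equal column source
      else:
        {ColumnSyncerFlag.Greedy}     # prefer peers with a large custody overlap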
@@ -14,6 +14,11 @@ const ## Score after which peer will be kicked PeerScoreHighLimit* = 1000 ## Max value of peer's score + PeerScoreSupernode* = 700 + ## Score assigned to a supernode in order to prevent disconnection + PeerScoreIntersectingColumns* = 200 + ## Score assigned to a node with intersecting columns + ## to the local BN in order to prevent disconnection PeerScorePoorRequest* = -50 ## This peer is not responding on time or behaving improperly otherwise PeerScoreInvalidRequest* = -500 diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 809d37507f..a9d1e28ec7 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -22,7 +22,9 @@ import ./spec/[ deposit_snapshots, engine_authentication, weak_subjectivity, peerdas_helpers], - ./sync/[sync_protocol, light_client_protocol, sync_overseer], + ./sync/[ + column_syncer, sync_protocol, light_client_protocol, + sync_overseer, column_syncer_assist], ./validators/[keystore_management, beacon_validators], "."/[ beacon_node, beacon_node_light_client, deposits, @@ -445,7 +447,16 @@ proc initFullNode( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, batchVerifier, consensusManager, node.validatorMonitor, blobQuarantine, dataColumnQuarantine, getBeaconTime) - + peerdasBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, + columns: Opt[DataColumnSidecars], maybeFinalized: bool): + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + # The design with a callback for block verification is unusual compared + # to the rest of the application, but fits with the general approach + # taken in the sync/request managers - this is an architectural compromise + # that should probably be reimagined more holistically in the future. 
+ blockProcessor[].addBlock( + MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), columns, + maybeFinalized = maybeFinalized) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = @@ -536,6 +547,29 @@ proc initFullNode( {SyncManagerFlag.NoGenesisSync} else: {} + columnManagerModes = + if node.config.longRangeSync != LongRangeSyncMode.Lenient: + {ColumnSyncerMode.NoGenesisSync} + else: + {} + columnSyncerFlags = + if node.config.columnSyncerStrategy == ColumnSyncerStrategy.Impartial: + {ColumnSyncerFlag.Impartial} + else: + {ColumnSyncerFlag.Greedy} + + columnManager = newColumnManager[Peer, PeerId]( + node.network.peerPool, dag.cfg, supernode, custody_columns_set, + custody_columns_list, dag.cfg.FULU_FORK_EPOCH, + dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, + dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA, + ColumnSyncerDirection.Forward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, + getFrontfillSlot, isWithinWeakSubjectivityPeriod, + dag.tail.slot, peerdasBlockVerifier, + shutdownEvent = node.shutdownEvent, + flags = columnSyncerFlags, + modes = columnManagerModes) syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, @@ -645,6 +679,7 @@ proc initFullNode( node.blockProcessor = blockProcessor node.consensusManager = consensusManager node.requestManager = requestManager + node.columnManager = columnManager node.syncManager = syncManager node.backfiller = backfiller node.untrustedManager = untrustedManager @@ -1895,6 +1930,28 @@ func formatNextConsensusFork( (if withVanityArt: nextConsensusFork.getVanityMascot & " " else: "") & $nextConsensusFork & ":" & $nextForkEpoch) +func columnSyncStatus(node: BeaconNode, wallSlot: Slot): string = + let optimisticHead = not node.dag.head.executionValid + if node.columnManager.inProgress: + let + optimisticSuffix = + if optimisticHead: + "/opt" + else: + "" + lightClientSuffix = + if node.consensusManager[].shouldSyncOptimistically(wallSlot): + " - lc: " & $shortLog(node.consensusManager[].optimisticHead) + else: + "" + node.columnManager.syncStatus & optimisticSuffix & lightClientSuffix + elif node.backfiller.inProgress: + "backfill: " & node.backfiller.syncStatus + elif optimistic_head: + "synced/opt" + else: + "synced" + func syncStatus(node: BeaconNode, wallSlot: Slot): string = node.syncOverseer.statusMsg.valueOr: let optimisticHead = not node.dag.head.executionValid @@ -1946,6 +2003,7 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime, slot = shortLog(wallSlot) epoch = shortLog(wallSlot.epoch) sync = node.syncStatus(wallSlot) + columnSync = node.columnSyncStatus(wallSlot) peers = len(node.network.peerPool) head = shortLog(node.dag.head) finalized = shortLog(getStateField( @@ -2256,6 +2314,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = node.network.cfg.FULU_FORK_EPOCH: node.requestManager.switchToColumnLoop() node.syncOverseer.start() + node.columnManager.start() waitFor node.updateGossipStatus(wallSlot) @@ -2438,6 +2497,9 @@ when not defined(windows): of "sync_status": node.syncStatus(node.currentSlot) + + of "column_sync_status": + node.columnSyncStatus(node.currentSlot) else: # We ignore typos for now and just render the expression # as it was written. 
TODO: come up with a good way to show diff --git a/beacon_chain/sync/column_syncer.nim b/beacon_chain/sync/column_syncer.nim new file mode 100644 index 0000000000..75e5e156a9 --- /dev/null +++ b/beacon_chain/sync/column_syncer.nim @@ -0,0 +1,1285 @@ +# beacon_chain +# Copyright (c) 2018-2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import std/[strutils, sequtils, algorithm] +import stew/base10, chronos, chronicles, results +import + ../spec/datatypes/[phase0, altair], + ../spec/eth2_apis/rest_types, + ../spec/[helpers, forks, network, peerdas_helpers], + ../networking/[peer_pool, peer_scores, eth2_network], + ../gossip_processing/block_processor, + ../beacon_clock, + "."/[sync_protocol, sync_queue, column_syncer_assist] + +export phase0, altair, merge, chronos, chronicles, results, + helpers, peer_scores, sync_queue, forks, sync_protocol + +const + ColumnSyncWorkerCount* = 15 + ## Number of workers to spawn for column syncing + + StatusUpdateInterval* = chronos.minutes(1) + + StatusExpirationTime* = chronos.minutes(2) + +type + ColumnSyncerStatus* {.pure.} = enum + Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, + Queueing, Processing + + ColumnAndBlockResponse* = object + blk*: fulu.SignedBeaconBlock + columns*: Opt[DataColumnSidecars] + + ColumnSyncerFlag* {.pure.} = enum + Greedy, Impartial + + ColumnSyncerMode* {.pure.} = enum + NoMonitor, NoGenesisSync + + ColumnSyncer*[A, B] = object + future: Future[void].Raising([CancelledError]) + status: ColumnSyncerStatus + + ColumnManager*[A, B] = ref object + pool: PeerPool[A, B] + cfg*: RuntimeConfig + amIsupernode*: bool + custody_columns_set*: HashSet[ColumnIndex] + custody_columns_list*: List[ColumnIndex, NUMBER_OF_COLUMNS] + column_syncer_table*: OrderedTable[Slot, ColumnAndBlockResponse] + FULU_FORK_EPOCH: Epoch + MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: uint64 + MAX_BLOBS_PER_BLOCK_ELECTRA: uint64 + responseTimeout: chronos.Duration + maxHeadAge: uint64 + isWithinWeakSubjectivityPeriod: GetBoolCallback + assist*: ColumnSyncerAssist[A] + getLocalHeadSlot: GetSlotCallback + getLocalWallSlot: GetSlotCallback + getSafeSlot: GetSlotCallback + getFirstSlot: GetSlotCallback + getLastSlot: GetSlotCallback + progressPivot: Slot + workers: array[ColumnSyncWorkerCount, ColumnSyncer[A, B]] + notInSyncEvent: AsyncEvent + shutdownEvent: AsyncEvent + rangeAge: uint64 + chunkSize: uint64 + columnSyncFut: Future[void].Raising([CancelledError]) + peerdasBlockVerifier: PeerdasBlockVerifier + inProgress*: bool + insSyncSpeed*: float + avgSyncSpeed*: float + syncStatus*: string + direction: ColumnSyncerDirection + flags: set[ColumnSyncerFlag] + modes: set[ColumnSyncerMode] + + ColumnSyncTimestamp* = object + timestamp*: chronos.Moment + slots*: uint64 + + BeaconBlocksRes = + NetRes[List[ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS]] + + DataColumnSidecarsRes = + NetRes[List[ref DataColumnSidecar, Limit MAX_REQUEST_DATA_COLUMN_SIDECARS]] + +proc now*(cst: typedesc[ColumnSyncTimestamp], + slots: uint64): + ColumnSyncTimestamp {.inline.} = + ColumnSyncTimestamp(timestamp: now(chronos.Moment), slots: slots) + +proc speed*(start, finish: ColumnSyncTimestamp): float 
{.inline.} = + ## Returns the number of slots per second + ## it is syncing columns with + if finish.slots <= start.slots or finish.timestamp <= start.timestamp: + 0.0 + else: + let + slots = float(finish.slots - start.slots) + dur = toFloatSeconds(finish.timestamp - start.timestamp) + slots / dur + +proc initColumnSyncerAssist[A, B](man: ColumnManager[A, B]) = + case man.direction + of ColumnSyncerDirection.Forward: + man.assist = ColumnSyncerAssist.init(A, man.direction, man.getFirstSlot(), + man.getLastSlot(), man.chunkSize, + man.getSafeSlot, man.peerdasBlockVerifier, 1) + of ColumnSyncerDirection.Backward: + let + firstSlot = man.getFirstSlot() + lastSlot = man.getLastSlot() + startSlot = if firstSlot == lastSlot: + # This case should never happen in real life because + # there is present check `needsBackfill()`. + firstSlot + else: + firstSlot - 1'u64 + man.assist = ColumnSyncerAssist.init(A, man.direction, startSlot, lastSlot, + man.chunkSize, man.getSafeSlot, + man.peerdasBlockVerifier, 1) + +proc newColumnManager*[A, B]( + pool: PeerPool[A, B], + cfg: RuntimeConfig, + amIsupernode: bool, + custody_columns_set: HashSet[ColumnIndex], + custody_columns_list: List[ColumnIndex, NUMBER_OF_COLUMNS], + fuluEpoch: Epoch, + minEpochsForBlobSidecarsRequests: uint64, + maxBlobsPerBlockElectra: uint64, + direction: ColumnSyncerDirection, + getLocalHeadSlotCb: GetSlotCallback, + getLocalWallSlotCb: GetSlotCallback, + getFinalizedSlotCb: GetSlotCallback, + getBackfillSlotCb: GetSlotCallback, + getFrontfillSlotCb: GetSlotCallback, + weakSubjectivityPeriodCb: GetBoolCallback, + progressPivot: Slot, + peerdasBlockVerifier: PeerdasBlockVerifier, + shutdownEvent: AsyncEvent, + maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), + chunkSize = uint64(SLOTS_PER_EPOCH), + flags: set[ColumnSyncerFlag] = {}, + modes: set[ColumnSyncerMode] = {} +): ColumnManager[A, B] = + + let (getFirstSlot, getLastSlot, getSafeSlot) = case direction + of ColumnSyncerDirection.Forward: + (getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb) + of ColumnSyncerDirection.Backward: + (getBackfillSlotCb, getFrontfillSlotCb, getBackfillSlotCb) + + var res = ColumnManager[A, B]( + pool: pool, + cfg: cfg, + amIsupernode: amIsupernode, + custody_columns_set: custody_columns_set, + custody_columns_list: custody_columns_list, + column_syncer_table: initOrderedTable[Slot, ColumnAndBlockResponse](), + FULU_FORK_EPOCH: fuluEpoch, + MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests, + MAX_BLOBS_PER_BLOCK_ELECTRA: maxBlobsPerBlockElectra, + getLocalHeadSlot: getLocalHeadSlotCb, + getLocalWallSlot: getLocalWallSlotCb, + isWithinWeakSubjectivityPeriod: weakSubjectivityPeriodCb, + getSafeSlot: getSafeSlot, + getFirstSlot: getFirstSlot, + getLastSlot: getLastSlot, + progressPivot: progressPivot, + shutdownEvent: shutdownEvent, + maxHeadAge: maxHeadAge, + chunkSize: chunkSize, + peerdasBlockVerifier: peerdasBlockVerifier, + notInSyncEvent: newAsyncEvent(), + direction: direction, + flags: flags, + modes: modes + ) + res.initColumnSyncerAssist() + res + +proc fetchBlocksForColumnNavigation[A, B](man: ColumnManager[A, B], peer: A, + req: ColumnSyncRequest[A]): Future[BeaconBlocksRes] + {.async: (raises: [CancelledError], raw: true).} = + mixin getScore, `==` + + debugEcho "fetching block for column navigation" + + doAssert(not(req.isEmpty()), "Request must not be empty!") + debug "Requesting blocks from peer", + peer_score = req.item.getScore(), + peer_speed = req.item.netKbps(), + topics = "columnsync" + + 
beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) + +proc shouldGetDataColumns[A, B]( + man: ColumnManager[A, B], + s: Slot): bool = + + debugEcho "checking if we should get data columns in the current fork" + let + wallEpoch = man.getLocalWallSlot().epoch + epoch = s.epoch() + (epoch >= man.FULU_FORK_EPOCH) and + (wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or + epoch >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) + +proc shouldGetDataColumns[A, B]( + man: ColumnManager[A, B], + r: ColumnSyncRequest[A]): bool = + man.shouldGetDataColumns(r.slot) or man.shouldGetDataColumns(r.slot + (r.count - 1)) + +proc checkDataColumns(data_columns: seq[DataColumnSidecars]): + Result[void, string] = + debugEcho "verifying data columns from column syncer" + for data_column_sidecars in data_columns: + for data_column_sidecar in data_column_sidecars: + ? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof() + let sync_check_dc = + data_column_sidecar[].verify_data_column_sidecar_kzg_proofs() + if sync_check_dc.isErr: + return err("Invalid data column received while column syncing") + ok() + +proc intersectionColumns[A, B]( + man: ColumnManager[A, B], + peer: A): List[ColumnIndex, NUMBER_OF_COLUMNS] = + debugEcho "computing intersecting columns for the current peer" + let + remoteNodeId = + fetchNodeIdFromPeerId(peer) + peer_cgc = + peer.lookupCgcFromPeer() + intersect_cgc = + intersection( + man.custody_columns_set, + man.cfg.resolve_columns_from_custody_groups(remoteNodeId, + max(SAMPLES_PER_SLOT.uint64, + peer_cgc)).toHashSet()) + var + intersect_list: List[ColumnIndex, NUMBER_OF_COLUMNS] + + for item in intersect_cgc: + discard intersect_list.add item + + intersect_list + +proc refreshColumnScoring[A, B]( + man: ColumnManager[A, B]) = + + for peer in availablePeers(man.pool): + + if peer.lookupCgcFromPeer() >= + (NUMBER_OF_CUSTODY_GROUPS.uint64 div 2): + ## This is a supernode, so we positively + ## score the peer now to prevent disconnection + peer.updateScore(PeerScoreSupernode) + + else: + let + remoteNodeId = fetchNodeIdFromPeerId(peer) + remoteColumns = + remoteNodeId.resolve_column_sets_from_custody_groups( + max(SAMPLES_PER_SLOT.uint64, + peer.lookupCgcFromPeer())) + intersecting_columns = + intersection(man.custody_columns_set, + remoteColumns) + + if intersecting_columns.len > 0: + ## This peer custodies columns that + ## overlap with what we locally + ## custody, hence we positively score + ## the peer to prevent disconnection + peer.updateScore(PeerScoreIntersectingColumns) + +proc getDataColumnSidecarsByRange[A, B](man: ColumnManager[A, B], + peer: A, + r: ColumnSyncRequest[A], + req_cols: List[ColumnIndex, NUMBER_OF_COLUMNS]): + Future[DataColumnSidecarsRes] + {.async: (raises: [CancelledError], raw: true).} = + mixin getScore, `==` + + doAssert(not(r.isEmpty()), "Request must not be empty") + debug "Requesting data column sidecars by range from peer", + topics = "columnsync" + dataColumnSidecarsByRange(peer, r.slot, r.count, req_cols) + +proc remainingSlots(man: ColumnManager): uint64 = + let + first = man.getFirstSlot() + last = man.getLastSlot() + if man.direction == ColumnSyncerDirection.Forward: + if last > first: + man.getLastSlot() - man.getFirstSlot() + else: + 0'u64 + else: + if first > last: + man.getFirstSlot() - man.getLastSlot() + else: + 0'u64 + +proc filterRelevantPeers[A, B](man: ColumnManager[A, B], + peers: seq[A], + w_index: int): + Future[seq[A]] {.async: (raises: [CancelledError]).} = + ## This iterates over the
available peers + ## and returns a refreshed peer list containing only + ## the peers whose status is recent and relevant + ## + + + + var + refreshed_peer_set: seq[A] + for peer in peers: + var + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + peerSlot = peer.getHeadSlot() + + debug "Peer's syncing status", peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot, direction = man.direction, + topics = "columnsync" + + let + peerStatusAge = Moment.now() - peer.getStatusLastTime() + needsUpdate = + # Latest update we got is old + peerStatusAge >= StatusExpirationTime or + man.getFirstSlot() >= peerSlot + + if needsUpdate: + man.workers[w_index].status = ColumnSyncerStatus.UpdatingStatus + + # Avoid overflowing the peer with requests, but make them more frequent + # in case the peer is `close` to the slot range of our interest + if peerStatusAge < StatusExpirationTime div 2: + await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) + + trace "Updating peer's status information", + peer = peer, peer_score = peer.getScore(), + peer_speed = peer.netKbps(), wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + direction = man.direction, topics = "columnsync" + + + if not(await peer.updateStatus()): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, not including in refreshed list", + peer_head_slot = peerSlot, topics = "columnsync" + + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + topics = "columnsync" + + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot, topics = "columnsync" + peer.updateScore(PeerScoreGoodStatus) + peerSlot = newPeerSlot + refreshed_peer_set.add(peer) + + refreshed_peer_set + +func groupAndFillColumnTable*[A, B]( + man: ColumnManager[A, B], + blocks: seq[ref ForkedSignedBeaconBlock], + columns: seq[ref DataColumnSidecar] +): Result[void, string] = + var grouped = newSeq[DataColumnSidecars](blocks.len) + var column_cursor = 0 + for block_idx, blck in blocks: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Fulu: + template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments + if kzgs.len == 0: + # It means there were no columns published against this block + # So we will make a table entry for ColumnAndBlockResponse + # where columns will be None + man.column_syncer_table[forkyBlck.message.slot] = + ColumnAndBlockResponse( + blk: forkyBlck, + columns: Opt.none(DataColumnSidecars)) + continue + + let header = forkyBlck.toSignedBeaconBlockHeader() + for column_idx in 0..<len(man.custody_columns_list): + if column_cursor >= columns.len: + return err("DataColumnSidecar: Response is too short") + let column_sidecar = columns[column_cursor] + if column_sidecar.signed_block_header == header: + grouped[block_idx].add(column_sidecar) + else: + return err("DataColumnSidecar: unexpected signed_block_header") + inc column_cursor + # Make a table entry for the grouped columns + if grouped[block_idx].len > 0: + man.column_syncer_table[forkyBlck.message.slot] = + ColumnAndBlockResponse( + blk: forkyBlck, + columns: Opt.some(grouped[block_idx])) + + if column_cursor != columns.len: + # we reached the end of blocks without consuming
all columns, + # so either we got too few blocks in the paired request, or the + # columns sent to us by the peer are malicious + err("DataColumnSidecar: invalid block/column received") + else: + ok() + +proc serializeColumnTable*[A, B]( + man: ColumnManager[A, B] +): Result[seq[DataColumnSidecars], string] = + # Iterate through the column syncer table + for k, v in man.column_syncer_table.pairs(): + if man.amIsupernode: + if v.columns.isSome() and + v.columns.get.lenu64 >= (man.cfg.NUMBER_OF_COLUMNS div 2): + let + recovered_cps = + recover_cells_and_proofs(v.columns.get().mapIt(it[])) + reconstructed_columns = + get_data_column_sidecars(v.blk, recovered_cps.get()).mapIt(newClone it) + # Populate that particular entry with reconstructed columns + man.column_syncer_table[k] = + ColumnAndBlockResponse( + blk: v.blk, + columns: Opt.some(reconstructed_columns)) + + elif v.columns.isSome() and + v.columns.get.lenu64 < (man.cfg.NUMBER_OF_COLUMNS div 2): + return err ("Requisite number of columns not yet reached") + + else: + if v.columns.isSome and + v.columns.get.lenu64 >= max(man.cfg.CUSTODY_REQUIREMENT, man.cfg.SAMPLES_PER_SLOT): + + # Do nothing, table entry is fine + discard + elif v.columns.isSome and + v.columns.get.lenu64 < max(man.cfg.CUSTODY_REQUIREMENT, man.cfg.SAMPLES_PER_SLOT): + + # Retry as custody has not been reached yet + return err ("Requisite number of columns not yet reached") + + var grouped_serialized_columns: seq[DataColumnSidecars] + # Iterate once more to serialize the entries + for _, v in man.column_syncer_table.pairs(): + if v.columns.isSome: + grouped_serialized_columns.add(v.columns.get) + + ok(grouped_serialized_columns) + +## Nimbus cares about the representation of home +## stakers and solo stakers, and we understand supernodes +## run against that ethos, even though they are a critical +## piece of peerdas development. +## +## Hence, to stand with our values, we provide the user with +## 2 separate modes of column syncing: +## +## 1 - Greedy: In the `Greedy` mode we give a greater preference +## to supernodes or nodes that custody more than 50% of all +## data columns in the network for a given slot. Thereby, by +## using them we are able to recover all data columns. +## +## We detect this by checking whether the remote peer's advertised +## custody group count (cgc) is at least half of the total +## number of custody groups. +## +## 2 - Impartial: In the `Impartial` mode we consider every peer as +## `equal`: instead of intersecting the columns we custody with the +## columns they custody, we range-request whatever columns each +## peer can serve per slot, until we reach the goal of +## reconstructability. +## +## This way no peer in the Peer Pool becomes redundant: they were +## previously useful for blocks, and now they are useful for columns +## too.
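Before the custody illustration below, a rough sketch of what the `Greedy` mode boils down to per request (condensed from `columnSyncStrategyGreedy` further down; variable names here are illustrative only): the worker keeps whichever peer advertises the largest intersection with the local custody set and then requests both blocks and the intersecting columns from that peer.

    # Pick the peer whose custody columns overlap most with ours; that peer
    # then serves both the block range and the intersecting columns for it.
    var
      bestPeer: A = nil
      bestOverlap = 0
    for peer in filteredPeers:
      let overlap = man.intersectionColumns(peer).len
      if overlap > bestOverlap:
        bestOverlap = overlap
        bestPeer = peer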
+## +## Slot \ Custody | 0 | 1 | 2 | 3 | 4 | ..n | +## -------------- | --- | --- | --- | --- | --- | --- | +## 0 | X | X | | X | X | X | +## 1 | X | X | X | | | X | +## 2 | X | | X | X | X | | +## 3 | X | X | | X | X | X | +## 4 | | X | X | | X | X | +## ..m | X | | X | X | | X | +## +## +## where say, Peerₐ → Column 1, 4, 17 +## Peerᵦ → Column 3, 5, 33 +## Peer𝒸 → Column 5, 45, 111 +## +## || +## after || +## reconstruction \/ +## +## +## Slot \ Custody | 0 | 1 | 2 | 3 | 4 | ..n | +## -------------- | --- | --- | --- | --- | --- | --- | +## 0 | X | X | X | X | X | X | +## 1 | X | X | X | X | X | X | +## 2 | X | X | X | X | X | X | +## 3 | X | X | X | X | X | X | +## 4 | X | X | X | X | X | X | +## ..m | X | X | X | X | X | X | +## + +proc columnSyncStrategyImpartial[A, B]( + man: ColumnManager[A, B], index: int, peer: A +) {.async: (raises: [CancelledError]).} = + + var + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + peerSlot = peer.getHeadSlot() + + block: + debug "Peer's syncing status", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + direction = man.direction, topics = "columnsync" + + let + peerStatusAge = Moment.now() - peer.getStatusLastTime() + needsUpdate = + # Latest status we got is old + peerStatusAge >= StatusExpirationTime or + # The point we need to sync is close to where the peer is + man.getFirstSlot() >= peerSlot + + if needsUpdate: + man.workers[index].status = ColumnSyncerStatus.UpdatingStatus + + if peerStatusAge < StatusExpirationTime div 2: + await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) + + trace "Updating peer's status information", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + direction = man.direction, topics = "columnsync" + + if not(await peer.updateStatus()): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", + peer_head_slot = peerSlot, topics = "columnsync" + return + + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + direction = man.direction, topics = "columnsync" + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_old_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot, direction = man.direction, + topics = "columnsync" + peer.updateScore(PeerScoreGoodStatus) + peerSlot = newPeerSlot + + # Time passed - enough to move slots, if sleep happened + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + + if man.remainingSlots() <= man.maxHeadAge: + info "Column syncing is completed", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + direction = man.direction, topics = "columnsync" + + # We clear ColumnManager's `notInSyncEvent` so all the workers will become + # sleeping down. 
+ man.notInSyncEvent.clear() + return + + if man.getFirstSlot() >= peerSlot: + debug "Peer's head slot is lower than local head slot", peer = peer, + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_last_slot = man.getLastSlot(), + local_first_slot = man.getFirstSlot(), + topics = "columnsync" + peer.updateScore(PeerScoreUseless) + return + + # Wall clock keeps ticking, so we need to update the queue + man.assist.updateLastSlot(man.getLastSlot()) + + man.workers[index].status = ColumnSyncerStatus.Requesting + let req = man.assist.pop(peerSlot, peer) + if req.isEmpty(): + debug "Empty request received from queue", peer = peer, + local_head_slot = headSlot, remote_head_slot = peerSlot, + queue_input_slot = man.assist.inpSlot, + queue_output_slot = man.assist.outSlot, + queue_last_slot = man.assist.finalSlot, + topics = "columnsync" + await sleepAsync(RESP_TIMEOUT_DUR) + return + + debug "Creating new request for peer", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + topics = "columnsync" + + man.workers[index].status = ColumnSyncerStatus.Downloading + + let blocks = await man.fetchBlocksForColumnNavigation(peer, req) + if blocks.isErr(): + peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + debug "Failed to receive blocks on request", + err = blocks.error, topics = "columnsync" + return + let blockData = blocks.get().asSeq() + debug "Received blocks on request", + blocks_count = len(blockData), topics = "columnsync" + + let slots = mapIt(blockData, it[].slot) + checkResponse(req, slots).isOkOr: + peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Incorrect blocks sequence received", + blocks_count = len(blockData), + reason = error, topics = "columnsync" + return + + let shouldGetDataColumns = + if not man.shouldGetDataColumns(req): + false + else: + var hasColumns = false + for blck in blockData: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Fulu: + if forkyBlck.message.body.blob_kzg_commitments.len > 0: + hasColumns = true + break + hasColumns + + let + remoteNodeId = + fetchNodeIdFromPeerId(peer) + peer_cgc = + peer.lookupCgcFromPeer() + serveable_columns = + List[ColumnIndex, NUMBER_OF_COLUMNS].init( + man.cfg.resolve_columns_from_custody_groups( + remoteNodeId, + max(SAMPLES_PER_SLOT.uint64, + peer_cgc))) + + let columnData = + if shouldGetDataColumns: + let columns = + await man.getDataColumnSidecarsByRange(peer, req, serveable_columns) + if columns.isErr: + peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + debug "Failed to receive columns on request", + err = columns.error, topics = "columnsync" + return + let columnData = columns.get().asSeq() + debug "Received data columns on request", + columns_count = len(columnData), topics = "columnsync" + if len(columnData) > 0: + let slots = mapIt(columnData, it[].signed_block_header.message.slot) + checkDataColumnsResponse(req, slots, man.MAX_BLOBS_PER_BLOCK_ELECTRA).isOkOr: + peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Incorrect columns sequence received", + columns_count = len(columnData), + reason = error, topics = "columnsync" + return + + man.groupAndFillColumnTable(blockData, columnData).isOkOr: + peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + info "Received column sequence is inconsistent", + msg = error, topics = "columnsync" + return + + let finalColumns = + man.serializeColumnTable().valueOr: + warn "Issue in grouping reconstructed columns", + msg = error, topics = "columnsync" + return 
+ finalColumns.checkDataColumns().isOkOr: + peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Columns verification failed", + columns_count = len(columnData), + reason = error, topics = "columnsync" + return + + # Reset the column syncer table for the next batch + man.column_syncer_table = initOrderedTable[Slot, ColumnAndBlockResponse]() + Opt.some(finalColumns) + else: + Opt.none(seq[DataColumnSidecars]) + + if len(blockData) == 0 and req.contains(man.getSafeSlot()): + peer.updateScore(PeerScoreNoValues) + debug "Response does not include known-to-exist block", + topics = "columnsync" + return + + # Scoring will happen in `syncUpdate` + man.workers[index].status = ColumnSyncerStatus.Queueing + + let + peerFinalized = peer.getFinalizedEpoch().start_slot() + lastSlot = req.slot + req.count + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + maybeFinalized = lastSlot < peerFinalized + + await man.assist.push( + req, blockData, columnData, + maybeFinalized, proc() = + man.workers[index].status = ColumnSyncerStatus.Processing) + +proc columnSyncStrategyGreedy[A, B]( + man: ColumnManager[A, B], + peers: seq[A], + w_index: int) + {.async: (raises: [CancelledError]).} = + + var + accumulator = 0 + requested_peer: A = nil + + let filtered_peers = await man.filterRelevantPeers(peers, w_index) + for peer in filtered_peers: + ## Look for the broadest intersection set among the peers + if man.intersectionColumns(peer).len > accumulator: + accumulator = man.intersectionColumns(peer).len + requested_peer = peer + + # Extract the intersection columns between local and peer to request + let int_cols = man.intersectionColumns(requested_peer) + + if man.remainingSlots() <= man.maxHeadAge: + info "We have synced all columns from the network", + topics = "columnsync" + # Putting all ColumnSync workers to sleep + man.notInSyncEvent.clear() + return + + var + headSlot = man.getLocalHeadSlot() + wallSlot = man.getLocalWallSlot() + reqPeerSlot = requested_peer.getHeadSlot() + + if man.getFirstSlot() >= reqPeerSlot: + debug "Peer's head slot is lower than local head slot", peer = requested_peer, + wall_clock_slot = wallSlot, remote_head_slot = reqPeerSlot, + local_last_slot = man.getLastSlot(), + local_first_slot = man.getFirstSlot(), + topics = "columnsync" + requested_peer.updateScore(PeerScoreUseless) + return + + # Wall clock keeps ticking, so we need an update + man.assist.updateLastSlot(man.getLastSlot()) + + man.workers[w_index].status = ColumnSyncerStatus.Requesting + let req = man.assist.pop(reqPeerSlot, requested_peer) + if req.isEmpty(): + debug "Empty request received from syncer assist, exiting", peer = requested_peer, + local_head_slot = headSlot, remote_head_slot = reqPeerSlot, + queue_input_slot = man.assist.inpSlot, + queue_output_slot = man.assist.outSlot, + queue_last_slot = man.assist.finalSlot, + topics = "columnsync" + await sleepAsync(RESP_TIMEOUT_DUR) + return + + debug "Creating a new request for the peer", wall_clock_slot = wallSlot, + remote_head_slot = reqPeerSlot, local_head_slot = headSlot, + topics = "columnsync" + + man.workers[w_index].status = ColumnSyncerStatus.Downloading + + let blocks = await man.fetchBlocksForColumnNavigation(requested_peer, req) + if blocks.isErr(): + requested_peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + debug "Failed to receive blocks on request", + err = blocks.error, topics = "columnsync" + return + + let blockData = blocks.get().asSeq() + debug "Received 
blocks on request", + blocks_count = len(blockData) + + let slots = mapIt(blockData, it[].slot) + checkResponse(req, slots).isOkOr: + requested_peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Incorrect blocks sequence received", + blocks_count = len(blockData), + reason = error, topics = "columnsync" + return + + let shouldGetDataColumns = + if not man.shouldGetDataColumns(req): + false + else: + var hasColumns = false + for blck in blockData: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Fulu: + if forkyBlck.message.body.blob_kzg_commitments.len > 0: + hasColumns = true + break + hasColumns + + debug "Requesting common columns from the best peer", topics = "columnsync" + let columnData = + if shouldGetDataColumns: + let columns = + await man.getDataColumnSidecarsByRange(requested_peer, req, int_cols) + if columns.isErr(): + requested_peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + debug "Failed to receive columns on request", + err = columns.error, topics = "columnsync" + return + let columnData = columns.get().asSeq() + debug "Received data columns on request", + columns_count = len(columnData), topics = "columnsync" + + if len(columnData) > 0: + let slots = mapIt(columnData, it[].signed_block_header.message.slot) + checkDataColumnsResponse(req, slots, man.MAX_BLOBS_PER_BLOCK_ELECTRA).isOkOr: + requested_peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Incorrect columns sequence received", + columns_count = len(columnData), + reason = error, topics = "columnsync" + return + + man.groupAndFillColumnTable(blockData, columnData).isOkOr: + requested_peer.updateScore(PeerScoreNoValues) + man.assist.push(req) + info "Received columns sequence is inconsistent", + msg = error, topics = "columnsync" + return + + let finalColumns = + man.serializeColumnTable().valueOr: + warn "Issue in grouping reconstructed columns", + msg = error, topics = "columnsync" + return + finalColumns.checkDataColumns().isOkOr: + requested_peer.updateScore(PeerScoreBadResponse) + man.assist.push(req) + warn "Columns verification failed", + columns_count = len(columnData), + reason = error, topics = "columnsync" + return + # Reset the column syncer table for the next batch + man.column_syncer_table = initOrderedTable[Slot, ColumnAndBlockResponse]() + Opt.some(finalColumns) + else: + Opt.none(seq[DataColumnSidecars]) + + if len(blockData) == 0 and req.contains(man.getSafeSlot()): + requested_peer.updateScore(PeerScoreNoValues) + debug "Response does not include known-to-exist block", topics = "columnsync" + return + + # Scoring will happen in `syncUpdate` + man.workers[w_index].status = ColumnSyncerStatus.Queueing + + let + peerFinalized = requested_peer.getFinalizedEpoch().start_slot() + lastSlot = req.slot + req.count + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + maybeFinalized = lastSlot < peerFinalized + + await man.assist.push( + req, blockData, columnData, + maybeFinalized, proc() = + man.workers[w_index].status = ColumnSyncerStatus.Processing) + +proc columnSyncWorkerGreedy[A, B]( + man: ColumnManager[A, B], + index: int +) {.async: (raises: [CancelledError]).} = + mixin getKey, getScore, getHeadSlot + + debug "Starting column syncer in `Greedy` mode", topics = "columnsync" + var usefulPeers, uselessPeers: seq[A] + + try: + while true: + man.workers[index].status = ColumnSyncerStatus.Sleeping + # This event is going to be set until we are not in sync with the network + await 
man.notInSyncEvent.wait() + man.workers[index].status = ColumnSyncerStatus.WaitingPeer + for _ in 0..<10: + var peer: A = nil + peer = await man.pool.acquire() + let + peer_nodeid = peer.fetchNodeIdFromPeerId() + peer_cgc = peer.lookupCgcFromPeer() + intersect_cgc = + intersection( + man.custody_columns_set, + man.cfg.resolve_columns_from_custody_groups(peer_nodeid, + max(SAMPLES_PER_SLOT.uint64, + peer_cgc)).toHashSet()) + if intersect_cgc.len > 0: + usefulPeers.add(peer) + else: + uselessPeers.add(peer) + + if peer.lookupCgcFromPeer() == NUMBER_OF_COLUMNS.uint64: + break + # send back the useless peers back to pool + man.pool.release(uselessPeers) + + # send the useful peers to `columnSyncingStrategy` + await man.columnSyncStrategyGreedy(usefulPeers, index) + + man.pool.release(usefulPeers) + finally: + for peer in usefulPeers & uselessPeers: + if not isNil(peer): + man.pool.release(peer) + +proc columnSyncWorkerImparital[A, B]( + man: ColumnManager[A, B], + index: int +) {.async: (raises: [CancelledError]).} = + mixin getKey, getScore, getHeadSlot + + debug "Starting column syncer in `Impartial` mode", + topics = "columnsync" + var peer: A = nil + try: + while true: + man.workers[index].status = ColumnSyncerStatus.Sleeping + # This event if going to be set until we are not in sync with the network + await man.notInSyncEvent.wait() + man.workers[index].status = ColumnSyncerStatus.WaitingPeer + peer = await man.pool.acquire() + await man.columnSyncStrategyImpartial(index, peer) + man.pool.release(peer) + peer = nil + finally: + if not(isNil(peer)): + man.pool.release(peer) + + debug "Column syncer has stopped.", topics = "columnsync" + +proc getColumnSyncerStats[A, B](man: ColumnManager[A, B]): + tuple[map: string, + sleeping: int, + waiting: int, + pending: int] = + var map = newString(len(man.workers)) + var sleeping, waiting, pending: int + for i in 0.. 
0: + res = res & (if ndays < 10: "0" & $ndays else: $ndays) & "d" + v = v - chronos.days(ndays) + + let nhours = chronos.hours(v) + if nhours > 0: + res = res & (if nhours < 10: "0" & $nhours else: $nhours) & "h" + v = v - chronos.hours(nhours) + else: + res = res & "00h" + + let nmins = chronos.minutes(v) + if nmins > 0: + res = res & (if nmins < 10: "0" & $nmins else: $nmins) & "m" + v = v - chronos.minutes(nmins) + else: + res = res & "00m" + res + +proc columnSyncClose[A, B]( + man: ColumnManager[A, B], + speedTaskFut: Future[void] +) {.async: (raises: []).} = + var pending: seq[FutureBase] + if not(speedTaskFut.finished()): + pending.add(speedTaskFut.cancelAndWait()) + for worker in man.workers: + doAssert(worker.status in {Sleeping, WaitingPeer}) + pending.add(worker.future.cancelAndWait()) + await noCancel allFutures(pending) + +proc columnSyncLoop[A, B]( + man: ColumnManager[A, B] +) {.async: (raises: [CancelledError]).} = + + mixin getKey, getScore + var pauseTime = 0 + + man.initColumnSyncerAssist() + man.startColumnSyncWorkers() + + debug "Column sync loop has started", + topics = "columnsync" + + proc averageSpeedTask() {.async: (raises: [CancelledError]).} = + while true: + # Reset column sync speeds between each loss-of-sync event + man.avgSyncSpeed = 0 + man.insSyncSpeed = 0 + + await man.notInSyncEvent.wait() + + # Give the node time to connect to peers and get the column sync started + await sleepAsync(seconds(SECONDS_PER_SLOT.int64)) + + var + stamp = ColumnSyncTimestamp.now(man.assist.progress()) + syncCount = 0 + + while man.inProgress: + await sleepAsync(seconds(SECONDS_PER_SLOT.int64)) + + let + newStamp = ColumnSyncTimestamp.now(man.assist.progress()) + slotsPerSec = speed(stamp, newStamp) + + syncCount += 1 + + man.insSyncSpeed = slotsPerSec + man.avgSyncSpeed = + man.avgSyncSpeed + (slotsPerSec - man.avgSyncSpeed) / float(syncCount) + + stamp = newStamp + + let averageSpeedTaskFut = averageSpeedTask() + + while true: + let wallSlot = man.getLocalWallSlot() + let headSlot = man.getLocalHeadSlot() + + let (map, sleeping, waiting, pending) = man.getColumnSyncerStats() + + case man.assist.direction + of ColumnSyncerDirection.Forward: + debug "Current column syncing state", workers_map = map, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + wall_head_slot = wallSlot, + local_head_slot = headSlot, + pause_time = $chronos.seconds(pauseTime), + avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4), + ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4), + topics = "columnsync" + of ColumnSyncerDirection.Backward: + debug "Current syncing state", workers_map = map, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + wall_head_slot = wallSlot, + backfill_slot = man.getSafeSlot(), + pause_time = $chronos.seconds(pauseTime), + avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4), + ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4), + topics = "columnsync" + + let + pivot = man.progressPivot + progress = + case man.assist.direction + of ColumnSyncerDirection.Forward: + if man.assist.outSlot >= pivot: + man.assist.outSlot - pivot + else: + 0'u64 + of ColumnSyncerDirection.Backward: + if pivot >= man.assist.outSlot: + pivot - man.assist.outSlot + else: + 0'u64 + total = + case man.assist.direction + of ColumnSyncerDirection.Forward: + if man.assist.finalSlot >= pivot: + man.assist.finalSlot + 1'u64 - pivot + 
else: + 0'u64 + of ColumnSyncerDirection.Backward: + if pivot >= man.assist.finalSlot: + pivot + 1'u64 - man.assist.finalSlot + else: + 0'u64 + remaining = total - progress + done = + if total > 0: + progress.float / total.float + else: + 1.0 + timeleft = + if man.avgSyncSpeed >= 0.001: + Duration.fromFloatSeconds(remaining.float / man.avgSyncSpeed) + else: + InfiniteDuration + currentSlot = Base10.toString( + if man.assist.direction == ColumnSyncerDirection.Forward: + max(uint64(man.assist.outSlot), 1'u64) - 1'u64 + else: + uint64(man.assist.outSlot) + 1'u64 + ) + + # Update status string + man.syncStatus = timeleft.timeLeftForColumnSyncer() & " (" & + (done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " & + man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4) & + "slots/s (" & map & ":" & currentSlot & ")" + + if (man.assist.direction == ColumnSyncerDirection.Forward) and + (ColumnSyncerMode.NoGenesisSync in man.modes): + if not(man.isWithinWeakSubjectivityPeriod()): + fatal WeakSubjectivityLogMessage, current_slot = wallSlot + await man.stopColumnSyncWorkers() + man.shutdownEvent.fire() + return + + if man.remainingSlots() <= man.maxHeadAge: + man.notInSyncEvent.clear() + # We are marking SyncManager as not working only when we are in sync and + # all sync workers are in `Sleeping` state. + if pending > 0: + debug "Synchronization loop waits for workers completion", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, pending_workers_count = pending, + topics = "columnsync" + # We already synced, so we should reset all the pending workers from + # any state they have. + man.assist.clearAndWakeup() + man.inProgress = true + else: + case man.direction + of ColumnSyncerDirection.Forward: + if man.inProgress: + if ColumnSyncerMode.NoMonitor in man.modes: + await man.columnSyncClose(averageSpeedTaskFut) + man.inProgress = false + debug "Forward column sync process finished, exiting", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge, + topics = "columnsync" + break + else: + man.inProgress = false + debug "Forward column sync process finished, sleeping", + wall_head_slot = wallSlot, local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_age = man.maxHeadAge, + topics = "columnsync" + else: + debug "Column sync loop sleeping", wall_head_slot = wallSlot, + local_head_slot = headSlot, + difference = (wallSlot - headSlot), + max_head_Age = man.maxHeadAge, + topics = "columnsync" + of ColumnSyncerDirection.Backward: + await man.columnSyncClose(averageSpeedTaskFut) + man.inProgress = false + debug "Backward column sync process finished, exiting", + wall_head_slot = wallSlot, local_head_slot = headSlot, + backfill_slot = man.getLastSlot(), + max_head_Age = man.maxHeadAge, + topics = "columnsync" + break + else: + if not(man.notInSyncEvent.isSet()): + # We get here only if we lost sync for more than `maxHeadAge` period. 
+ if pending == 0: + man.initColumnSyncerAssist() + man.notInSyncEvent.fire() + man.inProgress = true + debug "Node lost column sync for more than preset period", + period = man.maxHeadAge, wall_head_slot = wallSlot, + local_head_slot = headSlot, + missing_slots = man.remainingSlots(), + progress = float(man.assist.progress()), + topics = "columnsync" + else: + man.notInSyncEvent.fire() + man.inProgress = true + + await sleepAsync(chronos.seconds(2)) + +proc start*[A, B](man: ColumnManager[A, B]) = + man.columnSyncFut = man.columnSyncLoop() + +proc updatePivot*[A, B](man: ColumnManager[A, B], pivot: Slot) = + man.progressPivot = pivot + +proc join*[A, B]( + man: ColumnManager[A, B] +): Future[void] {.async: (raw: true, raises: [CancelledError]).} = + if man.columnSyncFut.isNil(): + let retFuture = + Future[void].Raising([CancelledError]).init("nimbus-eth2.join()") + retFuture.complete() + retFuture + else: + man.columnSyncFut.join() + diff --git a/beacon_chain/sync/column_syncer_assist.nim b/beacon_chain/sync/column_syncer_assist.nim new file mode 100644 index 0000000000..bb95673beb --- /dev/null +++ b/beacon_chain/sync/column_syncer_assist.nim @@ -0,0 +1,927 @@ +# beacon_chain +# Copyright (c) 2018-2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import std/[heapqueue, tables, strutils, sequtils, math] +import stew/base10, chronos, chronicles, results +import + ../spec/datatypes/[phase0, altair], + ../spec/eth2_apis/rest_types, + ../spec/[helpers, forks, network], + ../networking/[peer_pool, peer_scores, eth2_network], + ../gossip_processing/block_processor, + ../beacon_clock, + "."/[sync_protocol, sync_queue] + +type + PeerdasBlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, + columns: Opt[DataColumnSidecars], + maybeFinalized: bool): + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + + ColumnSyncWaiter* = ref object + future: Future[void].Raising([CancelledError]) + reset: bool + + ColumnSyncerDirection* {.pure.} = enum + Forward, Backward + + GapColumn*[T] = object + start*: Slot + finish*: Slot + item*: T + + ColumnSyncRequest*[T] = object + direction*: ColumnSyncerDirection + index*: uint64 + slot*: Slot + count*: uint64 + item*: T + + ColumnSyncResult*[T] = object + request*: ColumnSyncRequest[T] + data*: seq[ref ForkedSignedBeaconBlock] + columns*: Opt[seq[DataColumnSidecars]] + + ColumnSyncerAssist*[T] = ref object + direction*: ColumnSyncerDirection + inpSlot*: Slot + outSlot*: Slot + startSlot*: Slot + finalSlot*: Slot + chunkSize*: uint64 + queueSize*: int + counter*: uint64 + pending*: Table[uint64, ColumnSyncRequest[T]] + gapList*: seq[GapColumn[T]] + waiters*: seq[ColumnSyncWaiter] + getSafeSlot*: GetSlotCallback + debtsQueue: HeapQueue[ColumnSyncRequest[T]] + debtsCount: uint64 + readyQueue: HeapQueue[ColumnSyncResult[T]] + rewind: Option[RewindPoint] + db: BeaconChainDB + peerdasBlockVerifier: PeerdasBlockVerifier + +proc init[T](t1: typedesc[ColumnSyncRequest], direction: ColumnSyncerDirection, start: Slot, + finish: Slot, t2: typedesc[T]): ColumnSyncRequest[T] = + let count = finish - start + 1'u64 + ColumnSyncRequest[T](direction: direction, slot: start, count: 
count) + +proc init[T](t1: typedesc[ColumnSyncRequest], direction: ColumnSyncerDirection, slot: Slot, + count: uint64, item: T): ColumnSyncRequest[T] = + ColumnSyncRequest[T](direction: direction, slot: slot, count: count, item: item) + +proc init[T](t1: typedesc[ColumnSyncRequest], direction: ColumnSyncerDirection, start: Slot, + finish: Slot, item: T): ColumnSyncRequest[T] = + let count = finish - start + 1'u64 + ColumnSyncRequest[T](direction: direction, slot: start, count: count, item: item) + +proc empty*[T](t: typedesc[ColumnSyncRequest], direction: ColumnSyncerDirection, + t2: typedesc[T]): ColumnSyncRequest[T] {.inline.} = + ColumnSyncRequest[T](direction: direction, count: 0'u64) + +proc setItem*[T](sr: var ColumnSyncRequest[T], item: T) = + sr.item = item + +proc isEmpty*[T](sr: ColumnSyncRequest[T]): bool = + (sr.count == 0'u64) + +template shortLog*[T](req: ColumnSyncRequest[T]): string = + Base10.toString(uint64(req.slot)) & ":" & + Base10.toString(req.count) & "@" & + Base10.toString(req.index) + +proc contains*[T](req: ColumnSyncRequest[T], slot: Slot): bool {.inline.} = + slot >= req.slot and slot < req.slot + req.count + +proc cmp*[T](a, b: ColumnSyncRequest[T]): int = + cmp(uint64(a.slot), uint64(b.slot)) + +proc checkResponse*[T](req: ColumnSyncRequest[T], + data: openArray[Slot]): Result[void, cstring] = + if len(data) == 0: + # Impossible to verify empty response. + return ok() + + if lenu64(data) > req.count: + # Number of blocks in response should be less or equal to number of + # requested blocks. + return err("Too many blocks received") + + var + slot = req.slot + rindex = 0'u64 + dindex = 0 + + while (rindex < req.count) and (dindex < len(data)): + if slot < data[dindex]: + discard + elif slot == data[dindex]: + inc(dindex) + else: + return err("Incorrect order or duplicate blocks found") + slot += 1'u64 + rindex += 1'u64 + + if dindex != len(data): + return err("Some of the blocks are outside the requested range") + + ok() + +proc checkDataColumnsResponse*[T](req: ColumnSyncRequest[T], + data: openArray[Slot], + maxBlobsPerBlockElectra: uint64): + Result[void, string] = + if data.len == 0: + return ok() + + if lenu64(data) > (req.count * NUMBER_OF_COLUMNS): + # Number of data columns in the response should be less than + # or equal to (MAX_BLOBS_PER_BLOCK_FULU * NUMBER_OF_COLUMNS). 
+ return err ("Too many data columns have been received") + + var + pSlot = data[0] + counter = 0'u64 + for slot in data: + if (slot < req.slot) or (slot >= req.slot + req.count): + return err ("Some of the data columns are not in the range") + if slot < pSlot: + return err ("Data columns have been sent in incorrect order") + if slot == pSlot: + inc counter + # keeping this constant Electra until Fulu comes in + if counter > maxBlobsPerBlockElectra * NUMBER_OF_COLUMNS: + return err ("Number of data columns in the block has exceeded the limit") + else: + counter = 1'u64 + pSlot = slot + + ok() + +proc init*[T](t1: typedesc[ColumnSyncerAssist], t2: typedesc[T], + direction: ColumnSyncerDirection, + start, final: Slot, chunkSize: uint64, + getSafeSlotCb: GetSlotCallback, + peerdasBlockVerifier: PeerdasBlockVerifier, + syncQueueSize: int = -1): + ColumnSyncerAssist[T] = + + doAssert(chunkSize > 0'u64, "Chunk size should not be zero") + ColumnSyncerAssist[T]( + direction: direction, + startSlot: start, + finalSlot: final, + chunkSize: chunkSize, + queueSize: syncQueueSize, + getSafeSlot: getSafeSlotCb, + waiters: newSeq[ColumnSyncWaiter](), + counter: 1'u64, + pending: initTable[uint64, ColumnSyncRequest[T]](), + debtsQueue: initHeapQueue[ColumnSyncRequest[T]](), + inpSlot: start, + outSlot: start, + peerdasBlockVerifier: peerdasBlockVerifier) + +proc `<`*[T](a, b: ColumnSyncRequest[T]): bool = + doAssert(a.direction == b.direction) + case a.direction + of ColumnSyncerDirection.Forward: + a.slot < b.slot + of ColumnSyncerDirection.Backward: + a.slot > b.slot + +proc `<`*[T](a, b: ColumnSyncResult[T]): bool = + doAssert(a.request.direction == b.request.direction) + case a.request.direction + of ColumnSyncerDirection.Forward: + a.request.slot < b.request.slot + of ColumnSyncerDirection.Backward: + a.request.slot > b.request.slot + +proc `==`*[T](a, b: ColumnSyncRequest[T]): bool = + (a.slot == b.slot) and (a.count == b.count) + +proc lastSlot*[T](r: ColumnSyncRequest[T]): Slot = + ## Returns last slot for request + r.slot + r.count - 1'u64 + +proc makePending*[T](cas: ColumnSyncerAssist[T], req: var ColumnSyncRequest[T]) = + req.index = cas.counter + cas.counter = cas.counter + 1'u64 + +proc updateLastSlot*[T](cas: ColumnSyncerAssist[T], last: Slot) {.inline.} = + cas.finalSlot = last + +proc wakeUpWaiters[T](cas: ColumnSyncerAssist[T], reset = false) = + ## Wakeup one or all blocked waiters + for item in cas.waiters: + if reset: + item.reset = true + + if not(item.future.finished()): + item.future.complete() + +proc waitForChanges[T](cas: ColumnSyncerAssist[T]): + Future[bool] + {.async: (raises: [CancelledError]).} = + ## Create new waiter and wait for completion from `wakeUpWaiters()`. 
+ let + waitFut = + Future[void].Raising([CancelledError]).init("ColumnSyncerAssist.waitForChanges") + waitItem = + ColumnSyncWaiter(future: waitfut) + cas.waiters.add(waitItem) + try: + await waitFut + return waitItem.reset + finally: + cas.waiters.delete(cas.waiters.find(waitItem)) + +proc wakeupAndWaitWaiters[T](cas: ColumnSyncerAssist[T]) + {.async: (raises: [CancelledError]).} = + ## This proc will perform wakeUpWaiters(true) and block until + ## last waiter will be awakened + var waitChanges = cas.waitForChanges() + cas.wakeupWaiters(true) + discard await waitChanges + +proc clearAndWakeup*[T](cas: ColumnSyncerAssist[T]) = + cas.pending.clear() + cas.wakeUpWaiters(true) + +proc resetWait*[T](cas: ColumnSyncerAssist[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} = + cas.pending.clear() + + let minSlot = + case cas.direction + of ColumnSyncerDirection.Forward: + if toSlot.isSome(): + min(toSlot.get(), cas.outSlot) + else: + cas.outSlot + of ColumnSyncerDirection.Backward: + if toSlot.isSome(): + toSlot.get() + else: + cas.outSlot + cas.debtsQueue.clear() + cas.debtsCount = 0 + cas.readyQueue.clear() + cas.inpSlot = minSlot + cas.outSlot = minSlot + # Waking up all waiters and wait for last one. + await cas.wakeupAndWaitWaiters() + +proc isEmpty*[T](sr: ColumnSyncResult[T]): bool {.inline.} = + ## Returns ``true`` if response chain of blocks is empty (has only + ## empty slots). + len(sr.data) == 0 + +proc hasEndGap*[T](sr: ColumnSyncResult[T]): bool {.inline.} = + ## Returns ``true`` if response chain of blocks has a gap at the end + let lastSlot = + sr.request.slot + sr.request.count - 1'u64 + if len(sr.data) == 0: + return true + if sr.data[^1][].slot != lastslot: + return true + return false + +proc getLastNonEmptySlot*[T](sr: ColumnSyncResult[T]): Slot {.inline.} = + ## Returns last non-empty slot from result. If response has only + ## empty slots, original request slot will be returned. + if len(sr.data) == 0: + # If response has only empty slots we are going to use original + # request slot + sr.request.slot + else: + sr.data[^1][].slot + +proc processGap[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncResult[T]) = + if sr.isEmpty(): + let gitem = GapColumn[T](start: sr.request.slot, + finish: sr.request.slot + sr.request.count - 1'u64, + item: sr.request.item) + cas.gapList.add(gitem) + else: + if sr.hasEndGap(): + let gitem = GapColumn[T](start: sr.getLastNonEmptySlot() + 1'u64, + finish: sr.request.slot + sr.request.count - 1'u64, + item: sr.request.item) + cas.gapList.add(gitem) + else: + cas.gapList.reset() + +proc rewardForGaps[T](cas: ColumnSyncerAssist[T], score: int) = + mixin updateScore, getStats + + for gap in cas.gapList: + if score < 0: + # Every empty response increases penalty by 25%, but not more than 200%. 
+      let
+        emptyCount = gap.item.getStats(SyncResponseKind.Empty)
+        goodCount = gap.item.getStats(SyncResponseKind.Good)
+
+      if emptyCount <= goodCount:
+        gap.item.updateScore(score)
+      else:
+        let
+          weight = int(min(emptyCount - goodCount, 8'u64))
+          newScore = score + score * weight div 4
+        gap.item.updateScore(newScore)
+        debug "Peer received gap penalty for missing columns in response",
+          peer = gap.item, penalty = newScore, topics = "columnsync"
+    else:
+      gap.item.updateScore(score)
+
+proc toDebtsQueue[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncRequest[T]) =
+  cas.debtsQueue.push(sr)
+  cas.debtsCount = cas.debtsCount + sr.count
+
+proc getRewindPoint*[T](cas: ColumnSyncerAssist[T], failSlot: Slot,
+                        safeSlot: Slot): Slot =
+  logScope:
+    direction = cas.direction
+
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    # Calculate the latest finalized epoch.
+    let finalizedEpoch = epoch(safeSlot)
+
+    # Calculate failure epoch.
+    let failEpoch = epoch(failSlot)
+
+    # Calculate exponential rewind point in number of epochs.
+    let epochCount =
+      if cas.rewind.isSome():
+        let rewind = cas.rewind.get()
+        if failSlot == rewind.failSlot:
+          # `MissingParent` happened at the same slot, so we increase the
+          # rewind point by a factor of 2.
+          if failEpoch > finalizedEpoch:
+            let rewindPoint = rewind.epochCount shl 1
+            if rewindPoint < rewind.epochCount:
+              # If the exponential rewind point produces a `uint64` overflow
+              # we rewind to the latest finalized epoch.
+              failEpoch - finalizedEpoch
+            else:
+              if (failEpoch < rewindPoint) or
+                 (failEpoch - rewindPoint < finalizedEpoch):
+                # The exponential rewind point lands far behind the latest
+                # finalized epoch, so clamp to it.
+                failEpoch - finalizedEpoch
+              else:
+                rewindPoint
+          else:
+            warn "Trying to rewind over the last finalized epoch",
+              finalized_slot = safeSlot, fail_slot = failSlot,
+              finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
+              rewind_epoch_count = rewind.epochCount
+            0'u64
+        else:
+          # `MissingParent` happened at a different slot, so we are going to
+          # rewind for 1 epoch only.
+          if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
+            warn "Could not rewind further than the last finalized epoch",
+              finalized_slot = safeSlot, fail_slot = failSlot,
+              finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
+              rewind_epoch_count = rewind.epochCount
+            0'u64
+          else:
+            1'u64
+      else:
+        # `MissingParent` happened for the first time.
+        if (failEpoch < 1'u64) or (failEpoch - 1'u64 < finalizedEpoch):
+          warn "Could not rewind further than the last finalized slot",
+            finalized_slot = safeSlot, fail_slot = failSlot,
+            finalized_epoch = finalizedEpoch, fail_epoch = failEpoch
+          0'u64
+        else:
+          1'u64
+
+    if epochCount == 0'u64:
+      warn "Unable to continue syncing, please restart the node",
+        finalized_slot = safeSlot, fail_slot = failSlot,
+        finalized_epoch = finalizedEpoch, fail_epoch = failEpoch
+      # Calculate the rewind epoch, which will be equal to the last rewind
+      # point or `finalizedEpoch`.
+      let rewindEpoch =
+        if cas.rewind.isNone():
+          finalizedEpoch
+        else:
+          epoch(cas.rewind.get().failSlot) - cas.rewind.get().epochCount
+      rewindEpoch.start_slot()
+    else:
+      # Calculate the rewind epoch, which should not be less than the latest
+      # finalized epoch.
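+      # For example, a first `MissingParent` at fail epoch 100 with finalized
+      # epoch 90 gives epochCount = 1 (rewind to epoch 99); a repeat failure
+      # at the same slot doubles it to 2 (rewind to epoch 98), and so on,
+      # never rewinding past the finalized epoch.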
+      let rewindEpoch = failEpoch - epochCount
+      # Update and save new rewind point in ColumnSyncerAssist.
+      cas.rewind = some(RewindPoint(failSlot: failSlot, epochCount: epochCount))
+      rewindEpoch.start_slot()
+
+  of ColumnSyncerDirection.Backward:
+    # While we perform backward sync, the only possible slot we could rewind
+    # to is the latest stored block.
+    if failSlot == safeSlot:
+      warn "Unable to continue syncing, please restart the node",
+        safe_slot = safeSlot, fail_slot = failSlot
+    safeSlot
+
+func getOpt(columns: Opt[seq[DataColumnSidecars]], i: int): Opt[DataColumnSidecars] =
+  if columns.isSome:
+    Opt.some(columns.get()[i])
+  else:
+    Opt.none(DataColumnSidecars)
+
+iterator peerdas_blocks[T](cas: ColumnSyncerAssist[T],
+                           sr: ColumnSyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[DataColumnSidecars]) =
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    for i in countup(0, len(sr.data) - 1):
+      yield (sr.data[i], sr.columns.getOpt(i))
+  of ColumnSyncerDirection.Backward:
+    for i in countdown(len(sr.data) - 1, 0):
+      yield (sr.data[i], sr.columns.getOpt(i))
+
+proc advanceOutput*[T](cas: ColumnSyncerAssist[T], number: uint64) =
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    cas.outSlot = cas.outSlot + number
+  of ColumnSyncerDirection.Backward:
+    cas.outSlot = cas.outSlot - number
+
+proc advanceInput[T](cas: ColumnSyncerAssist[T], number: uint64) =
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    cas.inpSlot = cas.inpSlot + number
+  of ColumnSyncerDirection.Backward:
+    cas.inpSlot = cas.inpSlot - number
+
+proc notInRange[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncRequest[T]): bool =
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    (cas.queueSize > 0) and (sr.slot > cas.outSlot)
+  of ColumnSyncerDirection.Backward:
+    (cas.queueSize > 0) and (sr.lastSlot < cas.outSlot)
+
+func numAlreadyKnownSlots[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncRequest[T]): uint64 =
+  ## Compute the number of slots covered by a given `ColumnSyncRequest` that
+  ## are already known and, hence, not relevant for sync progression.
+  let
+    outSlot = cas.outSlot
+    lowSlot = sr.slot
+    highSlot = sr.lastSlot
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    if outSlot > highSlot:
+      # Entire request is no longer relevant.
+      sr.count
+    elif outSlot > lowSlot:
+      # Request is only partially relevant.
+      outSlot - lowSlot
+    else:
+      # Entire request is still relevant.
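+      # (`outSlot` has not reached the request's first slot yet)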
+      0
+  of ColumnSyncerDirection.Backward:
+    if lowSlot > outSlot:
+      # Entire request is no longer relevant.
+      sr.count
+    elif highSlot > outSlot:
+      # Request is only partially relevant.
+      highSlot - outSlot
+    else:
+      # Entire request is still relevant.
+      0
+
+proc push*[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncRequest[T],
+              data: seq[ref ForkedSignedBeaconBlock],
+              columns: Opt[seq[DataColumnSidecars]],
+              maybeFinalized: bool = false,
+              processingCb: ProcessingCallback = nil)
+             {.async: (raises: [CancelledError]).} =
+  ## Push successful result to queue
+  logScope:
+    topics = "columnsync"
+
+  mixin updateScore, updateStats, getStats
+  if sr.index notin cas.pending:
+    # If request `sr` is not in our pending list, it only means that
+    # ColumnSyncerAssist.resetWait() has run and all pending requests have
+    # expired, so we swallow `old` requests; this way sync workers are able
+    # to get proper new requests from the ColumnSyncerAssist.
+    return
+
+  cas.pending.del(sr.index)
+
+  while true:
+    if cas.notInRange(sr):
+      let reset = await cas.waitForChanges()
+      if reset:
+        # ColumnSyncerAssist reset
+        return
+    else:
+      let syncres = ColumnSyncResult[T](request: sr, data: data, columns: columns)
+      cas.readyQueue.push(syncres)
+      break
+
+  while len(cas.readyQueue) > 0:
+    let reqres =
+      case cas.direction
+      of ColumnSyncerDirection.Forward:
+        let minSlot = cas.readyQueue[0].request.slot
+        if cas.outSlot < minSlot:
+          none[ColumnSyncResult[T]]()
+        else:
+          some(cas.readyQueue.pop())
+      of ColumnSyncerDirection.Backward:
+        let maxSlot = cas.readyQueue[0].request.slot +
+                      (cas.readyQueue[0].request.count - 1'u64)
+        if cas.outSlot > maxSlot:
+          none[ColumnSyncResult[T]]()
+        else:
+          some(cas.readyQueue.pop())
+
+    let item =
+      if reqres.isSome():
+        reqres.get()
+      else:
+        let rewindSlot = cas.getRewindPoint(cas.outSlot, cas.getSafeSlot())
+        warn "Got incorrect column sync result in queue, rewinding",
+          blocks_count = len(cas.readyQueue[0].data),
+          output_slot = cas.outSlot, input_slot = cas.inpSlot,
+          rewind_to_slot = rewindSlot
+        await cas.resetWait(some(rewindSlot))
+        break
+
+    if processingCb != nil:
+      processingCb()
+
+    # Validating received blocks one by one
+    var
+      hasInvalidBlock = false
+      unviableBlock: Option[(Eth2Digest, Slot)]
+      missingParentSlot: Option[Slot]
+      goodBlock: Option[Slot]
+      res: Result[void, VerifierError]
+
+    for blk, col in cas.peerdas_blocks(item):
+      res = await cas.peerdasBlockVerifier(blk[], col, maybeFinalized)
+
+      if res.isOk:
+        goodBlock = some(blk[].slot)
+      else:
+        case res.error()
+        of VerifierError.MissingParent:
+          missingParentSlot = some(blk[].slot)
+          break
+        of VerifierError.Duplicate:
+          # The block syncer has already enqueued this block, so we can treat
+          # it as reliable and directly attempt to persist the downloaded data
+          # column sidecars into the columns store.
+          if col.isSome:
+            debug "Attempting to persist downloaded columns into store",
+              topics = "columnsync"
+            for cl in col.get:
+              cas.db.putDataColumnSidecar(cl[])
+
+        of VerifierError.UnviableFork:
+          # Keep going so as to register other unviable blocks with the
+          # quarantine
+          if unviableBlock.isNone:
+            # Remember the first unviable block, so we can log it
+            unviableBlock = some((blk[].root, blk[].slot))
+
+        of VerifierError.Invalid:
+          hasInvalidBlock = true
+
+          let req = item.request
+          notice "Received invalid sequence of blocks",
+            blocks_count = len(item.data), topics = "columnsync"
+          req.item.updateScore(PeerScoreBadValues)
+          break
+
+    # When errors happen while processing blocks, we retry the same request
+    # with, hopefully, a different peer
+    let retryRequest =
+      hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
+    if not(retryRequest):
+      let numSlotsAdvanced = item.request.count - cas.numAlreadyKnownSlots(sr)
+      cas.advanceOutput(numSlotsAdvanced)
+
+      if goodBlock.isSome():
+        # If there was no error and the response was not empty, we reward the
+        # peer with some bonus score - not for duplicate blocks though.
+        item.request.item.updateScore(PeerScoreGoodValues)
+        item.request.item.updateStats(SyncResponseKind.Good, 1'u64)
+
+        # BlockProcessor reports a good block, so we can reward all the peers
+        # who sent us empty responses.
+        cas.rewardForGaps(PeerScoreGoodValues)
+        cas.gapList.reset()
+      else:
+        # Response was empty
+        item.request.item.updateStats(SyncResponseKind.Empty, 1'u64)
+
+        cas.processGap(item)
+
+      if numSlotsAdvanced > 0:
+        cas.wakeUpWaiters()
+
+    else:
+      debug "Block pool rejected peer's response",
+        blocks_count = len(item.data),
+        ok = goodBlock.isSome(),
+        unviable = unviableBlock.isSome(),
+        missing_parent = missingParentSlot.isSome(),
+        topics = "columnsync"
+
+      # We need to move failed response to the debts queue.
+      cas.toDebtsQueue(item.request)
+
+      if unviableBlock.isSome():
+        let req = item.request
+        notice "Received blocks from an unviable fork",
+          blockRoot = unviableBlock.get()[0],
+          blockSlot = unviableBlock.get()[1],
+          blocks_count = len(item.data)
+        req.item.updateScore(PeerScoreUnviableFork)
+
+      if missingParentSlot.isSome():
+        var
+          resetSlot: Option[Slot]
+          failSlot = missingParentSlot.get()
+
+        # If we get `VerifierError.MissingParent` it means that the peer
+        # returned a chain of blocks with holes, or `block_pool` is in an
+        # incomplete state. We are going to rewind the ColumnSyncerAssist some
+        # distance back, but no further than `finalized_epoch`.
+
+        let
+          req = item.request
+          safeSlot = cas.getSafeSlot()
+          gapsCount = len(cas.gapList)
+
+        # We should penalize all the peers which responded with gaps.
+        cas.rewardForGaps(PeerScoreMissingValues)
+        cas.gapList.reset()
+
+        case cas.direction
+        of ColumnSyncerDirection.Forward:
+          if goodBlock.isSome():
+            # `VerifierError.MissingParent` and `Success` are both present in
+            # the response, which means we need to request this range one more
+            # time.
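+            # No rewind is needed because at least one block in this range was
+            # accepted; only the offending peer is penalised below.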
+            debug "Unexpected missing parent, but no rewind needed",
+              finalized_slot = safeSlot,
+              last_good_slot = goodBlock.get(),
+              missing_parent_slot = missingParentSlot.get(),
+              blocks_count = len(item.data),
+              topics = "columnsync"
+            req.item.updateScore(PeerScoreUnviableFork)
+          else:
+            if safeSlot < req.slot:
+              let rewindSlot = cas.getRewindPoint(failSlot, safeSlot)
+              debug "Unexpected missing parent, rewind needed",
+                rewind_to_slot = rewindSlot,
+                rewind_point = cas.rewind, finalized_slot = safeSlot,
+                blocks_count = len(item.data),
+                gaps_count = gapsCount,
+                topics = "columnsync"
+              resetSlot = some(rewindSlot)
+            else:
+              error "Unexpected missing parent at finalized epoch slot",
+                rewind_to_slot = safeSlot,
+                blocks_count = len(item.data),
+                gaps_count = gapsCount,
+                topics = "columnsync"
+              req.item.updateScore(PeerScoreBadValues)
+        of ColumnSyncerDirection.Backward:
+          if safeSlot > failSlot:
+            let rewindSlot = cas.getRewindPoint(failSlot, safeSlot)
+            # It's quite common for peers to send us fewer blocks than we ask
+            # for.
+            debug "Gap in block range response, rewinding",
+              rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot,
+              finalized_slot = safeSlot, blocks_count = len(item.data),
+              topics = "columnsync"
+            resetSlot = some(rewindSlot)
+            req.item.updateScore(PeerScoreMissingValues)
+          else:
+            error "Unexpected missing parent at safe slot",
+              to_slot = safeSlot, blocks_count = len(item.data)
+            req.item.updateScore(PeerScoreBadValues)
+
+        if resetSlot.isSome():
+          await cas.resetWait(resetSlot)
+          case cas.direction
+          of ColumnSyncerDirection.Forward:
+            debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
+              queue_input_slot = cas.inpSlot, queue_output_slot = cas.outSlot,
+              rewind_point = cas.rewind, direction = cas.direction,
+              topics = "columnsync"
+          of ColumnSyncerDirection.Backward:
+            debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
+              queue_input_slot = cas.inpSlot, queue_output_slot = cas.outSlot,
+              direction = cas.direction, topics = "columnsync"
+      break
+
+proc push*[T](cas: ColumnSyncerAssist[T], sr: ColumnSyncRequest[T]) =
+  ## Push failed request back to queue
+  if sr.index notin cas.pending:
+    # If request `sr` is not in our pending list, it only means that
+    # ColumnSyncerAssist.resetWait() has run and all pending requests have
+    # expired, so we swallow `old` requests; this way sync workers are able
+    # to get proper new requests from the ColumnSyncerAssist.
+    return
+  cas.pending.del(sr.index)
+  cas.toDebtsQueue(sr)
+
+proc handlePotentialSafeSlotAdvancement[T](cas: ColumnSyncerAssist[T]) =
+  # It may happen that sync progress advanced to a newer `safeSlot`, either
+  # by a response that started with good values and only had errors late,
+  # or through an out-of-band mechanism, e.g., VC/REST.
+  # If that happens, advance to the new `safeSlot` to avoid repeating requests
+  # for data that is considered immutable and no longer relevant.
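+  # For a forward queue this bumps `outSlot`/`inpSlot` up to `safeSlot`; for a
+  # backward queue they move down towards it.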
+
+  let safeSlot = cas.getSafeSlot()
+  func numSlotsBehindSafeSlot(slot: Slot): uint64 =
+    case cas.direction
+    of ColumnSyncerDirection.Forward:
+      if safeSlot > slot:
+        safeSlot - slot
+      else:
+        0
+    of ColumnSyncerDirection.Backward:
+      if slot > safeSlot:
+        slot - safeSlot
+      else:
+        0
+
+  let
+    numOutSlotsAdvanced = cas.outSlot.numSlotsBehindSafeSlot
+    numInpSlotsAdvanced =
+      case cas.direction
+      of ColumnSyncerDirection.Forward:
+        cas.inpSlot.numSlotsBehindSafeSlot
+      of ColumnSyncerDirection.Backward:
+        if cas.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
+          0'u64
+        else:
+          cas.inpSlot.numSlotsBehindSafeSlot
+
+  if numOutSlotsAdvanced != 0 or numInpSlotsAdvanced != 0:
+    debug "Sync progress advanced out-of-band",
+      safeSlot, outSlot = cas.outSlot, inpSlot = cas.inpSlot,
+      topics = "columnsync"
+    if numOutSlotsAdvanced != 0:
+      cas.advanceOutput(numOutSlotsAdvanced)
+    if numInpSlotsAdvanced != 0:
+      cas.advanceInput(numInpSlotsAdvanced)
+    cas.wakeUpWaiters()
+
+func updateRequestForNewSafeSlot[T](cas: ColumnSyncerAssist[T], sr: var ColumnSyncRequest[T]) =
+  # Requests may have originated before the latest `safeSlot` advancement.
+  # Update the request so that it does not ask for any data prior to
+  # `safeSlot`.
+  let
+    outSlot = cas.outSlot
+    lowSlot = sr.slot
+    highSlot = sr.lastSlot
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    if outSlot <= lowSlot:
+      # Entire request is still relevant.
+      discard
+    elif outSlot <= highSlot:
+      # Request is only partially relevant.
+      let
+        numSlotsDone = outSlot - lowSlot
+      sr.slot += numSlotsDone
+      sr.count -= numSlotsDone
+    else:
+      # Entire request is no longer relevant.
+      sr.count = 0
+  of ColumnSyncerDirection.Backward:
+    if outSlot >= highSlot:
+      # Entire request is still relevant.
+      discard
+    elif outSlot >= lowSlot:
+      # Request is only partially relevant.
+      let
+        numSlotsDone = highSlot - outSlot
+      sr.count -= numSlotsDone
+    else:
+      # Entire request is no longer relevant.
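+      # (`outSlot` has already moved below the request's lowest slot, so every
+      # slot it covers has been processed)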
+      sr.count = 0
+
+proc pop*[T](cas: ColumnSyncerAssist[T], maxSlot: Slot, item: T): ColumnSyncRequest[T] =
+  ## Create new request according to current `ColumnSyncerAssist` parameters
+  cas.handlePotentialSafeSlotAdvancement()
+  while len(cas.debtsQueue) > 0:
+    if maxSlot < cas.debtsQueue[0].slot:
+      # Peer's latest slot is less than starting request's slot
+      return ColumnSyncRequest.empty(cas.direction, T)
+    if maxSlot < cas.debtsQueue[0].lastSlot():
+      # Peer's latest slot is less than finishing request's slot
+      return ColumnSyncRequest.empty(cas.direction, T)
+    var sr = cas.debtsQueue.pop()
+    cas.debtsCount = cas.debtsCount - sr.count
+    cas.updateRequestForNewSafeSlot(sr)
+    if sr.isEmpty:
+      continue
+    sr.setItem(item)
+    cas.makePending(sr)
+    return sr
+
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    if maxSlot < cas.inpSlot:
+      # Peer's latest slot is less than queue's input slot
+      return ColumnSyncRequest.empty(cas.direction, T)
+    if cas.inpSlot > cas.finalSlot:
+      # Queue's input slot is bigger than queue's final slot
+      return ColumnSyncRequest.empty(cas.direction, T)
+    let lastSlot = min(maxSlot, cas.finalSlot)
+    let count = min(cas.chunkSize, lastSlot + 1'u64 - cas.inpSlot)
+    var sr = ColumnSyncRequest.init(cas.direction, cas.inpSlot, count, item)
+    cas.advanceInput(count)
+    cas.makePending(sr)
+    sr
+
+  of ColumnSyncerDirection.Backward:
+    if cas.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
+      return ColumnSyncRequest.empty(cas.direction, T)
+    if cas.inpSlot < cas.finalSlot:
+      return ColumnSyncRequest.empty(cas.direction, T)
+    let (slot, count) =
+      block:
+        let baseSlot = cas.inpSlot + 1'u64
+        if baseSlot - cas.finalSlot < cas.chunkSize:
+          let count = uint64(baseSlot - cas.finalSlot)
+          (baseSlot - count, count)
+        else:
+          (baseSlot - cas.chunkSize, cas.chunkSize)
+    if (maxSlot + 1'u64) < slot + count:
+      # Peer's latest slot is less than queue's input slot.
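+      # (nothing is handed out and `inpSlot` is not advanced, so the range
+      # remains available for a peer that has synced further)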
+      return ColumnSyncRequest.empty(cas.direction, T)
+    var sr = ColumnSyncRequest.init(cas.direction, slot, count, item)
+    cas.advanceInput(count)
+    cas.makePending(sr)
+    sr
+
+proc debtLen*[T](cas: ColumnSyncerAssist[T]): uint64 =
+  cas.debtsCount
+
+proc pendingLen*[T](cas: ColumnSyncerAssist[T]): uint64 {.inline.} =
+  ## Returns number of slots that have been requested but not yet processed
+  ## (between the queue's output and input slots).
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    # When moving forward `outSlot` will be <= `inpSlot`.
+    cas.inpSlot - cas.outSlot
+  of ColumnSyncerDirection.Backward:
+    # When moving backward `outSlot` will be >= `inpSlot`.
+    cas.outSlot - cas.inpSlot
+
+proc len*[T](cas: ColumnSyncerAssist[T]): uint64 {.inline.} =
+  ## Returns number of slots left in queue
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    if cas.finalSlot >= cas.outSlot:
+      cas.finalSlot + 1'u64 - cas.outSlot
+    else:
+      0'u64
+  of ColumnSyncerDirection.Backward:
+    if cas.outSlot >= cas.finalSlot:
+      cas.outSlot + 1'u64 - cas.finalSlot
+    else:
+      0'u64
+
+proc total*[T](cas: ColumnSyncerAssist[T]): uint64 {.inline.} =
+  ## Returns total number of slots in queue
+  case cas.direction
+  of ColumnSyncerDirection.Forward:
+    if cas.finalSlot >= cas.startSlot:
+      cas.finalSlot + 1'u64 - cas.startSlot
+    else:
+      0'u64
+  of ColumnSyncerDirection.Backward:
+    if cas.startSlot >= cas.finalSlot:
+      cas.startSlot + 1'u64 - cas.finalSlot
+    else:
+      0'u64
+
+proc progress*[T](cas: ColumnSyncerAssist[T]): uint64 =
+  ## How many useful slots we've synced so far, adjusting for how much has
+  ## become obsolete as time passes.
+  cas.total - cas.len
diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim
index 272935f78c..fed490a3be 100644
--- a/beacon_chain/sync/sync_queue.nim
+++ b/beacon_chain/sync/sync_queue.nim
@@ -73,9 +73,9 @@ type
     data: SyncRange
     item: T
 
-  RewindPoint = object
-    failSlot: Slot
-    epochCount: uint64
+  RewindPoint* = object
+    failSlot*: Slot
+    epochCount*: uint64
 
   SyncQueue*[T] = ref object
    kind*: SyncQueueKind