21
21
export phase0, altair, merge, chronos, chronicles, results,
22
22
helpers, peer_scores, sync_queue, forks, sync_protocol
23
23
24
- logScope:
25
- topics = " columnsync"
26
-
27
24
const
28
25
ColumnSyncWorkerCount * = 15
29
26
# # Number of workers to spawn for column syncing
63
60
MAX_BLOBS_PER_BLOCK_ELECTRA : uint64
64
61
responseTimeout: chronos.Duration
65
62
maxHeadAge: uint64
63
+ isWithinWeakSubjectivityPeriod: GetBoolCallback
66
64
assist* : ColumnSyncerAssist [A]
67
65
getLocalHeadSlot: GetSlotCallback
68
66
getLocalWallSlot: GetSlotCallback
@@ -146,6 +144,7 @@ proc newColumnManager*[A, B](
146
144
getFinalizedSlotCb: GetSlotCallback ,
147
145
getBackfillSlotCb: GetSlotCallback ,
148
146
getFrontfillSlotCb: GetSlotCallback ,
147
+ weakSubjectivityPeriodCb: GetBoolCallback ,
149
148
progressPivot: Slot ,
150
149
peerdasBlockVerifier: PeerdasBlockVerifier ,
151
150
shutdownEvent: AsyncEvent ,
@@ -173,6 +172,7 @@ proc newColumnManager*[A, B](
173
172
MAX_BLOBS_PER_BLOCK_ELECTRA : maxBlobsPerBlockElectra,
174
173
getLocalHeadSlot: getLocalHeadSlotCb,
175
174
getLocalWallSlot: getLocalWallSlotCb,
175
+ isWithinWeakSubjectivityPeriod: weakSubjectivityPeriodCb,
176
176
getSafeSlot: getSafeSlot,
177
177
getFirstSlot: getFirstSlot,
178
178
getLastSlot: getLastSlot,
@@ -194,22 +194,21 @@ proc fetchBlocksForColumnNavigation[A, B](man: ColumnManager[A, B], peer: A,
194
194
{.async : (raises: [CancelledError ], raw: true ).} =
195
195
mixin getScore, `==`
196
196
197
- logScope:
198
- peer_score = peer.getScore ()
199
- peer_speed = peer.netKbps ()
200
- direction = man.direction
201
- topics = " columnsync"
197
+ debugEcho " fetching block for column navigation"
202
198
203
199
doAssert (not (req.isEmpty ()), " Request must not be empty!" )
204
200
debug " Requesting blocks from peer" ,
205
201
peer_score = req.item.getScore (),
206
- peer_speed = req.item.netKbps ()
202
+ peer_speed = req.item.netKbps (),
203
+ topics = " columnsync"
207
204
208
205
beaconBlocksByRange_v2 (peer, req.slot, req.count, 1 'u64 )
209
206
210
207
proc shouldGetDataColumns [A, B](
211
208
man: ColumnManager [A, B],
212
209
s: Slot ): bool =
210
+
211
+ debugEcho " checking if we should get data columns in the current fork"
213
212
let
214
213
wallEpoch = man.getLocalWallSlot ().epoch
215
214
epoch = s.epoch ()
@@ -224,6 +223,7 @@ proc shouldGetDataColumns[A, B](
224
223
225
224
proc checkDataColumns (data_columns: seq [DataColumnSidecars ]):
226
225
Result [void , string ] =
226
+ debugEcho " verifying data columns from column syncer"
227
227
for data_column_sidecars in data_columns:
228
228
for data_column_sidecar in data_column_sidecars:
229
229
? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof ()
@@ -235,6 +235,7 @@ proc checkDataColumns(data_columns: seq[DataColumnSidecars]):
235
235
proc intersectionColumns [A, B](
236
236
man: ColumnManager [A, B],
237
237
peer: A): List [ColumnIndex , NUMBER_OF_COLUMNS ] =
238
+ debugEcho " computing interesecting columns for the current peer"
238
239
let
239
240
remoteNodeId =
240
241
fetchNodeIdFromPeerId (peer)
@@ -291,13 +292,8 @@ proc getDataColumnSidecarsByRange[A, B](man: ColumnManager[A, B],
291
292
{.async : (raises: [CancelledError ], raw: true ).} =
292
293
mixin getScore, `==`
293
294
294
- logScope:
295
- peer_score = peer.getScore ()
296
- peer_speed = peer.netKbps ()
297
- topics = " columnsync"
298
-
299
295
doAssert (not (r.isEmpty ()), " Request must not be empty" )
300
- debug " Requesting data column sidecars from peer" ,
296
+ debug " Requesting data column sidecars by range from peer" ,
301
297
topics = " columnsync"
302
298
dataColumnSidecarsByRange (peer, r.slot, r.count, req_cols)
303
299
@@ -324,6 +320,9 @@ proc filterRelevantPeers[A, B](man: ColumnManager[A, B],
324
320
# # and returns a refreshed peer list based on
325
321
# # whichever's peer status is recent and relevant
326
322
# #
323
+
324
+
325
+
327
326
var
328
327
refreshed_peer_set: seq [A]
329
328
for peer in peers:
@@ -530,20 +529,13 @@ proc serializeColumnTable*[A, B](
530
529
proc columnSyncStrategyImpartial [A, B](
531
530
man: ColumnManager [A, B], index: int , peer: A
532
531
) {.async : (raises: [CancelledError ]).} =
533
- logScope:
534
- peer_score = peer.getScore ()
535
- peer_speed = peer.netKbps ()
536
- index = index
537
532
538
533
var
539
534
headSlot = man.getLocalHeadSlot ()
540
535
wallSlot = man.getLocalWallSlot ()
541
536
peerSlot = peer.getHeadSlot ()
542
537
543
538
block :
544
- logScope:
545
- peer = peer
546
-
547
539
debug " Peer's syncing status" , wall_clock_slot = wallSlot,
548
540
remote_head_slot = peerSlot, local_head_slot = headSlot,
549
541
direction = man.direction, topics = " columnsync"
@@ -592,10 +584,7 @@ proc columnSyncStrategyImpartial[A, B](
592
584
wallSlot = man.getLocalWallSlot ()
593
585
594
586
if man.remainingSlots () <= man.maxHeadAge:
595
- logScope:
596
- peer = peer
597
-
598
- info " We are in sync with the network" , wall_clock_slot = wallSlot,
587
+ info " Column syncing is completed" , wall_clock_slot = wallSlot,
599
588
remote_head_slot = peerSlot, local_head_slot = headSlot,
600
589
direction = man.direction, topics = " columnsync"
601
590
@@ -629,7 +618,8 @@ proc columnSyncStrategyImpartial[A, B](
629
618
return
630
619
631
620
debug " Creating new request for peer" , wall_clock_slot = wallSlot,
632
- remote_head_slot = peerSlot, local_head_slot = headSlot
621
+ remote_head_slot = peerSlot, local_head_slot = headSlot,
622
+ topics = " columnsync"
633
623
634
624
man.workers[index].status = ColumnSyncerStatus .Downloading
635
625
@@ -650,7 +640,7 @@ proc columnSyncStrategyImpartial[A, B](
650
640
man.assist.push (req)
651
641
warn " Incorrect blocks sequence received" ,
652
642
blocks_count = len (blockData),
653
- reason = error, topics = columnsync
643
+ reason = error, topics = " columnsync"
654
644
return
655
645
656
646
let shouldGetDataColumns =
@@ -1030,7 +1020,8 @@ proc startColumnSyncWorkers[A, B](man: ColumnManager[A, B]) =
1030
1020
man.workers[i].future =
1031
1021
columnSyncWorkerImparital [A, B](man, i)
1032
1022
1033
- proc stopColumnSyncWorkers [A, B](man: ColumnManager [A, B]) =
1023
+ proc stopColumnSyncWorkers [A, B](man: ColumnManager [A, B])
1024
+ {.async : (raises: []).} =
1034
1025
# Cancelling all the column sync workers
1035
1026
let pending = man.workers.mapIt (it.future.cancelAndWait ())
1036
1027
await noCancel allFutures (pending)
@@ -1077,13 +1068,10 @@ proc columnSyncLoop[A, B](
1077
1068
man: ColumnManager [A, B]
1078
1069
) {.async : (raises: [CancelledError ]).} =
1079
1070
1080
- logScope:
1081
- direction = man.direction
1082
- topics = " columnsync"
1083
-
1084
1071
mixin getKey, getScore
1085
1072
var pauseTime = 0
1086
1073
1074
+ man.initColumnSyncerAssist ()
1087
1075
man.startColumnSyncWorkers ()
1088
1076
1089
1077
debug " Column sync loop has started" ,
@@ -1201,6 +1189,14 @@ proc columnSyncLoop[A, B](
1201
1189
man.avgSyncSpeed.formatBiggestFloat (ffDecimal, 4 ) &
1202
1190
" slots/s (" & map & " :" & currentSlot & " )"
1203
1191
1192
+ if (man.assist.direction == ColumnSyncerDirection .Forward ) and
1193
+ (ColumnSyncerMode .NoGenesisSync in man.modes):
1194
+ if not (man.isWithinWeakSubjectivityPeriod ()):
1195
+ fatal WeakSubjectivityLogMessage , current_slot = wallSlot
1196
+ await man.stopColumnSyncWorkers ()
1197
+ man.shutdownEvent.fire ()
1198
+ return
1199
+
1204
1200
if man.remainingSlots () <= man.maxHeadAge:
1205
1201
man.notInSyncEvent.clear ()
1206
1202
# We are marking SyncManager as not working only when we are in sync and
0 commit comments