@@ -240,6 +240,7 @@ struct fd_replay_tile_ctx {
240
240
241
241
ulong fecs_inserted ;
242
242
ulong fecs_removed ;
243
+ fd_shred_t shred [1 ];
243
244
/* Other metadata */
244
245
245
246
ulong funk_seed ;
@@ -430,13 +431,27 @@ publish_stake_weights( fd_replay_tile_ctx_t * ctx,
430
431
431
432
/* Polls the blockstore block info object for newly completed slices of
432
433
slot. Adds it to the tail of slice_deque (which should be the
433
- slice_deque object of the slot, slice_map[slot]) */
434
+ slice_deque object of the slot, slice_map[slot]). Currently used as
435
+ a fallback, as there are shreds that pass through repair but not shred
436
+ tile before getting added to blockstore, so replay doesn't know of
437
+ their existence until they poll the block info itself. Typically will
438
+ be used heavily during repair / catching up, and only occasionally
439
+ after we are caught up / executing turbine shreds. */
434
440
435
441
int
436
- slice_poll ( fd_replay_tile_ctx_t * ctx ,
442
+ slice_poll_block_info ( fd_replay_tile_ctx_t * ctx ,
437
443
fd_replay_slice_t * slice_deque ,
438
444
ulong slot ) {
439
445
uint consumed_idx , slices_added ;
446
+
447
+ fd_replay_idxs_t * shred_idxs = fd_replay_idxs_map_query ( ctx -> replay -> idxs_map , slot , NULL );
448
+ uint turbine_shred_wmark ;
449
+ if ( !shred_idxs ) {
450
+ turbine_shred_wmark = UINT_MAX ;
451
+ } else {
452
+ turbine_shred_wmark = (uint )shred_idxs -> wmark ;
453
+ }
454
+
440
455
for (;;) { /* speculative query */
441
456
fd_block_map_query_t query [1 ] = { 0 };
442
457
int err = fd_block_map_query_try ( ctx -> blockstore -> block_map , & slot , NULL , query , 0 );
@@ -445,7 +460,7 @@ slice_poll( fd_replay_tile_ctx_t * ctx,
445
460
if ( FD_UNLIKELY ( err == FD_MAP_ERR_KEY ) ) return 0 ;
446
461
if ( FD_UNLIKELY ( err == FD_MAP_ERR_AGAIN ) ) continue ;
447
462
448
- consumed_idx = block_info -> consumed_idx ;
463
+ consumed_idx = fd_uint_min ( block_info -> consumed_idx , turbine_shred_wmark ) ;
449
464
slices_added = 0 ;
450
465
451
466
if ( FD_UNLIKELY ( block_info -> buffered_idx == UINT_MAX ) ) return 1 ;
@@ -454,7 +469,7 @@ slice_poll( fd_replay_tile_ctx_t * ctx,
454
469
if ( FD_UNLIKELY ( fd_block_set_test ( block_info -> data_complete_idxs , idx ) ) ) {
455
470
slices_added ++ ;
456
471
fd_replay_slice_deque_push_tail ( slice_deque -> deque , ((ulong )(consumed_idx + 1 ) << 32 ) | ((ulong )idx ) );
457
- FD_LOG_INFO (( "adding slice replay: slot %lu, slice start: %u, slice end: %u" , slot , consumed_idx + 1 , idx ));
472
+ FD_LOG_INFO (( "adding slice replay from repair : slot %lu, slice start: %u, slice end: %u" , slot , consumed_idx + 1 , idx ));
458
473
consumed_idx = idx ;
459
474
}
460
475
}
@@ -476,6 +491,33 @@ slice_poll( fd_replay_tile_ctx_t * ctx,
476
491
return 0 ;
477
492
}
478
493
494
+ ulong
495
+ slice_poll_shred_idxs ( fd_replay_tile_ctx_t * ctx ,
496
+ fd_replay_slice_t * slice_deque ,
497
+ ulong slot ) {
498
+ fd_replay_idxs_t * shred_idxs = fd_replay_idxs_map_query ( ctx -> replay -> idxs_map , slot , NULL );
499
+ ulong slices_added = 0 ;
500
+ for ( ulong idx = shred_idxs -> wmark + 1 ; idx < FD_SHRED_MAX_PER_SLOT ; idx ++ ) {
501
+ if ( !fd_replay_idxs_set_test ( shred_idxs -> shred_received_idxs , idx ) ) {
502
+ /* Contiguous shred not recieved; exit now.
503
+ There may be ways to exit even earlier; */
504
+ return slices_added ;
505
+ }
506
+ if ( FD_UNLIKELY ( fd_replay_idxs_set_test ( shred_idxs -> data_completes_idxs , idx ) ) ) {
507
+ /* Batch completed */
508
+ slices_added ++ ;
509
+ fd_replay_slice_deque_push_tail ( slice_deque -> deque , ((ulong )(shred_idxs -> wmark + 1 ) << 32 ) | ((ulong )idx ) );
510
+ FD_LOG_INFO (( "adding slice replay: slot %lu, slice start: %lu, slice end: %lu" , slot , shred_idxs -> wmark + 1 , idx ));
511
+ shred_idxs -> wmark = idx ;
512
+ }
513
+ }
514
+ /* TODO: how do I know if it's done to evict from the map? We need
515
+ slot complete idx maybe. */
516
+ return slices_added ;
517
+ }
518
+
519
+
520
+
479
521
static int
480
522
before_frag ( fd_replay_tile_ctx_t * ctx ,
481
523
ulong in_idx ,
@@ -484,6 +526,10 @@ before_frag( fd_replay_tile_ctx_t * ctx,
484
526
(void )ctx ;
485
527
(void )seq ;
486
528
529
+ if ( in_idx == STORE_IN_IDX ) {
530
+ FD_LOG_WARNING ((" RECEIVE STORE MESSAGE: slot %lu" , fd_disco_replay_old_sig_slot ( sig )));
531
+ }
532
+
487
533
if ( in_idx == SHRED_IN_IDX ) {
488
534
// FD_LOG_NOTICE(( "shred in idx: %lu, seq: %lu, sig: %lu", in_idx, seq, sig ));
489
535
@@ -505,7 +551,8 @@ before_frag( fd_replay_tile_ctx_t * ctx,
505
551
fd_replay_idxs_t * idxs = fd_replay_idxs_map_query ( ctx -> replay -> idxs_map , slot , NULL );
506
552
if ( FD_UNLIKELY ( !idxs ) ) {
507
553
idxs = fd_replay_idxs_map_insert ( ctx -> replay -> idxs_map , slot );
508
- idxs -> wmark = 0 ;
554
+ idxs -> wmark = ULONG_MAX ;
555
+ idxs -> slot = slot ;
509
556
fd_replay_idxs_set_null ( idxs -> shred_received_idxs );
510
557
fd_replay_idxs_set_null ( idxs -> data_completes_idxs );
511
558
}
@@ -519,7 +566,10 @@ before_frag( fd_replay_tile_ctx_t * ctx,
519
566
520
567
fd_replay_fec_remove ( ctx -> replay , slot , fec_set_idx );
521
568
ctx -> fecs_removed ++ ;
522
- slice_poll ( ctx , slice_deque , slot );
569
+ if ( FD_UNLIKELY ( data_completes ) ) {
570
+ /* Batch complete flag, potential to get an executable slice. */
571
+ slice_poll_shred_idxs ( ctx , slice_deque , slot );
572
+ }
523
573
return 0 ; /* don't skip - contains merkle root and chained merkle root */
524
574
}
525
575
@@ -531,7 +581,7 @@ before_frag( fd_replay_tile_ctx_t * ctx,
531
581
532
582
fd_replay_fec_t * fec = fd_replay_fec_query ( ctx -> replay , slot , fec_set_idx );
533
583
if ( FD_UNLIKELY ( !fec ) ) { /* first time receiving a shred for this FEC set */
534
- FD_LOG_NOTICE (( "inserting FEC set %u from slot %lu" , fec_set_idx , slot ));
584
+ FD_LOG_INFO (( "inserting FEC set %u into slot %lu" , fec_set_idx , slot ));
535
585
fec = fd_replay_fec_insert ( ctx -> replay , slot , fec_set_idx );
536
586
ctx -> fecs_inserted ++ ;
537
587
/* TODO implement eviction */
@@ -557,8 +607,6 @@ during_frag( fd_replay_tile_ctx_t * ctx,
557
607
ulong chunk ,
558
608
ulong sz ,
559
609
ulong ctl FD_PARAM_UNUSED ) {
560
- return ;
561
-
562
610
ctx -> skip_frag = 0 ;
563
611
564
612
if ( in_idx == STORE_IN_IDX ) {
@@ -572,6 +620,7 @@ during_frag( fd_replay_tile_ctx_t * ctx,
572
620
Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t)) */
573
621
574
622
ctx -> curr_slot = fd_disco_replay_old_sig_slot ( sig );
623
+ FD_LOG_WARNING (("store sent slot %lu" , ctx -> curr_slot ));
575
624
/* slot changes */
576
625
if ( FD_UNLIKELY ( ctx -> curr_slot < fd_fseq_query ( ctx -> published_wmark ) ) ) {
577
626
FD_LOG_WARNING (( "store sent slot %lu before our root." , ctx -> curr_slot ));
@@ -631,8 +680,8 @@ during_frag( fd_replay_tile_ctx_t * ctx,
631
680
if ( FD_UNLIKELY ( chunk < shred_in -> chunk0 || chunk > shred_in -> wmark || sz > sizeof (fd_shred34_t ) ) ) {
632
681
FD_LOG_ERR (( "chunk %lu %lu corrupt, not in range [%lu,%lu]" , chunk , sz , shred_in -> chunk0 , shred_in -> wmark ));
633
682
}
634
- // uchar * src = (uchar *)fd_chunk_to_laddr( shred_in->mem, chunk );
635
- // fd_memcpy( (uchar *)ctx->shred, src, sz ); /* copy the hdr to read the code_cnt & data_cnt */
683
+ uchar * src = (uchar * )fd_chunk_to_laddr ( shred_in -> mem , chunk );
684
+ fd_memcpy ( (uchar * )ctx -> shred , src , sz ); /* copy the hdr to read the code_cnt & data_cnt */
636
685
637
686
ctx -> skip_frag = 1 ;
638
687
@@ -1690,7 +1739,7 @@ exec_slices( fd_replay_tile_ctx_t * ctx,
1690
1739
1691
1740
if ( ctx -> last_completed_slot != slot && fd_replay_slice_deque_cnt ( slice -> deque ) == 0 ) {
1692
1741
FD_LOG_INFO (( "Failed to query slice deque for slot %lu. Likely shreds were recieved through repair. Manually adding." , slot ));
1693
- slice_poll ( ctx , slice , slot );
1742
+ slice_poll_block_info ( ctx , slice , slot );
1694
1743
}
1695
1744
1696
1745
//ulong free_exec_tiles = ctx->exec_cnt;
@@ -1889,12 +1938,10 @@ after_frag( fd_replay_tile_ctx_t * ctx,
1889
1938
(void )sig ;
1890
1939
(void )sz ;
1891
1940
(void )seq ;
1892
- return ;
1941
+ // return;
1893
1942
1894
- /*if( FD_LIKELY( in_idx == SHRED_IN_IDX ) ) {
1895
-
1896
- after_frag only called if it's the first code shred we're
1897
- receiving for the FEC set
1943
+ if ( FD_LIKELY ( in_idx == SHRED_IN_IDX ) ) {
1944
+ /* We are here if any of the slot / shred idx numbers have been overrun */
1898
1945
1899
1946
ulong slot = fd_disco_shred_replay_sig_slot ( sig );
1900
1947
uint fec_set_idx = fd_disco_shred_replay_sig_fec_set_idx ( sig );
@@ -1904,7 +1951,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
1904
1951
fec -> data_cnt = ctx -> shred -> code .data_cnt ;
1905
1952
1906
1953
return ;
1907
- }*/
1954
+ }
1908
1955
1909
1956
if ( FD_UNLIKELY ( ctx -> skip_frag ) ) return ;
1910
1957
if ( FD_UNLIKELY ( in_idx == STORE_IN_IDX ) ) {
@@ -2445,9 +2492,9 @@ after_credit( fd_replay_tile_ctx_t * ctx,
2445
2492
}
2446
2493
}
2447
2494
2448
- // exec_slices( ctx, stem, ctx->curr_slot );
2495
+ exec_slices ( ctx , stem , ctx -> curr_slot );
2449
2496
2450
- return ;
2497
+ // return;
2451
2498
2452
2499
ulong curr_slot = ctx -> curr_slot ;
2453
2500
ulong parent_slot = ctx -> parent_slot ;
@@ -2710,7 +2757,7 @@ after_credit( fd_replay_tile_ctx_t * ctx,
2710
2757
2711
2758
static void
2712
2759
during_housekeeping ( void * _ctx ) {
2713
- return ;
2760
+ // return;
2714
2761
2715
2762
fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t * )_ctx ;
2716
2763
0 commit comments