Skip to content

Commit 33ede17

Browse files
replay: queueing exec slices in the replay tile
1 parent 568fb30 commit 33ede17

File tree

2 files changed

+95
-70
lines changed

2 files changed

+95
-70
lines changed

src/discof/replay/fd_replay_tile.c

Lines changed: 95 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@
3939
#include <sys/types.h>
4040
#include <unistd.h>
4141

42+
#define DEQUE_NAME fd_exec_slice
43+
#define DEQUE_T ulong
44+
#define DEQUE_MAX 1024UL
45+
#include "../../util/tmpl/fd_deque.c"
46+
4247
/* An estimate of the max number of transactions in a block. If there are more
4348
transactions, they must be split into multiple sets. */
4449
#define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_MAX_PER_SLOT * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
@@ -316,6 +321,8 @@ struct fd_replay_tile_ctx {
316321

317322
/* Metrics */
318323
fd_replay_tile_metrics_t metrics;
324+
325+
ulong * exec_slice_deque; /* Deque to buffer exec slices */
319326
};
320327
typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
321328

@@ -493,8 +500,12 @@ during_frag( fd_replay_tile_ctx_t * ctx,
493500
uchar * src = (uchar *)fd_chunk_to_laddr( ctx->batch_in_mem, chunk );
494501
fd_memcpy( ctx->slot_ctx->slot_bank.epoch_account_hash.uc, src, sizeof(fd_hash_t) );
495502
FD_LOG_NOTICE(( "Epoch account hash calculated to be %s", FD_BASE58_ENC_32_ALLOCA( ctx->slot_ctx->slot_bank.epoch_account_hash.uc ) ));
496-
} else if ( in_idx==STORE_IN_IDX ) {
497-
return;
503+
} else if ( in_idx==STORE_IN_IDX ) {
504+
/* At this point we have been notified by the store tile that a
505+
slice is ready to be executed. If the replay tile is not ready
506+
to execute this slice (e.g. if it is executing another slice),
507+
then we will queue the notification to be processed later. */
508+
fd_exec_slice_push_tail( ctx->exec_slice_deque, sig );
498509
}
499510
// if( ctx->flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
500511
// /* We do not know the parent slot, pick one from fork selection */
@@ -1316,7 +1327,8 @@ init_poh( fd_replay_tile_ctx_t * ctx ) {
13161327
}
13171328

13181329
static void
1319-
prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ){
1330+
prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
1331+
13201332
ulong curr_slot = ctx->curr_slot;
13211333
ulong parent_slot = ctx->parent_slot;
13221334
ulong flags = ctx->flags;
@@ -1357,7 +1369,7 @@ prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * s
13571369
}
13581370

13591371
fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
1360-
if( fork == NULL ) {
1372+
if( fork==NULL ) {
13611373
fork = prepare_new_block_execution( ctx, stem, curr_slot, flags );
13621374
} else {
13631375
FD_LOG_WARNING(("Fork for slot %lu already exists, so we don't make a new one. Restarting execution from batch %u", curr_slot, fork->end_idx ));
@@ -1529,75 +1541,84 @@ exec_slice( fd_replay_tile_ctx_t * ctx,
15291541
}
15301542
}
15311543

1544+
static void
1545+
handle_slice( fd_replay_tile_ctx_t * ctx,
1546+
fd_stem_context_t * stem ) {
1547+
1548+
if( fd_exec_slice_cnt( ctx->exec_slice_deque )==0UL ) {
1549+
return;
1550+
}
1551+
1552+
ulong sig = fd_exec_slice_pop_head( ctx->exec_slice_deque );
1553+
1554+
if( FD_UNLIKELY( ctx->flags!=EXEC_FLAG_READY_NEW ) ) {
1555+
FD_LOG_ERR(( "Replay is in unexpected state" ));
1556+
}
1557+
1558+
ulong slot = fd_disco_repair_replay_sig_slot( sig );
1559+
uint data_cnt = fd_disco_repair_replay_sig_data_cnt( sig );
1560+
ushort parent_off = fd_disco_repair_replay_sig_parent_off( sig );
1561+
int slot_complete = fd_disco_repair_replay_sig_slot_complete( sig );
1562+
1563+
if( FD_UNLIKELY( slot != ctx->curr_slot ) ) {
1564+
/* We need to switch forks and execution contexts. Either we
1565+
completed execution of the previous slot and are now executing
1566+
a new slot or we are interleaving batches from different slots
1567+
- all executable at the fork frontier.
1568+
1569+
Going to need to query the frontier for the fork, or create it
1570+
if its not on the frontier. I think
1571+
prepare_first_batch_execution already handles this logic. */
1572+
1573+
ctx->curr_slot = slot;
1574+
ctx->parent_slot = slot - parent_off;
1575+
prepare_first_batch_execution( ctx, stem );
1576+
} else {
1577+
/* continuing execution of the slot we have been doing */
1578+
}
1579+
1580+
/* Prepare batch for execution on following after_credit iteration */
1581+
ctx->flags = EXEC_FLAG_EXECUTING_SLICE;
1582+
fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1583+
ulong slice_sz;
1584+
uint start_idx = fork->end_idx + 1;
1585+
int err = fd_blockstore_slice_query( ctx->slot_ctx->blockstore,
1586+
slot,
1587+
start_idx,
1588+
start_idx + data_cnt - 1,
1589+
FD_SLICE_MAX,
1590+
ctx->mbatch,
1591+
&slice_sz );
1592+
fork->end_idx += data_cnt;
1593+
ctx->slice_exec_ctx.sz = slice_sz;
1594+
ctx->slice_exec_ctx.last_batch = slot_complete;
1595+
ctx->slice_exec_ctx.txns_rem = 0;
1596+
ctx->slice_exec_ctx.mblks_rem = FD_LOAD( ulong, ctx->mbatch );
1597+
ctx->slice_exec_ctx.wmark = sizeof(ulong);
1598+
ctx->slice_exec_ctx.last_mblk_off = 0;
1599+
1600+
if( FD_UNLIKELY( err ) ) {
1601+
__asm__("int $3");
1602+
FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
1603+
}
1604+
}
1605+
15321606
static void
15331607
after_frag( fd_replay_tile_ctx_t * ctx,
1534-
ulong in_idx,
1608+
ulong in_idx FD_PARAM_UNUSED,
15351609
ulong seq,
15361610
ulong sig FD_PARAM_UNUSED,
15371611
ulong sz FD_PARAM_UNUSED,
15381612
ulong tsorig,
15391613
ulong tspub FD_PARAM_UNUSED,
15401614
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
1615+
1616+
(void)in_idx;
15411617
(void)sig;
15421618
(void)sz;
15431619

15441620
if( FD_UNLIKELY( ctx->skip_frag ) ) return;
15451621

1546-
/* If we reach the current point, this means that we are ready to
1547-
process another batch. See after_credit; if we are mid-execution of
1548-
a slice, we will not poll in for a new frag (which signals the next
1549-
batch). Only when we complete execution of batch is when we poll
1550-
for next frag. */
1551-
if( FD_LIKELY( in_idx == STORE_IN_IDX ) ) {
1552-
FD_TEST( ctx->flags == EXEC_FLAG_READY_NEW );
1553-
ulong slot = fd_disco_repair_replay_sig_slot( sig );
1554-
uint data_cnt = fd_disco_repair_replay_sig_data_cnt( sig );
1555-
ushort parent_off = fd_disco_repair_replay_sig_parent_off( sig );
1556-
int slot_complete = fd_disco_repair_replay_sig_slot_complete( sig );
1557-
1558-
if( FD_UNLIKELY( slot != ctx->curr_slot ) ) {
1559-
/* We need to switch forks and execution contexts. Either we
1560-
completed execution of the previous slot and are now executing
1561-
a new slot or we are interleaving batches from different slots
1562-
- all executable at the fork frontier.
1563-
1564-
Going to need to query the frontier for the fork, or create it
1565-
if its not on the frontier. I think
1566-
prepare_first_batch_execution already handles this logic. */
1567-
1568-
ctx->curr_slot = slot;
1569-
ctx->parent_slot = slot - parent_off;
1570-
prepare_first_batch_execution( ctx, stem );
1571-
} else {
1572-
/* continuing execution of the slot we have been doing */
1573-
}
1574-
1575-
/* Prepare batch for execution on following after_credit iteration */
1576-
ctx->flags = EXEC_FLAG_EXECUTING_SLICE;
1577-
fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1578-
ulong slice_sz;
1579-
uint start_idx = fork->end_idx + 1;
1580-
int err = fd_blockstore_slice_query( ctx->slot_ctx->blockstore,
1581-
slot,
1582-
start_idx,
1583-
start_idx + data_cnt - 1,
1584-
FD_SLICE_MAX,
1585-
ctx->mbatch,
1586-
&slice_sz );
1587-
fork->end_idx += data_cnt;
1588-
ctx->slice_exec_ctx.sz = slice_sz;
1589-
ctx->slice_exec_ctx.last_batch = slot_complete;
1590-
ctx->slice_exec_ctx.txns_rem = 0;
1591-
ctx->slice_exec_ctx.mblks_rem = FD_LOAD( ulong, ctx->mbatch );
1592-
ctx->slice_exec_ctx.wmark = sizeof(ulong);
1593-
ctx->slice_exec_ctx.last_mblk_off = 0;
1594-
1595-
if( FD_UNLIKELY( err ) ) {
1596-
__asm__("int $3");
1597-
FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
1598-
}
1599-
}
1600-
16011622
/**********************************************************************/
16021623
/* The rest of after_frag replays some microblocks in block curr_slot */
16031624
/**********************************************************************/
@@ -1939,7 +1960,6 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
19391960
ctx->runtime_spad );
19401961
fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
19411962
FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
1942-
19431963
}
19441964

19451965
ctx->curr_slot = snapshot_slot;
@@ -2121,16 +2141,15 @@ after_credit( fd_replay_tile_ctx_t * ctx,
21212141
int * charge_busy ) {
21222142
(void)opt_poll_in;
21232143

2124-
if( ctx->flags & EXEC_FLAG_EXECUTING_SLICE ){
2125-
exec_slice( ctx, stem, ctx->curr_slot );
2144+
/* If we are ready to process a new slice, we will poll for it and try
2145+
to setup execution for it. */
2146+
if( ctx->flags & EXEC_FLAG_READY_NEW ) {
2147+
handle_slice( ctx, stem );
21262148
}
21272149

2128-
// if we are still mid-execution of this slice then
2129-
if( ctx->flags & EXEC_FLAG_EXECUTING_SLICE ){
2130-
/* We are not ready to poll for a new frag. Leave it in mcache until
2131-
we are ready */
2132-
*opt_poll_in = 0;
2133-
return;
2150+
/* If we are in a state where we are executing a slice, proceed. */
2151+
if( ctx->flags & EXEC_FLAG_EXECUTING_SLICE ) {
2152+
exec_slice( ctx, stem, ctx->curr_slot );
21342153
}
21352154

21362155
ulong curr_slot = ctx->curr_slot;
@@ -2918,6 +2937,13 @@ unprivileged_init( fd_topo_t * topo,
29182937
ctx->fecs_inserted = 0UL;
29192938
ctx->fecs_removed = 0UL;
29202939
FD_TEST( ctx->replay_public!=NULL );
2940+
2941+
uchar * deque_mem = fd_spad_alloc( ctx->runtime_spad, fd_exec_slice_align(), fd_exec_slice_footprint() );
2942+
ctx->exec_slice_deque = fd_exec_slice_join( fd_exec_slice_new( deque_mem ) );
2943+
if( FD_UNLIKELY( !ctx->exec_slice_deque ) ) {
2944+
FD_LOG_ERR(( "failed to join and create exec slice deque" ));
2945+
}
2946+
29212947
}
29222948

29232949
static ulong

src/flamenco/runtime/sysvar/fd_sysvar_slot_hashes.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ void
3737
fd_sysvar_slot_hashes_update( fd_exec_slot_ctx_t * slot_ctx, fd_spad_t * runtime_spad ) {
3838
fd_slot_hashes_t * slot_hashes = fd_sysvar_slot_hashes_read( slot_ctx, runtime_spad );
3939
if( !slot_hashes ) {
40-
FD_LOG_WARNING(("NEVER CREATED THIS MF BEFORE"));
4140
uchar * deque_mem = fd_spad_alloc( runtime_spad,
4241
deq_fd_slot_hash_t_align(),
4342
deq_fd_slot_hash_t_footprint( FD_SYSVAR_SLOT_HASHES_CAP ) );

0 commit comments

Comments
 (0)