Skip to content

flamenco, choreo, app: Integrate Parallel Funk #4557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/extra/with-handholding.mk
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ CPPFLAGS+=-DFD_SPAD_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TOWER_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TMPL_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TXN_HANDHOLDING=1
CPPFLAGS+=-DFD_FUNKIER_HANDHOLDING=1
CPPFLAGS+=-DFD_FUNK_HANDHOLDING=1
CPPFLAGS+=-DFD_RUNTIME_ERR_HANDHOLDING=1
2 changes: 1 addition & 1 deletion src/app/fdctl/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fdctl_obj_footprint( fd_topo_t const * topo,
} else if( FD_UNLIKELY( !strcmp( obj->name, "dbl_buf" ) ) ) {
return fd_dbl_buf_footprint( VAL("mtu") );
} else if( FD_UNLIKELY( !strcmp( obj->name, "funk" ) ) ) {
return fd_funk_footprint();
return fd_funk_footprint( VAL("txn_max"), VAL("rec_max") );
} else if( FD_UNLIKELY( !strcmp( obj->name, "neigh4_hmap" ) ) ) {
return fd_neigh4_hmap_footprint( VAL("ele_max"), VAL("lock_cnt"), VAL("probe_max") );
} else if( FD_UNLIKELY( !strcmp( obj->name, "fib4" ) ) ) {
Expand Down
57 changes: 22 additions & 35 deletions src/app/ledger/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ allocator_setup( fd_wksp_t * wksp ) {
void
fd_ledger_main_setup( fd_ledger_args_t * args ) {
fd_flamenco_boot( NULL, NULL );
fd_funk_t * funk = args->funk;

/* Setup capture context */
int has_solcap = args->capture_fpath && args->capture_fpath[0] != '\0';
Expand Down Expand Up @@ -720,9 +719,7 @@ fd_ledger_main_setup( fd_ledger_args_t * args ) {
fd_runtime_update_leaders( args->slot_ctx, args->slot_ctx->slot_bank.slot, args->runtime_spad );
fd_calculate_epoch_accounts_hash_values( args->slot_ctx );

fd_funk_start_write( funk );
fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( args->slot_ctx, args->slot_ctx->funk_txn, args->tpool, args->runtime_spad );
fd_funk_end_write( funk );

/* First, load in the sysvars into the sysvar cache. This is required to
make the StakeHistory sysvar available to the rewards calculation. */
Expand Down Expand Up @@ -910,8 +907,7 @@ init_funk( fd_ledger_args_t * args ) {
}
args->funk = funk;
args->funk_wksp = fd_funk_wksp( funk );
FD_LOG_NOTICE(( "funky at global address 0x%016lx with %lu records", fd_wksp_gaddr_fast( args->funk_wksp, funk ),
fd_funk_rec_cnt( fd_funk_rec_map( funk, args->funk_wksp ) ) ));
FD_LOG_NOTICE(( "funky at global address 0x%016lx", fd_wksp_gaddr_fast( args->funk_wksp, funk ) ));
}

void
Expand Down Expand Up @@ -958,13 +954,7 @@ checkpt( fd_ledger_args_t * args ) {
}
FD_LOG_NOTICE(( "writing funk checkpt %s", args->checkpt_funk ));
unlink( args->checkpt_funk );
#ifdef FD_FUNK_WKSP_PROTECT
fd_wksp_mprotect( args->funk_wksp, 0 );
#endif
int err = fd_wksp_checkpt( args->funk_wksp, args->checkpt_funk, 0666, 0, NULL );
#ifdef FD_FUNK_WKSP_PROTECT
fd_wksp_mprotect( args->funk_wksp, 1 );
#endif
if( err ) {
FD_LOG_ERR(( "funk checkpt failed: error %d", err ));
}
Expand Down Expand Up @@ -1129,7 +1119,7 @@ ingest( fd_ledger_args_t * args ) {
args->exec_spads,
args->exec_spad_cnt,
args->runtime_spad );
FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) ));
FD_LOG_NOTICE(( "imported records from snapshot" ));
}
if( args->incremental ) {
fd_snapshot_load_all( args->incremental,
Expand All @@ -1142,7 +1132,7 @@ ingest( fd_ledger_args_t * args ) {
args->exec_spads,
args->exec_spad_cnt,
args->runtime_spad );
FD_LOG_NOTICE(( "imported %lu records from incremental snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) ));
FD_LOG_NOTICE(( "imported records from incremental snapshot" ));
}

if( args->genesis ) {
Expand Down Expand Up @@ -1185,12 +1175,14 @@ ingest( fd_ledger_args_t * args ) {
}
}

#ifdef FD_FUNK_HANDHOLDING
if( args->verify_funk ) {
FD_LOG_NOTICE(( "verifying funky" ));
if( fd_funk_verify( funk ) ) {
FD_LOG_ERR(( "verification failed" ));
}
}
#endif

checkpt( args );

Expand Down Expand Up @@ -1304,22 +1296,19 @@ replay( fd_ledger_args_t * args ) {
/* Check number of records in funk. If rec_cnt == 0, then it can be assumed
that you need to load in snapshot(s). */

ulong rec_cnt = fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) );
if( !rec_cnt ) {
/* Load in snapshot(s) */
if( args->snapshot ) {
fd_snapshot_load_all( args->snapshot,
args->slot_ctx,
NULL,
args->tpool,
args->verify_acc_hash,
args->check_acc_hash,
FD_SNAPSHOT_TYPE_FULL,
args->exec_spads,
args->exec_spad_cnt,
args->runtime_spad );
FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) ));
}
/* Load in snapshot(s) */
if( args->snapshot ) {
fd_snapshot_load_all( args->snapshot,
args->slot_ctx,
NULL,
args->tpool,
args->verify_acc_hash,
args->check_acc_hash,
FD_SNAPSHOT_TYPE_FULL,
args->exec_spads,
args->exec_spad_cnt,
args->runtime_spad );
FD_LOG_NOTICE(( "imported from snapshot" ));
if( args->incremental ) {
fd_snapshot_load_all( args->incremental,
args->slot_ctx,
Expand All @@ -1331,13 +1320,11 @@ replay( fd_ledger_args_t * args ) {
args->exec_spads,
args->exec_spad_cnt,
args->runtime_spad );
FD_LOG_NOTICE(( "imported %lu records from snapshot", fd_funk_rec_cnt( fd_funk_rec_map( funk, fd_funk_wksp( funk ) ) ) ));
}
if( args->genesis ) {
fd_runtime_read_genesis( args->slot_ctx, args->genesis, args->snapshot != NULL, NULL, args->tpool, args->runtime_spad );
FD_LOG_NOTICE(( "imported from snapshot" ));
}
} else {
FD_LOG_NOTICE(( "found funk with %lu records", rec_cnt ));
}
if( args->genesis ) {
fd_runtime_read_genesis( args->slot_ctx, args->genesis, args->snapshot != NULL, NULL, args->tpool, args->runtime_spad );
}

FD_LOG_NOTICE(( "Used memory in spad after loading in snapshot %lu", args->runtime_spad->mem_used ));
Expand Down
5 changes: 2 additions & 3 deletions src/app/rpcserver/main.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#define _DEFAULT_SOURCE

#include "../../discof/rpcserver/fd_rpc_service.h"
#include "../../funk/fd_funk_filemap.h"

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
Expand All @@ -13,6 +10,8 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "../../discof/rpcserver/fd_rpc_service.h"
#include "../../funk/fd_funk_filemap.h"

#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
#define SHAM_LINK_STATE fd_replay_notif_msg_t
Expand Down
4 changes: 2 additions & 2 deletions src/choreo/epoch/fd_epoch.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ fd_epoch_init( fd_epoch_t * epoch, fd_epoch_bank_t const * epoch_bank ) {
vote_accounts->vote_accounts_root );
curr;
curr = fd_vote_accounts_pair_t_map_successor( vote_accounts->vote_accounts_pool, curr ) ) {

if( FD_UNLIKELY( curr->elem.stake > 0UL ) ) {

#if FD_EPOCH_USE_HANDHOLDING
FD_TEST( !fd_epoch_voters_query( epoch_voters, curr->elem.key, NULL ) );
FD_TEST( fd_epoch_voters_key_cnt( epoch_voters ) < fd_epoch_voters_key_max( epoch_voters ) );
Expand Down
118 changes: 74 additions & 44 deletions src/choreo/forks/fd_forks.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ fd_forks_query_const( fd_forks_t const * forks, ulong slot ) {
// fork->slot_ctx->slot_bank.prev_slot = fork->slot_ctx->slot_bank.slot;
// fork->slot_ctx->slot_bank.slot = curr_slot;

// fork->slot_ctx->status_cache = status_cache;
// fork->slot_ctx.status_cache = status_cache;
// fd_funk_txn_xid_t xid;

// fd_memcpy( xid.uc, blockhash.uc, sizeof( fd_funk_txn_xid_t));
// xid.ul[0] = fork->slot_ctx->slot_bank.slot;
// xid.ul[0] = fork->slot_ctx.slot_bank.slot;
// /* push a new transaction on the stack */
// fd_funk_start_write( funk );
// fork->slot_ctx->funk_txn = fd_funk_txn_prepare( funk, fork->slot_ctx->funk_txn, &xid, 1 );
// fork->slot_ctx.funk_txn = fd_funk_txn_prepare( funk, fork->slot_ctx.funk_txn, &xid, 1 );
// fd_funk_end_write( funk );

// int res = fd_runtime_publish_old_txns( &fork->slot_ctx, capture_ctx );
Expand All @@ -191,64 +191,80 @@ slot_ctx_restore( ulong slot,
fd_acc_mgr_t * acc_mgr,
fd_blockstore_t * blockstore,
fd_exec_epoch_ctx_t * epoch_ctx,
fd_funk_t * funk,
fd_funk_t * funk,
fd_spad_t * runtime_spad,
fd_exec_slot_ctx_t * slot_ctx_out ) {
fd_funk_txn_t * txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );
fd_funk_txn_map_t txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );
bool block_exists = fd_blockstore_shreds_complete( blockstore, slot );

FD_LOG_DEBUG( ( "Current slot %lu", slot ) );
if( !block_exists )
FD_LOG_ERR( ( "missing block at slot we're trying to restore" ) );

fd_funk_txn_xid_t xid = { .ul = { slot, slot } };
fd_funk_txn_xid_t xid = { .ul = { slot, slot } };
fd_funk_rec_key_t id = fd_runtime_slot_bank_key();
fd_funk_txn_t * txn = fd_funk_txn_query( &xid, txn_map );
if( !txn ) {
memset( xid.uc, 0, sizeof( fd_funk_txn_xid_t ) );
xid.ul[0] = slot;
txn = fd_funk_txn_query( &xid, txn_map );
for( ; ; ) {
fd_funk_txn_start_read( funk );
fd_funk_txn_t * txn = fd_funk_txn_query( &xid, &txn_map );
if( !txn ) {
FD_LOG_ERR( ( "missing txn, parent slot %lu", slot ) );
memset( xid.uc, 0, sizeof( fd_funk_txn_xid_t ) );
xid.ul[0] = slot;
txn = fd_funk_txn_query( &xid, &txn_map );
if( !txn ) {
FD_LOG_ERR( ( "missing txn, parent slot %lu", slot ) );
}
}
}
fd_funk_rec_t const * rec = fd_funk_rec_query_global( funk, txn, &id, NULL );
if( rec == NULL ) FD_LOG_ERR( ( "failed to read banks record" ) );
void * val = fd_funk_val( rec, fd_funk_wksp( funk ) );
fd_funk_txn_end_read( funk );

uint magic = *(uint *)val;
fd_funk_rec_query_t query[1];
fd_funk_rec_t const * rec = fd_funk_rec_query_try_global( funk, txn, &id, NULL, query );
if( rec == NULL ) FD_LOG_ERR( ( "failed to read banks record" ) );
void * val = fd_funk_val( rec, fd_funk_wksp( funk ) );

fd_bincode_decode_ctx_t decode_ctx = {
.data = (uchar *)val + sizeof(uint),
.dataend = (uchar *)val + fd_funk_val_sz( rec )
};
uint magic = *(uint *)val;

FD_TEST( slot_ctx_out->magic == FD_EXEC_SLOT_CTX_MAGIC );
fd_bincode_decode_ctx_t decode_ctx = {
.data = (uchar *)val + sizeof(uint),
.dataend = (uchar *)val + fd_funk_val_sz( rec )
};

slot_ctx_out->funk_txn = txn;
slot_ctx_out->acc_mgr = acc_mgr;
slot_ctx_out->blockstore = blockstore;
slot_ctx_out->epoch_ctx = epoch_ctx;

if( FD_LIKELY( magic==FD_RUNTIME_ENC_BINCODE ) ) {
ulong total_sz = 0UL;
int err = fd_slot_bank_decode_footprint( &decode_ctx, &total_sz );
if( FD_UNLIKELY( err != FD_BINCODE_SUCCESS ) ) {
FD_LOG_ERR( ( "failed to decode banks record" ) );
if( slot_ctx_out == NULL || slot_ctx_out->magic != FD_EXEC_SLOT_CTX_MAGIC ) {
FD_LOG_WARNING(( "bad slot context" ));
continue;
}

uchar * mem = fd_spad_alloc( runtime_spad, fd_slot_bank_align(), total_sz );
if( FD_UNLIKELY( !mem ) ) {
FD_LOG_ERR( ( "failed to allocate memory for slot bank" ) );
}
FD_TEST( slot_ctx_out->magic == FD_EXEC_SLOT_CTX_MAGIC );

slot_ctx_out->funk_txn = txn;
slot_ctx_out->acc_mgr = acc_mgr;
slot_ctx_out->blockstore = blockstore;
slot_ctx_out->epoch_ctx = epoch_ctx;

if( FD_LIKELY( magic==FD_RUNTIME_ENC_BINCODE ) ) {
ulong total_sz = 0UL;
int err = fd_slot_bank_decode_footprint( &decode_ctx, &total_sz );
if( FD_UNLIKELY( err != FD_BINCODE_SUCCESS ) ) {
FD_LOG_WARNING( ( "failed to decode banks record" ) );
continue;
}

uchar * mem = fd_spad_alloc( runtime_spad, fd_slot_bank_align(), total_sz );
if( FD_UNLIKELY( !mem ) ) {
FD_LOG_ERR( ( "failed to allocate memory for slot bank" ) );
}

fd_slot_bank_t * slot_bank = fd_slot_bank_decode( mem, &decode_ctx );

fd_slot_bank_t * slot_bank = fd_slot_bank_decode( mem, &decode_ctx );
fd_memcpy( &slot_ctx_out->slot_bank, slot_bank, sizeof(fd_slot_bank_t) );
} else {
FD_LOG_ERR( ( "failed to read banks record: invalid magic number" ) );
}
FD_TEST( !fd_runtime_sysvar_cache_load( slot_ctx_out, runtime_spad ) );

fd_memcpy( &slot_ctx_out->slot_bank, slot_bank, sizeof(fd_slot_bank_t) );
} else {
FD_LOG_ERR( ( "failed to read banks record: invalid magic number" ) );
if( FD_LIKELY( fd_funk_rec_query_test( query ) == FD_FUNK_SUCCESS ) ) {
break;
}
}
FD_TEST( !fd_runtime_sysvar_cache_load( slot_ctx_out, runtime_spad ) );

// TODO how do i get this info, ignoring rewards for now
// slot_ctx_out->epoch_reward_status = ???
Expand Down Expand Up @@ -347,12 +363,27 @@ fd_forks_update( fd_forks_t * forks,
rec_query_global traverses all the way back to the root. */

fd_voter_t * voter = &epoch_voters[i];
fd_voter_state_t const * state = fd_voter_state( funk, txn, &voter->rec );

/* Fetch the vote account's vote slot and root slot from the vote account, re-trying if there is
a Funk conflict.

TODO: factor this out into a convenience function. */
ulong vote = 0UL;
ulong root = 0UL;
for( ; ; ) {
fd_funk_rec_query_t query[1];
fd_voter_state_t const * state = fd_voter_state( funk, query, txn, &voter->rec );
vote = fd_voter_state_vote( state );
root = fd_voter_state_root( state );

if( FD_LIKELY( fd_funk_rec_query_test( query ) == FD_FUNK_SUCCESS ) ) {
break;
}
}

/* Only process votes for slots >= root. Ghost requires vote slot
to already exist in the ghost tree. */

ulong vote = fd_voter_state_vote( state );
if( FD_LIKELY( vote != FD_SLOT_NULL && vote >= fd_ghost_root( ghost )->slot ) ) {
fd_ghost_replay_vote( ghost, voter, vote );

Expand All @@ -376,7 +407,6 @@ fd_forks_update( fd_forks_t * forks,
/* Check if this voter's root >= ghost root. We can't process
other voters' roots that precede the ghost root. */

ulong root = fd_voter_state_root( state );
if( FD_LIKELY( root != FD_SLOT_NULL && root >= fd_ghost_root( ghost )->slot ) ) {
fd_ghost_node_t const * node = fd_ghost_query( ghost, root );

Expand Down
Loading
Loading