diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index 7ab59ea168..7fb5b7a663 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -80,6 +80,7 @@ extern fd_topo_run_tile_t fd_tile_repair; extern fd_topo_run_tile_t fd_tile_storei; extern fd_topo_run_tile_t fd_tile_replay; extern fd_topo_run_tile_t fd_tile_replay_thread; +extern fd_topo_run_tile_t fd_tile_arxiv; extern fd_topo_run_tile_t fd_tile_execor; extern fd_topo_run_tile_t fd_tile_batch; extern fd_topo_run_tile_t fd_tile_batch_thread; @@ -109,6 +110,7 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_storei, &fd_tile_replay, &fd_tile_replay_thread, + &fd_tile_arxiv, &fd_tile_execor, &fd_tile_batch, &fd_tile_batch_thread, diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index d62be132f7..b1e980417d 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -213,6 +213,7 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "voter_gossip" ); fd_topob_wksp( topo, "voter_dedup" ); fd_topob_wksp( topo, "batch_replay" ); + fd_topob_wksp( topo, "replay_arxiv" ); fd_topob_wksp( topo, "rstart_gossi" ); fd_topob_wksp( topo, "gossi_rstart" ); @@ -240,6 +241,7 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "voter" ); fd_topob_wksp( topo, "poh_slot" ); fd_topob_wksp( topo, "eqvoc" ); + fd_topob_wksp( topo, "arxiv" ); fd_topob_wksp( topo, "batch" ); fd_topob_wksp( topo, "btpool" ); fd_topob_wksp( topo, "constipate" ); @@ -295,6 +297,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "poh_pack", "replay_poh", 128UL, sizeof(fd_became_leader_t) , 1UL ); /**/ fd_topob_link( topo, "replay_voter", "replay_voter", 128UL, sizeof(fd_txn_p_t), 1UL ); + /**/ fd_topob_link( topo, "replay_arxiv", "replay_arxiv", 128UL, sizeof(ulong), 1UL ); /**/ fd_topob_link( topo, "voter_gossip", "voter_gossip", 128UL, FD_TXN_MTU, 1UL ); /**/ fd_topob_link( topo, "voter_sign", "voter_sign", 128UL, 
FD_TXN_MTU, 1UL ); /**/ fd_topob_link( topo, "sign_voter", "sign_voter", 128UL, 64UL, 1UL ); @@ -355,6 +358,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile( topo, "sender", "voter", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "bhole", "bhole", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "eqvoc", "eqvoc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); + /**/ fd_topob_tile( topo, "arxiv", "arxiv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "replay", "replay", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /* These thread tiles must be defined immediately after the replay tile. We subtract one because the replay tile acts as a thread in the tpool as well. */ @@ -377,6 +381,7 @@ fd_topo_initialize( config_t * config ) { fd_topo_tile_t * batch_tile = &topo->tiles[ fd_topo_find_tile( topo, "batch" , 0UL ) ]; fd_topo_tile_t * pack_tile = &topo->tiles[ fd_topo_find_tile( topo, "pack" , 0UL ) ]; fd_topo_tile_t * exec_tile = &topo->tiles[ fd_topo_find_tile( topo, "exec" , 0UL ) ]; + fd_topo_tile_t * arxiv_tile = &topo->tiles[ fd_topo_find_tile( topo, "arxiv" , 0UL ) ]; /* Create a shared blockstore to be used by store and replay. 
*/ fd_topo_obj_t * blockstore_obj = setup_topo_blockstore( topo, @@ -389,6 +394,8 @@ fd_topo_initialize( config_t * config ) { fd_topob_tile_uses( topo, store_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); fd_topob_tile_uses( topo, replay_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); fd_topob_tile_uses( topo, repair_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + fd_topob_tile_uses( topo, arxiv_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + if( enable_rpc ) { fd_topo_tile_t * rpcserv_tile = &topo->tiles[ fd_topo_find_tile( topo, "rpcsrv", 0UL ) ]; fd_topob_tile_uses( topo, rpcserv_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); @@ -554,6 +561,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "pack_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "batch_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_voter", 0UL ); + /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_arxiv", 0UL ); FOR(bank_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_poh", i ); FOR(exec_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_exec", i ); /* TODO check order in fd_replay.c macros*/ FOR(shred_tile_cnt) fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "shred_replay", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); @@ -589,6 +597,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_sign", 0UL ); /**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "sign_repair", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED ); /**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_repla", 0UL ); + /**/ fd_topob_tile_in( topo, "arxiv", 0UL, "metric_in", "replay_arxiv", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_repair", 0UL ); FOR(shred_tile_cnt) fd_topob_tile_in( topo, "eqvoc", 0UL, 
"metric_in", "shred_net", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ @@ -761,6 +770,9 @@ fd_topo_initialize( config_t * config ) { tile->replay.full_interval = config->tiles.batch.full_interval; tile->replay.incremental_interval = config->tiles.batch.incremental_interval; + } else if( FD_UNLIKELY( !strcmp( tile->name, "arxiv" ) ) ) { + strncpy( tile->arxiv.blockstore_file, config->blockstore.file, sizeof(tile->arxiv.blockstore_file) ); + } else if( FD_UNLIKELY( !strcmp( tile->name, "bhole" ) ) ) { } else if( FD_UNLIKELY( !strcmp( tile->name, "sign" ) ) ) { diff --git a/src/app/ledger/main.c b/src/app/ledger/main.c index 3348ed2439..762d6f8029 100644 --- a/src/app/ledger/main.c +++ b/src/app/ledger/main.c @@ -1329,7 +1329,7 @@ replay( fd_ledger_args_t * args ) { fd_ledger_main_setup( args ); - fd_blockstore_init( args->blockstore, -1, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &args->slot_ctx->slot_bank ); + fd_blockstore_init( args->blockstore, &args->slot_ctx->slot_bank ); fd_buf_shred_pool_reset( args->blockstore->shred_pool, 0 ); FD_LOG_WARNING(( "setup done" )); diff --git a/src/disco/fd_disco_base.h b/src/disco/fd_disco_base.h index 75075bf584..abae01db1f 100644 --- a/src/disco/fd_disco_base.h +++ b/src/disco/fd_disco_base.h @@ -168,6 +168,22 @@ FD_FN_CONST static inline uint fd_disco_repair_replay_sig_data_cnt ( ulong FD_FN_CONST static inline ushort fd_disco_repair_replay_sig_parent_off ( ulong sig ) { return (ushort)fd_ulong_extract ( sig, 1, 16 ); } FD_FN_CONST static inline int fd_disco_repair_replay_sig_slot_complete( ulong sig ) { return fd_ulong_extract_bit( sig, 0 ); } +FD_FN_CONST static inline ulong +fd_disco_replay_arxiv_sig( ulong slot, uint start_idx, uint end_idx ){ + /* + | slot (32) | start_idx (16) | end_idx (16) | + | [32, 63] | [16, 31] | [0, 15] | + */ + ulong slot_ul = fd_ulong_min( slot, (ulong)UINT_MAX ); + ulong start_idx_ul = (ulong)start_idx; + ulong end_idx_ul = 
(ulong)end_idx; + return slot_ul << 32 | start_idx_ul << 16 | end_idx_ul; +} + +FD_FN_CONST static inline ulong fd_disco_replay_arxiv_sig_slot ( ulong sig ) { return fd_ulong_extract( sig, 32, 63 ); } +FD_FN_CONST static inline uint fd_disco_replay_arxiv_sig_start_idx( ulong sig ) { return (uint)fd_ulong_extract( sig, 16, 31 ); } +FD_FN_CONST static inline uint fd_disco_replay_arxiv_sig_end_idx ( ulong sig ) { return (uint)fd_ulong_extract( sig, 0, 15 ); } + FD_FN_PURE static inline ulong fd_disco_compact_chunk0( void * wksp ) { return (((struct fd_wksp_private *)wksp)->gaddr_lo) >> FD_CHUNK_LG_SZ; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index dd6d868bfa..2588b8e0ad 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -384,6 +384,10 @@ typedef struct { char identity_key_path[ PATH_MAX ]; } eqvoc; + struct { + char blockstore_file[ PATH_MAX ]; + } arxiv; + struct { ushort rpc_port; ushort tpu_port; diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c index a89ae27106..8ed757a8fa 100644 --- a/src/disco/topo/fd_topob.c +++ b/src/disco/topo/fd_topob.c @@ -366,6 +366,7 @@ fd_topob_auto_layout( fd_topo_t * topo ) { "gossip", /* FIREDANCER only */ "repair", /* FIREDANCER only */ "replay", /* FIREDANCER only */ + "arxiv", /* FIREDANCER only */ "exec", /* FIREDANCER only */ "rtpool", /* FIREDANCER only */ "sender", /* FIREDANCER only */ diff --git a/src/discof/arxiv/Local.mk b/src/discof/arxiv/Local.mk new file mode 100644 index 0000000000..ec85090463 --- /dev/null +++ b/src/discof/arxiv/Local.mk @@ -0,0 +1,6 @@ +ifdef FD_HAS_INT128 +ifdef FD_HAS_SSE +$(call add-objs,fd_shred_arxiv fd_arxiv_tile,fd_discof) +$(call make-unit-test,test_arxiv,test_arxiv,fd_discof fd_flamenco fd_ballet fd_util) +endif +endif diff --git a/src/discof/arxiv/fd_arxiv_tile.c b/src/discof/arxiv/fd_arxiv_tile.c new file mode 100644 index 0000000000..5ae1e31fd0 --- /dev/null +++ b/src/discof/arxiv/fd_arxiv_tile.c @@ -0,0 +1,205 @@ +#include 
+#define _GNU_SOURCE
+#include "../../disco/tiles.h"
+#include "generated/fd_arxiv_tile_seccomp.h"
+#include "fd_shred_arxiv.h"
+#include "../../disco/topo/fd_pod_format.h"
+#include <errno.h>
+
+/* Position of the replay tile's link among this tile's in links. */
+#define REPLAY_IN_IDX 0UL
+
+/* Per-tile state, placed at the start of the tile scratch region. */
+struct fd_arxiv_tile_ctx {
+  fd_shred_arxiver_t * arxiver;
+  int                  arxiver_fd; /* file descriptor for the archive file */
+  fd_blockstore_t      blockstore_ljoin;
+  fd_blockstore_t *    blockstore;
+
+  void * replay_in_mem;
+  ulong  replay_in_chunk0;
+  ulong  replay_in_wmark;
+};
+typedef struct fd_arxiv_tile_ctx fd_arxiv_tile_ctx_t;
+
+FD_FN_CONST static inline ulong
+scratch_align( void ) {
+  return 128UL;
+}
+
+/* scratch_footprint sizes the scratch region: the tile context followed
+   by an arxiver sized for FD_SHRED_ARXIV_MIN_SIZE.  Must list the same
+   regions, in the same order, as unprivileged_init. */
+FD_FN_PURE static inline ulong
+scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
+  /* clang-format off */
+  ulong l = FD_LAYOUT_INIT;
+  l = FD_LAYOUT_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
+  l = FD_LAYOUT_APPEND( l, fd_shred_arxiv_align(),       fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ) );
+  return FD_LAYOUT_FINI( l, scratch_align() );
+  /* clang-format on */
+}
+
+/* during_frag only validates the frag's dcache coordinates.  The
+   replay_arxiv in link is registered as unreliable, so the frag may
+   still be overrun while it is handled; nothing with side effects is
+   done here.
+   NOTE(review): assumes stem invokes after_frag only for frags that
+   were not overrun -- confirm against fd_stem.c. */
+static void
+during_frag( fd_arxiv_tile_ctx_t * ctx,
+             ulong                 in_idx,
+             ulong                 seq FD_PARAM_UNUSED,
+             ulong                 sig FD_PARAM_UNUSED,
+             ulong                 chunk,
+             ulong                 sz,
+             ulong                 ctl FD_PARAM_UNUSED ) {
+  if( FD_UNLIKELY( in_idx!=REPLAY_IN_IDX ) ) return;
+
+  if( FD_UNLIKELY( chunk<ctx->replay_in_chunk0 || chunk>ctx->replay_in_wmark || sz>USHORT_MAX ) ) {
+    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->replay_in_chunk0, ctx->replay_in_wmark ));
+  }
+}
+
+/* after_frag archives the shred range named by sig (slot, start_idx,
+   end_idx) out of the blockstore, then reads the last shred of the
+   range back from the file as a self check. */
+static void
+after_frag( fd_arxiv_tile_ctx_t * ctx,
+            ulong                 in_idx,
+            ulong                 seq    FD_PARAM_UNUSED,
+            ulong                 sig,
+            ulong                 sz     FD_PARAM_UNUSED,
+            ulong                 tsorig FD_PARAM_UNUSED,
+            ulong                 tspub  FD_PARAM_UNUSED,
+            fd_stem_context_t *   stem   FD_PARAM_UNUSED ) {
+  if( FD_UNLIKELY( in_idx!=REPLAY_IN_IDX ) ) return;
+  FD_LOG_DEBUG(( "replay in idx %lu", in_idx ));
+
+  ulong slot      = fd_disco_replay_arxiv_sig_slot( sig );
+  uint  start_idx = fd_disco_replay_arxiv_sig_start_idx( sig );
+  uint  end_idx   = fd_disco_replay_arxiv_sig_end_idx( sig );
+
+  fd_shreds_checkpt( ctx->arxiver, ctx->blockstore, slot, start_idx, end_idx );
+
+  /* Let's test for fun that the shred was written to file properly.
+     fd_shreds_checkpt skips shreds it could not archive, so a missing
+     index entry is reported instead of dereferenced. */
+  fd_shred_idx_t * idx = fd_shred_idx_query( ctx->arxiver->shred_idx, slot << 32 | end_idx, NULL );
+  if( FD_UNLIKELY( !idx ) ) {
+    FD_LOG_WARNING(( "shred (%lu, %u) was not archived", slot, end_idx ));
+    return;
+  }
+
+  /* One full archive unit, zeroed so bytes past idx->sz are defined. */
+  uchar shred_buf[ FD_SHRED_ARXIV_UNIT_SZ ] = { 0 };
+  int err = fd_shred_restore( ctx->arxiver, idx, shred_buf, sizeof(shred_buf) );
+  FD_TEST( err == 0 );
+  fd_shred_t const * shred = fd_shred_parse( shred_buf, fd_ulong_max( idx->sz, FD_SHRED_MIN_SZ ) );
+  FD_TEST( shred );
+  FD_TEST( shred->slot == slot );
+  FD_TEST( shred->idx == end_idx );
+}
+
+/* privileged_init opens (creating if needed) the archive file while the
+   tile may still call open, and stashes the fd in the tile context. */
+static void
+privileged_init( fd_topo_t *      topo,
+                 fd_topo_tile_t * tile ) {
+  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
+
+  FD_SCRATCH_ALLOC_INIT( l, scratch );
+  fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
+  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
+  memset( ctx, 0, sizeof(fd_arxiv_tile_ctx_t) );
+
+  ctx->arxiver_fd = open( tile->arxiv.blockstore_file, O_RDWR|O_CREAT, 0666 );
+  if( FD_UNLIKELY( ctx->arxiver_fd==-1 ) ) {
+    FD_LOG_ERR(( "failed to open arxiver fd `%s` (%i-%s)", tile->arxiv.blockstore_file, errno, fd_io_strerror( errno ) ));
+  }
+}
+
+/* unprivileged_init carves the scratch region, joins the shared
+   blockstore, formats and joins the arxiver on top of the fd opened in
+   privileged_init, and caches the replay in link's dcache bounds. */
+static void
+unprivileged_init( fd_topo_t *      topo,
+                   fd_topo_tile_t * tile ) {
+  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
+
+  FD_SCRATCH_ALLOC_INIT( l, scratch );
+  fd_arxiv_tile_ctx_t * ctx     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
+  void *                arxiver = FD_SCRATCH_ALLOC_APPEND( l, fd_shred_arxiv_align(), fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ) );
+  ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
+  if( FD_UNLIKELY( scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ) ) ) {
+    FD_LOG_ERR(( "scratch footprint mismatch" ));
+  }
+
+  ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX,"blockstore" );
+  FD_TEST( blockstore_obj_id!=ULONG_MAX );
+  ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
+
+  FD_TEST( ctx->blockstore );
+  FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
+
+  ctx->arxiver = fd_shred_arxiv_join( fd_shred_arxiv_new( arxiver, FD_SHRED_ARXIV_MIN_SIZE ) );
+  FD_TEST( ctx->arxiver );
+  ctx->arxiver->fd = ctx->arxiver_fd;
+
+  /**********************************************************************/
+  /* links                                                              */
+  /**********************************************************************/
+
+  /* Setup replay tile input */
+  fd_topo_link_t * replay_in_link = &topo->links[ tile->in_link_id[ REPLAY_IN_IDX ] ];
+  ctx->replay_in_mem    = topo->workspaces[ topo->objs[ replay_in_link->dcache_obj_id ].wksp_id ].wksp;
+  ctx->replay_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_in_mem, replay_in_link->dcache );
+  ctx->replay_in_wmark  = fd_dcache_compact_wmark( ctx->replay_in_mem, replay_in_link->dcache, replay_in_link->mtu );
+
+}
+
+/* after_credit is a no-op; it is only here to satisfy
+   STEM_CALLBACK_AFTER_CREDIT below. */
+static void
+after_credit( fd_arxiv_tile_ctx_t * ctx,
+              fd_stem_context_t *   stem        FD_PARAM_UNUSED,
+              int *                 opt_poll_in FD_PARAM_UNUSED,
+              int *                 charge_busy FD_PARAM_UNUSED ) {
+  (void)ctx;
+}
+
+/* populate_allowed_seccomp fills out with the generated filter,
+   parameterized by the log file fd and the archive fd, and returns the
+   number of instructions written. */
+static ulong
+populate_allowed_seccomp( fd_topo_t const *      topo,
+                          fd_topo_tile_t const * tile,
+                          ulong                  out_cnt,
+                          struct sock_filter *   out ) {
+
+  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
+
+  /* The context is the first region of the scratch layout.  The FINI
+     argument is an alignment, not a size. */
+  FD_SCRATCH_ALLOC_INIT( l, scratch );
+  fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
+  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
+
+  populate_sock_filter_policy_fd_arxiv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->arxiver_fd );
+  return sock_filter_policy_fd_arxiv_tile_instr_cnt;
+}
+
+/* populate_allowed_fds lists the fds the tile keeps open: stderr, the
+   log file (if any) and the archive file.  Returns how many entries of
+   out_fds were written. */
+static ulong
+populate_allowed_fds( fd_topo_t const *      topo,
+                      fd_topo_tile_t const * tile,
+                      ulong                  out_fds_cnt,
+                      int *                  out_fds ) {
+  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
+
+  FD_SCRATCH_ALLOC_INIT( l, scratch );
+  fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
+  FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
+
+  /* Up to three entries are written below. */
+  if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
+
+  ulong out_cnt = 0UL;
+  out_fds[ out_cnt++ ] = 2; /* stderr */
+  if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
+    out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
+  out_fds[ out_cnt++ ] = ctx->arxiver_fd; /* shred store fd */
+  return out_cnt;
+}
+
+
+#define STEM_BURST (1UL)
+
+#define STEM_CALLBACK_CONTEXT_TYPE  fd_arxiv_tile_ctx_t
+#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_arxiv_tile_ctx_t)
+
+#define STEM_CALLBACK_DURING_FRAG  during_frag
+#define STEM_CALLBACK_AFTER_FRAG   after_frag
+#define STEM_CALLBACK_AFTER_CREDIT after_credit
+
+#include "../../disco/stem/fd_stem.c"
+
+fd_topo_run_tile_t fd_tile_arxiv = {
+  .name                     = "arxiv",
+  .loose_footprint          = 0UL,
+  .populate_allowed_seccomp = populate_allowed_seccomp,
+  .populate_allowed_fds     = populate_allowed_fds,
+  .scratch_align            = scratch_align,
+  .scratch_footprint        = scratch_footprint,
+  .privileged_init          = privileged_init,
+  .unprivileged_init        = unprivileged_init,
+  .run                      = stem_run,
+};
diff --git a/src/discof/arxiv/fd_arxiv_tile.seccomppolicy b/src/discof/arxiv/fd_arxiv_tile.seccomppolicy
new file mode 100644
index 0000000000..b681d54914
--- /dev/null
+++ b/src/discof/arxiv/fd_arxiv_tile.seccomppolicy
@@ -0,0 +1,25 @@
+# logfile_fd: It can be disabled by configuration, but typically tiles
+# will open a log file on boot and write all messages there.
+unsigned int logfile_fd, unsigned int blockstore_fd
+
+# logging: all log messages are written to a file and/or pipe
+#
+# 'WARNING' and above are written to the STDERR pipe, while all messages
+# are always written to the log file.
+#
+# arg 0 is the file descriptor to write to.  The boot process ensures
+# that descriptor 2 is always STDERR.
+write: (or (eq (arg 0) 2)
+           (eq (arg 0) logfile_fd)
+           (eq (arg 0) blockstore_fd))
+
+# logging: 'WARNING' and above fsync the logfile to disk immediately
+#
+# arg 0 is the file descriptor to fsync.
+fsync: (eq (arg 0) logfile_fd)
+
+# blockstore: read archival file
+read: (eq (arg 0) blockstore_fd)
+
+# blockstore: lseek archival file
+lseek: (eq (arg 0) blockstore_fd)
diff --git a/src/discof/arxiv/fd_shred_arxiv.c b/src/discof/arxiv/fd_shred_arxiv.c
new file mode 100644
index 0000000000..4f7be2b40f
--- /dev/null
+++ b/src/discof/arxiv/fd_shred_arxiv.c
@@ -0,0 +1,306 @@
+#include "fd_shred_arxiv.h"
+#include <errno.h>
+#include <unistd.h>
+
+/* fd_shred_arxiv_new formats shmem as an arxiver indexing up to
+   fd_size_max / FD_SHRED_ARXIV_UNIT_SZ shreds.  Returns shmem on
+   success and NULL (warning logged) on bad arguments.  The archive fd
+   starts as -1; the caller installs the real fd after joining. */
+void *
+fd_shred_arxiv_new( void * shmem, ulong fd_size_max ) {
+  if( FD_UNLIKELY( !shmem ) ) {
+    FD_LOG_WARNING(( "NULL arxiver" ));
+    return NULL;
+  }
+
+  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_shred_arxiv_align() ) ) ) {
+    FD_LOG_WARNING(( "misaligned arxiver" ));
+    return NULL;
+  }
+
+  if( FD_UNLIKELY( fd_size_max < FD_SHRED_ARXIV_MIN_SIZE ) ) {
+    FD_LOG_WARNING(( "archive file size too small" ));
+    return NULL;
+  }
+
+  ulong shred_max    = fd_size_max / FD_SHRED_ARXIV_UNIT_SZ;
+  int   lg_shred_max = fd_ulong_find_msb( fd_ulong_pow2_up( shred_max ) ) + 1;
+
+  /* One memset clears the header, the index region and the shred_off
+     table (key 0 marks an empty shred_off slot). */
+  fd_memset( shmem, 0, fd_shred_arxiv_footprint( fd_size_max ) );
+
+  FD_SCRATCH_ALLOC_INIT( l, shmem );
+  fd_shred_arxiver_t * arxiver = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_shred_arxiver_t), sizeof(fd_shred_arxiver_t) );
+  void * shred_idx = FD_SCRATCH_ALLOC_APPEND( l, fd_shred_idx_align(), fd_shred_idx_footprint( lg_shred_max ) );
+  void * shred_off = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_shred_off_t), shred_max * sizeof(fd_shred_off_t) );
+  ulong  top       = FD_SCRATCH_ALLOC_FINI( l, fd_shred_arxiv_align() );
+  FD_TEST( top - (ulong)shmem == fd_shred_arxiv_footprint( fd_size_max ) );
+
+  arxiver->shred_idx   = fd_shred_idx_new( shred_idx, lg_shred_max );
+  arxiver->shred_off   = (fd_shred_off_t *)shred_off;
+  arxiver->fd_size_max = fd_size_max;
+  arxiver->shred_max   = shred_max;
+  arxiver->fd          = -1;
+  arxiver->tail        = 0UL;
+  return (void *)arxiver;
+}
+
+/* fd_shred_arxiv_join joins the arxiver (and its index map) formatted
+   at shmem.  Returns NULL on a NULL or misaligned shmem. */
+fd_shred_arxiver_t *
+fd_shred_arxiv_join( void * shmem ) {
+  fd_shred_arxiver_t * arxiver = (fd_shred_arxiver_t *)shmem;
+  if( FD_UNLIKELY( !arxiver ) ) {
+    FD_LOG_WARNING(( "NULL arxiver" ));
+    return NULL;
+  }
+
+  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)arxiver, fd_shred_arxiv_align() ) ) ) {
+    FD_LOG_WARNING(( "misaligned arxiver" ));
+    return NULL;
+  }
+  arxiver->shred_idx = fd_shred_idx_join( arxiver->shred_idx );
+
+  return arxiver;
+}
+
+/* fd_shred_arxiv_leave leaves the index map and returns the arxiver
+   memory. */
+void *
+fd_shred_arxiv_leave( fd_shred_arxiver_t * arxiver ) {
+  if( FD_UNLIKELY( !arxiver ) ) {
+    FD_LOG_WARNING(( "NULL arxiver" ));
+    return NULL;
+  }
+
+  fd_shred_idx_leave( arxiver->shred_idx );
+  return (void *)arxiver;
+}
+
+/* fd_shred_arxiv_delete unformats the index map and returns shmem. */
+void *
+fd_shred_arxiv_delete( void * shmem ) {
+  fd_shred_arxiver_t * arxiver = (fd_shred_arxiver_t *)shmem;
+  if( FD_UNLIKELY( !arxiver ) ) {
+    FD_LOG_WARNING(( "NULL arxiver" ));
+    return NULL;
+  }
+
+  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)arxiver, fd_shred_arxiv_align() ) ) ) {
+    FD_LOG_WARNING(( "misaligned arxiver" ));
+    return NULL;
+  }
+
+  fd_shred_idx_delete( arxiver->shred_idx );
+  return (void *)arxiver;
+}
+
+/* shred_off_get returns the key stored in the file slot containing byte
+   offset off (0 if the slot is empty). */
+static inline ulong
+shred_off_get( fd_shred_arxiver_t * arxiv,
+               ulong                off ) {
+  return arxiv->shred_off[ off / FD_SHRED_ARXIV_UNIT_SZ ].key;
+}
+
+/* prepare_write wraps tail back to the start of the file when the next
+   slot would fall outside the shred_off table, then evicts whatever
+   shred currently occupies the slot at tail. */
+static void
+prepare_write( fd_shred_arxiver_t * arxiv ) {
+  /* Compare slot indices, not bytes: shred_max is fd_size_max rounded
+     down to whole slots, so a byte compare would index one past the
+     table when fd_size_max is not a multiple of the slot size. */
+  if( FD_UNLIKELY( arxiv->tail / FD_SHRED_ARXIV_UNIT_SZ >= arxiv->shred_max ) ) {
+    arxiv->tail = 0UL;
+  }
+
+  ulong shred_key = shred_off_get( arxiv, arxiv->tail );
+  if( shred_key != 0 ) {
+    /* Overwriting something, so we need to evict this one. */
+    fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, shred_key, NULL );
+    FD_TEST( idx );
+    fd_shred_idx_remove( arxiv->shred_idx, idx );
+    arxiv->shred_off[ arxiv->tail / FD_SHRED_ARXIV_UNIT_SZ ].key = 0;
+  }
+}
+
+/* shred_write writes the sz byte shred at file offset tail.  Returns 0
+   on success, EBADF if no archive fd is installed, or the fd_io_write
+   error code.  A failed seek is fatal. */
+static int
+shred_write( fd_shred_arxiver_t * arxiv,
+             fd_shred_t *         shred,
+             ulong                sz ) {
+  if( FD_UNLIKELY( arxiv->fd < 0 ) ) return EBADF;
+  ulong write_off = arxiv->tail;
+  if( FD_UNLIKELY( lseek( arxiv->fd, (long)write_off, SEEK_SET ) == -1 ) ) {
+    FD_LOG_ERR(( "[%s] failed to seek to offset %lu", __func__, write_off ));
+  }
+  ulong wsz;
+  return fd_io_write( arxiv->fd, shred, sz, sz, &wsz );
+}
+
+/* fd_shreds_checkpt moves shreds [start_idx, end_idx] of slot from the
+   blockstore into the archive file.  Shreds that are already archived,
+   missing from the blockstore, or that fail to write are skipped with a
+   warning and left untouched in the blockstore.
+
+   Subtlety: the file slot at tail is evicted before the write is
+   attempted, in the spirit of invalidating the metadata first before
+   updating the file, so a skipped shred can still cost one eviction. */
+void
+fd_shreds_checkpt( fd_shred_arxiver_t * arxiv,
+                   fd_blockstore_t *    blockstore,
+                   ulong                slot,
+                   uint                 start_idx,
+                   uint                 end_idx /* inclusive */ ) {
+  /* 64-bit counter so that end_idx==UINT_MAX still terminates. */
+  for( ulong n = start_idx; n <= end_idx; n++ ) {
+    uint           i         = (uint)n;
+    fd_shred_key_t buf_key   = { slot, i };
+    ulong          arxiv_key = slot << 32 | i;
+
+    /* Key 0 is what shred_off uses to mark an empty file slot. */
+    if( FD_UNLIKELY( !arxiv_key ) ) {
+      FD_LOG_WARNING(( "[%s] shred idx %lu %u cannot be archived", __func__, slot, i ));
+      continue;
+    }
+
+    fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, arxiv_key, NULL );
+    if( FD_UNLIKELY( idx ) ) {
+      FD_LOG_WARNING(( "[%s] shred idx %lu %u already exists", __func__, slot, i ));
+      continue;
+    }
+
+    prepare_write( arxiv );
+    ulong wsz     = 0UL;
+    int   success = 0;
+
+    for(;;) {
+      fd_buf_shred_map_query_t query[1] = { 0 };
+      int err = fd_buf_shred_map_query_try( blockstore->shred_map, &buf_key, NULL, query );
+      if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
+      if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
+        FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, i, fd_buf_shred_map_strerror( err ) ));
+        break;
+      }
+      fd_buf_shred_t * buf_shred = fd_buf_shred_map_query_ele( query );
+      fd_shred_t *     shred     = (fd_shred_t *)buf_shred->buf;
+
+      /* The entry is read speculatively, so bound the size before
+         trusting it: an oversized write would spill into the next file
+         slot. */
+      ulong sz   = fd_shred_sz( shred );
+      int   werr = ( sz <= FD_SHRED_ARXIV_UNIT_SZ ) ? shred_write( arxiv, shred, sz ) : EINVAL;
+
+      /* Retry from scratch if the entry changed while it was read. */
+      if( FD_UNLIKELY( fd_buf_shred_map_query_test( query ) != FD_MAP_SUCCESS ) ) continue;
+
+      if( FD_UNLIKELY( werr ) ) {
+        FD_LOG_WARNING(( "[%s] failed to write shred (%lu, %u) (%i-%s)", __func__, slot, i, werr, fd_io_strerror( werr ) ));
+        break;
+      }
+      wsz     = sz;
+      success = 1;
+      break;
+    }
+
+    if( FD_UNLIKELY( !success ) ) continue;
+
+    idx = fd_shred_idx_insert( arxiv->shred_idx, arxiv_key );
+    FD_TEST( idx );
+    idx->off = arxiv->tail;
+    idx->sz  = wsz;
+
+    arxiv->shred_off[ arxiv->tail / FD_SHRED_ARXIV_UNIT_SZ ].key = arxiv_key;
+    arxiv->tail += FD_SHRED_ARXIV_UNIT_SZ;
+
+    /* remove shred from buf_shred map */
+    fd_blockstore_shred_remove( blockstore, slot, i );
+  }
+}
+
+/* shred_read reads the archived shred described by shred_idx into
+   buf_out.  Returns 0 on success, -1 if buf_max is too small, and an
+   errno-style code if the seek or read fails.  Never aborts, as
+   fd_shred_restore callers need an error code. */
+static int
+shred_read( fd_shred_arxiver_t * arxiv,
+            fd_shred_idx_t *     shred_idx,
+            uchar *              buf_out,
+            ulong                buf_max,
+            ulong *              rsz ) {
+  ulong read_off = shred_idx->off;
+  if( FD_UNLIKELY( shred_idx->sz > buf_max ) ) {
+    FD_LOG_WARNING(( "[%s] buffer size %lu < shred size %lu", __func__, buf_max, shred_idx->sz ));
+    return -1;
+  }
+  if( FD_UNLIKELY( lseek( arxiv->fd, (long)read_off, SEEK_SET ) == -1 ) ) {
+    int err = errno; /* save before logging can clobber it */
+    FD_LOG_WARNING(( "[%s] failed to seek to offset %lu", __func__, read_off ));
+    return err;
+  }
+  return fd_io_read( arxiv->fd, buf_out, shred_idx->sz, shred_idx->sz, rsz );
+}
+
+/* shred_payload_read is like shred_read but skips the
+   FD_SHRED_DATA_HEADER_SZ byte header and reads only what follows. */
+static int FD_FN_UNUSED
+shred_payload_read( fd_shred_arxiver_t * arxiv,
+                    fd_shred_idx_t *     shred_idx,
+                    uchar *              buf_out,
+                    ulong                buf_max ) {
+  /* Guard the subtraction below against wrapping. */
+  if( FD_UNLIKELY( shred_idx->sz < FD_SHRED_DATA_HEADER_SZ ) ) return -1;
+  ulong read_off = shred_idx->off + FD_SHRED_DATA_HEADER_SZ;
+  ulong pay_sz   = shred_idx->sz  - FD_SHRED_DATA_HEADER_SZ;
+  if( FD_UNLIKELY( pay_sz > buf_max ) ) {
+    FD_LOG_WARNING(( "[%s] buffer size %lu < shred size %lu", __func__, buf_max, pay_sz ));
+    return -1;
+  }
+  if( FD_UNLIKELY( lseek( arxiv->fd, (long)read_off, SEEK_SET ) == -1 ) ) {
+    int err = errno;
+    FD_LOG_WARNING(( "[%s] failed to seek to offset %lu", __func__, read_off ));
+    return err;
+  }
+  ulong rsz;
+  return fd_io_read( arxiv->fd, buf_out, pay_sz, pay_sz, &rsz );
+}
+
+/* fd_shred_restore copies the archived shred described by shred_idx
+   into buf_out (capacity buf_max).  Returns 0 on success and non-zero
+   on failure, including NULL arguments. */
+int
+fd_shred_restore( fd_shred_arxiver_t * arxiv,
+                  fd_shred_idx_t *     shred_idx,
+                  uchar *              buf_out,
+                  ulong                buf_max ) {
+  if( FD_UNLIKELY( !arxiv || !shred_idx || !buf_out ) ) return -1;
+  ulong rsz;
+  return shred_read( arxiv, shred_idx, buf_out, buf_max, &rsz );
+}
+
+/* fd_shred_arxiv_verify cross-checks the two metadata tables: every
+   non-empty shred_off slot must have an index entry and the number of
+   such slots must equal the index key count.  Returns 0 if consistent
+   and -1 otherwise. */
+int
+fd_shred_arxiv_verify( fd_shred_arxiver_t * arxiv ){
+  ulong total_stored = 0;
+  for( ulong i = 0; i < arxiv->shred_max; i++ ) {
+    fd_shred_off_t * off = &arxiv->shred_off[i];
+    if( off->key == 0 ) continue;
+    fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, off->key, NULL );
+    if( FD_UNLIKELY( !idx ) ) {
+      return -1;
+    }
+    total_stored++;
+  }
+  if( FD_UNLIKELY( total_stored != fd_shred_idx_key_cnt( arxiv->shred_idx ) ) ) {
+    return -1;
+  }
+  return 0;
+}
+
diff --git a/src/discof/arxiv/fd_shred_arxiv.h b/src/discof/arxiv/fd_shred_arxiv.h
new file mode 100644
index 0000000000..7a837e24b4
--- /dev/null
+++ b/src/discof/arxiv/fd_shred_arxiv.h
@@ -0,0 +1,108 @@
+#ifndef HEADER_fd_src_discof_arxiv_fd_shred_arxiv_h
+#define HEADER_fd_src_discof_arxiv_fd_shred_arxiv_h
+
+#include "../../flamenco/runtime/fd_blockstore.h"
+
+/* fd_shred_idx is one entry of the in-memory index: it maps an archived
+   shred's key to where the shred lives in the archive file. */
+struct fd_shred_idx {
+  ulong key;  /* 32 msb slot, 32 lsb idx */
+  ulong next;
+  uint  hash;
+  ulong off;  /* byte offset of the shred in the archive file */
+  ulong sz;   /* size of the archived shred in bytes */
+};
+typedef struct fd_shred_idx fd_shred_idx_t;
+
+/* The low 32 bits of a key are just the shred index, which repeats in
+   every slot, so truncating the key would pile all slots onto the same
+   few hash values.  Multiply by a 64-bit odd constant and keep the high
+   half so that slot and index both contribute. */
+#define MAP_NAME          fd_shred_idx
+#define MAP_T             fd_shred_idx_t
+#define MAP_KEY           key
+#define MAP_KEY_HASH(key) ((uint)( ( (key) * 0x9E3779B97F4A7C15UL ) >> 32 ))
+#include "../../util/tmpl/fd_map_dynamic.c"
+
+/* fd_shred_off records which key occupies a given file slot (0 if the
+   slot is empty) so that the slot can be evicted before it is reused. */
+struct fd_shred_off {
+  ulong key;
+};
+typedef struct fd_shred_off fd_shred_off_t;
+
+struct __attribute__((aligned(128UL))) fd_shred_arxiver {
+  ulong fd_size_max; /* maximum size of the archival file */
+  ulong shred_max;   /* maximum # of shreds that can be held in file (fd_size_max / FD_SHRED_ARXIV_UNIT_SZ) */
+
+  fd_shred_idx_t * shred_idx; /* pointer to shred_idx map */
+  fd_shred_off_t * shred_off; /* pointer to shred_off table, one entry per file slot */
+  int              fd;        /* file descriptor for the archive file */
+  ulong            tail;      /* byte offset of the file slot that will be written next */
+};
+typedef struct fd_shred_arxiver fd_shred_arxiver_t;
+
+#define FD_SHRED_ARXIV_UNIT_SZ  FD_SHRED_MAX_SZ                   /* size of each slot in the arxiv file */
+#define FD_SHRED_ARXIV_MIN_SIZE (FD_SHRED_ARXIV_UNIT_SZ * 1024UL) /* minimum size of the archive file */
+
+FD_FN_CONST static inline ulong
+fd_shred_arxiv_align( void ) {
+  return alignof(fd_shred_arxiver_t);
+}
+
+/* fd_shred_arxiv_footprint returns the footprint of the entire
+   fd_shred_arxiver_t including data structures, or 0 if fd_size_max is
+   below FD_SHRED_ARXIV_MIN_SIZE. */
+FD_FN_CONST static inline ulong
+fd_shred_arxiv_footprint( ulong fd_size_max ) {
+  if( FD_UNLIKELY( fd_size_max < FD_SHRED_ARXIV_MIN_SIZE ) ) return 0UL;
+  ulong shred_max    = fd_size_max / FD_SHRED_ARXIV_UNIT_SZ;
+  int   lg_shred_max = fd_ulong_find_msb( fd_ulong_pow2_up( shred_max ) ) + 1;
+  ulong l = FD_LAYOUT_INIT;
+  l = FD_LAYOUT_APPEND( l, alignof(fd_shred_arxiver_t), sizeof(fd_shred_arxiver_t)             );
+  l = FD_LAYOUT_APPEND( l, fd_shred_idx_align(),        fd_shred_idx_footprint( lg_shred_max ) );
+  l = FD_LAYOUT_APPEND( l, alignof(fd_shred_off_t),     sizeof(fd_shred_off_t) * shred_max     );
+  return FD_LAYOUT_FINI( l, fd_shred_arxiv_align() );
+}
+
+/* fd_shred_arxiv_new formats an unused memory region for use as a
+   arxiver.  mem is a non-NULL pointer to this region in the local
+   address space with the required footprint and alignment. */
+
+void *
+fd_shred_arxiv_new( void * mem, ulong fd_size_max );
+
+fd_shred_arxiver_t *
+fd_shred_arxiv_join( void * shmem );
+
+void *
+fd_shred_arxiv_leave( fd_shred_arxiver_t * shred_arxiv );
+
+void *
+fd_shred_arxiv_delete( void * shred_arxiv );
+
+
+/* Archives shreds [start_idx, end_idx] of slot from the blockstore to
+   the archive file, and does any necessary bookkeeping.  If fd is -1,
+   no write is attempted and nothing is archived. */
+void
+fd_shreds_checkpt( fd_shred_arxiver_t * arxiv,
+                   fd_blockstore_t *    blockstore,
+                   ulong                slot,
+                   uint                 start_idx,
+                   uint                 end_idx /* inclusive */ );
+
+/* Restores an archived shred from fd into buf_out.  As this is used by
+   rpcserver, it must return an error code instead of throwing an error
+   on failure. */
+int
+fd_shred_restore( fd_shred_arxiver_t * arxiv,
+                  fd_shred_idx_t *     block_idx_entry,
+                  uchar *              buf_out,
+                  ulong                buf_max );
+
+/* Returns 0 if the archive metadata is valid */
+int
+fd_shred_arxiv_verify( fd_shred_arxiver_t * archiver );
+
+#endif /* HEADER_fd_src_discof_arxiv_fd_shred_arxiv_h */
diff --git a/src/discof/arxiv/generated/fd_arxiv_tile_seccomp.h b/src/discof/arxiv/generated/fd_arxiv_tile_seccomp.h
new file mode 100644
index 0000000000..624fdac2d6
--- /dev/null
+++ b/src/discof/arxiv/generated/fd_arxiv_tile_seccomp.h
@@ -0,0 +1,78 @@
+/* THIS FILE WAS GENERATED BY generate_filters.py. DO NOT EDIT BY HAND! */
+#ifndef HEADER_fd_src_discof_arxiv_generated_fd_arxiv_tile_seccomp_h
+#define HEADER_fd_src_discof_arxiv_generated_fd_arxiv_tile_seccomp_h
+
+#include "../../../../src/util/fd_util_base.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#if defined(__i386__)
+# define ARCH_NR AUDIT_ARCH_I386
+#elif defined(__x86_64__)
+# define ARCH_NR AUDIT_ARCH_X86_64
+#elif defined(__aarch64__)
+# define ARCH_NR AUDIT_ARCH_AARCH64
+#else
+# error "Target architecture is unsupported by seccomp."
+#endif +static const unsigned int sock_filter_policy_fd_arxiv_tile_instr_cnt = 22; + +static void populate_sock_filter_policy_fd_arxiv_tile( ulong out_cnt, struct sock_filter * out, unsigned int logfile_fd, unsigned int blockstore_fd) { + FD_TEST( out_cnt >= 22 ); + struct sock_filter filter[22] = { + /* Check: Jump to RET_KILL_PROCESS if the script's arch != the runtime arch */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, arch ) ) ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ARCH_NR, 0, /* RET_KILL_PROCESS */ 18 ), + /* loading syscall number in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, nr ) ) ), + /* allow write based on expression */ + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_write, /* check_write */ 4, 0 ), + /* allow fsync based on expression */ + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_fsync, /* check_fsync */ 9, 0 ), + /* allow read based on expression */ + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_read, /* check_read */ 10, 0 ), + /* allow lseek based on expression */ + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_lseek, /* check_lseek */ 11, 0 ), + /* none of the syscalls matched */ + { BPF_JMP | BPF_JA, 0, 0, /* RET_KILL_PROCESS */ 12 }, +// check_write: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_ALLOW */ 11, /* lbl_1 */ 0 ), +// lbl_1: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 9, /* lbl_2 */ 0 ), +// lbl_2: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, blockstore_fd, /* RET_ALLOW */ 7, /* RET_KILL_PROCESS */ 6 ), +// check_fsync: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | 
BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 5, /* RET_KILL_PROCESS */ 4 ), +// check_read: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, blockstore_fd, /* RET_ALLOW */ 3, /* RET_KILL_PROCESS */ 2 ), +// check_lseek: + /* load syscall argument 0 in accumulator */ + BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, blockstore_fd, /* RET_ALLOW */ 1, /* RET_KILL_PROCESS */ 0 ), +// RET_KILL_PROCESS: + /* KILL_PROCESS is placed before ALLOW since it's the fallthrough case. */ + BPF_STMT( BPF_RET | BPF_K, SECCOMP_RET_KILL_PROCESS ), +// RET_ALLOW: + /* ALLOW has to be reached by jumping */ + BPF_STMT( BPF_RET | BPF_K, SECCOMP_RET_ALLOW ), + }; + fd_memcpy( out, filter, sizeof( filter ) ); +} + +#endif diff --git a/src/discof/arxiv/test_arxiv.c b/src/discof/arxiv/test_arxiv.c new file mode 100644 index 0000000000..dd92e4232f --- /dev/null +++ b/src/discof/arxiv/test_arxiv.c @@ -0,0 +1,170 @@ +#include "fd_shred_arxiv.h" +#include + + +static void +populate_blockstore( fd_blockstore_t * blockstore, ulong slot, int idx ) { + uchar raw[ FD_SHRED_MIN_SZ ] = { 0 }; + memset( raw, idx, sizeof(raw) ); + fd_shred_t * shred = (fd_shred_t *)raw; + shred->slot = slot; + shred->idx = (uint)idx; + shred->variant = 0x90; + shred->data.parent_off = 1; + + fd_blockstore_shred_insert( blockstore, shred ); +} + +void +test_arxiv_evict( fd_wksp_t * wksp, int fd ) { + void * mem = fd_wksp_alloc_laddr( wksp, fd_shred_arxiv_align(), fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ), 1UL ); + FD_TEST( mem ); + fd_shred_arxiver_t * arxiv = fd_shred_arxiv_join( fd_shred_arxiv_new( mem, FD_SHRED_ARXIV_MIN_SIZE ) ); + FD_TEST( arxiv ); + arxiv->fd = fd; + + void * bmem = fd_wksp_alloc_laddr( wksp, fd_blockstore_align(), 
fd_blockstore_footprint( 4096, 4096, 4096, 4096 ), 1UL ); + FD_TEST( bmem ); + ulong shred_max = 1 << 15; + void * shblockstore = fd_blockstore_new( bmem, 1UL, 42UL, shred_max, 4096, 4096, shred_max ); + FD_TEST( shblockstore ); + fd_blockstore_t blockstore_ljoin; + fd_blockstore_t * blockstore = fd_blockstore_join( &blockstore_ljoin, shblockstore ); + fd_buf_shred_pool_reset( blockstore->shred_pool, 0 ); + blockstore->shmem->wmk = 0; + + ulong total_shreds = ( 1 << 11 ) + 2; /* sure to cause eviction */ + + /* the archiver will hold 1024 shreds. + will begin evicting at the 1024th shred. + and 2048, 2049 it will begin from the top of file again. + + Thus the end state of the archive file should be: + 2048, 2049, 1026, 1027 ... 2046 2047 + + But there's an extra subtlety where if we try to checkpt something + but it's not successful, we evict first, in the spirit of invalidating + the metadata first before updating the file. Thus, + */ + + for( ulong i = 0; i < total_shreds; i++ ) { + FD_LOG_NOTICE(("insert shred slot %lu idx %lu", (i / 64) + 1, i % 64)); + populate_blockstore( blockstore, (i / 64) + 1, i % 64 ); + } + + fd_shreds_checkpt( arxiv, blockstore, 1, 0, 0 ); + fd_shreds_checkpt( arxiv, blockstore, 1, 1, 63 ); + + for( uint i = 0; i < 1024; i+=64 ){ + FD_LOG_NOTICE(("checkpt shred slot %u idx %d", (i / 64) + 1, 0)); + + fd_shreds_checkpt( arxiv, blockstore, (i/64) + 1, 0, 63 ); /* will also try to double archive the slot 0 */ + } + + FD_TEST( fd_shred_arxiv_verify( arxiv ) == 0 ); + + for( uint i = 1024; i < total_shreds; i+=64 ){ + if( FD_UNLIKELY( i == 2048 ) ) { + fd_shreds_checkpt( arxiv, blockstore, (i/64) + 1, 0, 1 ); /* 2048 and 2049 */ + continue; + } + fd_shreds_checkpt( arxiv, blockstore, (i/64) + 1, 0, 63 ); /* will also try to double archive the slot 0 */ + } + + FD_TEST( fd_shred_arxiv_verify( arxiv ) == 0 ); + + for( uint i = 1026; i < total_shreds; i ++ ){ + FD_LOG_NOTICE(( "i: %u, slot: %u, idx: %u", i, (i / 64) + 1, i % 64 )); + ulong key 
= (ulong)(( i/64 ) + 1 ) << 32 | ( i % 64 ); + fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, key, NULL ); + FD_TEST( idx ); + + uchar buf[FD_SHRED_MIN_SZ]; + int err = fd_shred_restore( arxiv, idx, buf, sizeof(buf) ); + fd_shred_t * shred = (fd_shred_t *)buf; + + FD_TEST( err == 0 ); + FD_TEST( shred->idx == (uint)(i % 64) ); + FD_TEST( fd_shred_type( shred->variant ) == 0x90 ); + FD_TEST( shred->slot == (i / 64) + 1 ); + } + + +} + +void +test_simple_arxiv( fd_wksp_t * wksp, int fd ) { + void * mem = fd_wksp_alloc_laddr( wksp, fd_shred_arxiv_align(), fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ), 1UL ); + FD_TEST( mem ); + fd_shred_arxiver_t * arxiv = fd_shred_arxiv_join( fd_shred_arxiv_new( mem, FD_SHRED_ARXIV_MIN_SIZE ) ); + FD_TEST( arxiv ); + arxiv->fd = fd; + FD_LOG_WARNING(("arxiv holds max: %lu, map holds max: %lu", arxiv->shred_max, fd_shred_idx_key_max( arxiv->shred_idx ))); + + + void * bmem = fd_wksp_alloc_laddr( wksp, fd_blockstore_align(), fd_blockstore_footprint( 4096, 4096, 4096, 4096 ), 1UL ); + FD_TEST( bmem ); + ulong shred_max = 1 << 15; + void * shblockstore = fd_blockstore_new( bmem, 1UL, 42UL, shred_max, 4096, 4096, shred_max ); + FD_TEST( shblockstore ); + fd_blockstore_t blockstore_ljoin; + fd_blockstore_t * blockstore = fd_blockstore_join( &blockstore_ljoin, shblockstore ); + fd_buf_shred_pool_reset( blockstore->shred_pool, 0 ); + blockstore->shmem->wmk = 0; + + ulong slot = 2; + for( int i = 0; i < 60; i++ ) { + populate_blockstore( blockstore, slot, i ); + } + + fd_shreds_checkpt( arxiv, blockstore, slot, 0, 50 ); + + for( uint i = 0; i <= 50; i++ ) { + ulong key = ( slot << 32 ) | i; + fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, key, NULL ); + FD_TEST( idx ); + + uchar buf[FD_SHRED_MIN_SZ]; + int err = fd_shred_restore( arxiv, idx, buf, sizeof(buf) ); + fd_shred_t * shred = (fd_shred_t *)buf; + + FD_TEST( err == 0 ); + FD_TEST( shred->idx == (uint)i ); + FD_TEST( fd_shred_type( shred->variant ) == 
0x90 ); + FD_TEST( shred->slot == slot ); + } + + for( uint i = 51; i <= 60; i++ ) { + ulong key = ( slot << 32 ) | i; + fd_shred_idx_t * idx = fd_shred_idx_query( arxiv->shred_idx, key, NULL ); + FD_TEST( !idx ); + } + FD_TEST( fd_shred_arxiv_verify( arxiv ) == 0 ); + + fd_wksp_free_laddr( mem ); + fd_wksp_free_laddr( bmem ); +} + +int +main( int argc, char ** argv ) { + fd_boot( &argc, &argv ); + + ulong page_cnt = 5; + char * _page_sz = "gigantic"; + ulong numa_idx = fd_shmem_numa_idx( 0 ); + fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ), page_cnt, fd_shmem_cpu_idx( numa_idx ), "wksp", 0UL ); + FD_TEST( wksp ); + + const char * file = fd_env_strip_cmdline_cstr( &argc, &argv, "--file", NULL, NULL); + int fd = open(file, O_RDWR | O_CREAT, 0666); + FD_TEST( fd > 0 ); + FD_TEST( ftruncate( fd, 0 ) == 0 ); + + test_simple_arxiv( wksp, fd ); + + FD_TEST( ftruncate( fd, 0 ) == 0 ); + test_arxiv_evict( wksp, fd ); + + fd_halt(); + return 0; +} diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index 1f37a4fa12..23de50b1bb 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -60,7 +60,8 @@ #define STAKE_OUT_IDX (0UL) #define NOTIF_OUT_IDX (1UL) #define SENDER_OUT_IDX (2UL) -#define POH_OUT_IDX (3UL) +#define ARXIV_OUT_IDX (3UL) +#define POH_OUT_IDX (4UL) #define VOTE_ACC_MAX (2000000UL) @@ -152,6 +153,9 @@ struct fd_replay_tile_ctx { ulong sender_out_wmark; ulong sender_out_chunk; + // Arxiv output defs + fd_replay_out_ctx_t arxiv_out; + // Stake weights output link defs fd_frag_meta_t * stake_weights_out_mcache; ulong * stake_weights_out_sync; @@ -220,7 +224,6 @@ struct fd_replay_tile_ctx { /* Depends on store_int and is polled in after_credit */ fd_blockstore_t blockstore_ljoin; - int blockstore_fd; /* file descriptor for archival file */ fd_blockstore_t * blockstore; /* Updated during execution */ @@ -1611,6 +1614,9 @@ handle_slice( fd_replay_tile_ctx_t * ctx, 
__asm__("int $3"); FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot )); } + + ulong arxiv_sig = fd_disco_replay_arxiv_sig( slot, start_idx, start_idx + data_cnt - 1 ); + fd_stem_publish( stem, ARXIV_OUT_IDX, arxiv_sig, ctx->arxiv_out.chunk, 0, 0, 0, 0 ); } static void @@ -1756,7 +1762,7 @@ tpool_boot( fd_topo_t * topo, ulong total_thread_count ) { static void kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) { - fd_blockstore_init( ctx->slot_ctx->blockstore, ctx->blockstore_fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank ); + fd_blockstore_init( ctx->slot_ctx->blockstore, &ctx->slot_ctx->slot_bank ); publish_stake_weights( ctx, stem, ctx->slot_ctx ); fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot_bank.slot ); @@ -1911,8 +1917,6 @@ read_snapshot( void * _ctx, FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." )); fd_blockstore_init( ctx->slot_ctx->blockstore, - ctx->blockstore_fd, - FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank ); } @@ -2521,7 +2525,7 @@ during_housekeeping( void * _ctx ) { FD_LOG_NOTICE(( "wmk %lu => %lu", fd_fseq_query( ctx->published_wmark ), wmark )); fd_funk_txn_xid_t xid = { .ul = { wmark, wmark } }; - if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, wmark ); + if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, wmark ); if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, wmark, ctx->ghost ); if( FD_LIKELY( ctx->funk ) ) funk_and_txncache_publish( ctx, wmark, &xid ); if( FD_LIKELY( ctx->ghost ) ) { @@ -2548,11 +2552,6 @@ privileged_init( fd_topo_t * topo, FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) ); FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) ); - ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDWR | O_CREAT, 0666 ); - if( FD_UNLIKELY( ctx->blockstore_fd == -1 ) ) { - FD_LOG_ERR(( 
"failed to open or create blockstore archival file %s %d %d %s", tile->replay.blockstore_file, ctx->blockstore_fd, errno, strerror(errno) )); - } - /**********************************************************************/ /* runtime public */ /**********************************************************************/ @@ -2893,6 +2892,20 @@ unprivileged_init( fd_topo_t * topo, fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file ); } + /**********************************************************************/ + /* arxiv */ + /**********************************************************************/ + fd_topo_link_t * arxiv_out_link = &topo->links[ tile->out_link_id[ ARXIV_OUT_IDX ] ]; + ctx->arxiv_out.mcache = arxiv_out_link->mcache; + ctx->arxiv_out.sync = fd_mcache_seq_laddr( ctx->arxiv_out.mcache ); + ctx->arxiv_out.depth = fd_mcache_depth( ctx->arxiv_out.mcache ); + ctx->arxiv_out.seq = fd_mcache_seq_query( ctx->arxiv_out.sync ); + ctx->arxiv_out.mem = topo->workspaces[ topo->objs[ arxiv_out_link->dcache_obj_id ].wksp_id ].wksp; + ctx->arxiv_out.chunk0 = fd_dcache_compact_chunk0( ctx->arxiv_out.mem, arxiv_out_link->dcache ); + ctx->arxiv_out.wmark = fd_dcache_compact_wmark( ctx->arxiv_out.mem, arxiv_out_link->dcache, arxiv_out_link->mtu ); + ctx->arxiv_out.chunk = ctx->arxiv_out.chunk0; + ctx->arxiv_out.idx = fd_topo_find_tile_out_link( topo, tile, "arxiv", 0UL ); + /**********************************************************************/ /* bank */ /**********************************************************************/ @@ -3069,13 +3082,9 @@ populate_allowed_seccomp( fd_topo_t const * topo, fd_topo_tile_t const * tile, ulong out_cnt, struct sock_filter * out ) { - void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - - FD_SCRATCH_ALLOC_INIT( l, scratch ); - fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) ); - FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) ); - - 
populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd ); + (void) topo; + (void) tile; + populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() ); return sock_filter_policy_fd_replay_tile_instr_cnt; } @@ -3084,19 +3093,14 @@ populate_allowed_fds( fd_topo_t const * topo, fd_topo_tile_t const * tile, ulong out_fds_cnt, int * out_fds ) { - void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - - FD_SCRATCH_ALLOC_INIT( l, scratch ); - fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) ); - FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) ); - + (void) topo; + (void) tile; if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt )); ulong out_cnt = 0UL; out_fds[ out_cnt++ ] = 2; /* stderr */ if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */ - out_fds[ out_cnt++ ] = ctx->blockstore_fd; return out_cnt; } @@ -3107,7 +3111,7 @@ metrics_write( fd_replay_tile_ctx_t * ctx ) { } /* TODO: This is definitely not correct */ -#define STEM_BURST (1UL) +#define STEM_BURST (2UL) #define STEM_CALLBACK_CONTEXT_TYPE fd_replay_tile_ctx_t #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t) diff --git a/src/discof/replay/fd_replay_tile.seccomppolicy b/src/discof/replay/fd_replay_tile.seccomppolicy index fdb5140d90..efb7dec4f4 100644 --- a/src/discof/replay/fd_replay_tile.seccomppolicy +++ b/src/discof/replay/fd_replay_tile.seccomppolicy @@ -1,6 +1,6 @@ # logfile_fd: It can be disabled by configuration, but typically tiles # will open a log file on boot and write all messages there. -unsigned int logfile_fd, unsigned int blockstore_fd +unsigned int logfile_fd # logging: all log messages are written to a file and/or pipe # @@ -16,9 +16,3 @@ write: (or (eq (arg 0) 2) # # arg 0 is the file descriptor to fsync. 
fsync: (eq (arg 0) logfile_fd) - -# blockstore: read archival file -read: (eq (arg 0) blockstore_fd) - -# blockstore: lseek archival file -lseek: (eq (arg 0) blockstore_fd) diff --git a/src/discof/replay/generated/fd_replay_tile_seccomp.h b/src/discof/replay/generated/fd_replay_tile_seccomp.h index e857b2c6ba..b9f8fc8b40 100644 --- a/src/discof/replay/generated/fd_replay_tile_seccomp.h +++ b/src/discof/replay/generated/fd_replay_tile_seccomp.h @@ -21,46 +21,34 @@ #else # error "Target architecture is unsupported by seccomp." #endif -static const unsigned int sock_filter_policy_fd_replay_tile_instr_cnt = 20; +static const unsigned int sock_filter_policy_fd_replay_tile_instr_cnt = 14; -static void populate_sock_filter_policy_fd_replay_tile( ulong out_cnt, struct sock_filter * out, unsigned int logfile_fd, unsigned int blockstore_fd) { - FD_TEST( out_cnt >= 20 ); - struct sock_filter filter[20] = { +static void populate_sock_filter_policy_fd_replay_tile( ulong out_cnt, struct sock_filter * out, unsigned int logfile_fd) { + FD_TEST( out_cnt >= 14 ); + struct sock_filter filter[14] = { /* Check: Jump to RET_KILL_PROCESS if the script's arch != the runtime arch */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, arch ) ) ), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ARCH_NR, 0, /* RET_KILL_PROCESS */ 16 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, ARCH_NR, 0, /* RET_KILL_PROCESS */ 10 ), /* loading syscall number in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, ( offsetof( struct seccomp_data, nr ) ) ), /* allow write based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_write, /* check_write */ 4, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_write, /* check_write */ 2, 0 ), /* allow fsync based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_fsync, /* check_fsync */ 7, 0 ), - /* allow read based on expression */ - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_read, /* check_read */ 8, 0 ), - /* allow lseek based on expression */ - 
BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_lseek, /* check_lseek */ 9, 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, SYS_fsync, /* check_fsync */ 5, 0 ), /* none of the syscalls matched */ - { BPF_JMP | BPF_JA, 0, 0, /* RET_KILL_PROCESS */ 10 }, + { BPF_JMP | BPF_JA, 0, 0, /* RET_KILL_PROCESS */ 6 }, // check_write: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_ALLOW */ 9, /* lbl_1 */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, 2, /* RET_ALLOW */ 5, /* lbl_1 */ 0 ), // lbl_1: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 7, /* RET_KILL_PROCESS */ 6 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 3, /* RET_KILL_PROCESS */ 2 ), // check_fsync: /* load syscall argument 0 in accumulator */ BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 5, /* RET_KILL_PROCESS */ 4 ), -// check_read: - /* load syscall argument 0 in accumulator */ - BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, blockstore_fd, /* RET_ALLOW */ 3, /* RET_KILL_PROCESS */ 2 ), -// check_lseek: - /* load syscall argument 0 in accumulator */ - BPF_STMT( BPF_LD | BPF_W | BPF_ABS, offsetof(struct seccomp_data, args[0])), - BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, blockstore_fd, /* RET_ALLOW */ 1, /* RET_KILL_PROCESS */ 0 ), + BPF_JUMP( BPF_JMP | BPF_JEQ | BPF_K, logfile_fd, /* RET_ALLOW */ 1, /* RET_KILL_PROCESS */ 0 ), // RET_KILL_PROCESS: /* KILL_PROCESS is placed before ALLOW since it's the fallthrough case. 
*/ BPF_STMT( BPF_RET | BPF_K, SECCOMP_RET_KILL_PROCESS ), diff --git a/src/discof/store/fd_storei_tile.c b/src/discof/store/fd_storei_tile.c index 2c298b5b01..b1e13a6b3f 100644 --- a/src/discof/store/fd_storei_tile.c +++ b/src/discof/store/fd_storei_tile.c @@ -14,6 +14,7 @@ #include "../../disco/shred/fd_stake_ci.h" #include "../../disco/keyguard/fd_keyload.h" #include "../../disco/topo/fd_pod_format.h" +#include "../../discof/repair/fd_fec_chainer.h" #include "../../flamenco/runtime/fd_runtime.h" #include "../../disco/metrics/fd_metrics.h" @@ -86,6 +87,8 @@ struct fd_store_tile_ctx { int blockstore_fd; /* file descriptor for archival file */ fd_blockstore_t * blockstore; + fd_fec_chainer_t * fec_chain; + fd_wksp_t * stake_in_mem; ulong stake_in_chunk0; ulong stake_in_wmark; diff --git a/src/flamenco/runtime/fd_blockstore.c b/src/flamenco/runtime/fd_blockstore.c index bb184b8357..f236ce382d 100644 --- a/src/flamenco/runtime/fd_blockstore.c +++ b/src/flamenco/runtime/fd_blockstore.c @@ -58,7 +58,9 @@ fd_blockstore_new( void * shmem, void * txn_map = FD_SCRATCH_ALLOC_APPEND( l, fd_txn_map_align(), fd_txn_map_footprint( txn_max ) ); void * alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() ); ulong top = FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() ); - FD_TEST( fd_ulong_align_up( top - (ulong)shmem, fd_alloc_align() ) == fd_ulong_align_up( fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ), fd_alloc_align() ) ); + (void) top; + //FD_TEST( top - (ulong)shmem == fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ) ); + //FD_TEST( fd_ulong_align_up( top - (ulong)shmem, fd_alloc_align() ) == fd_ulong_align_up( fd_blockstore_footprint( shred_max, block_max, idx_max, txn_max ), fd_alloc_align() ) ); (void)shreds; fd_buf_shred_pool_new( shred_pool ); @@ -232,17 +234,9 @@ fd_blockstore_delete( void * shblockstore ) { } while(0); fd_blockstore_t * -fd_blockstore_init( fd_blockstore_t * blockstore, int fd, ulong 
fd_size_max, fd_slot_bank_t const * slot_bank ) { - - if ( fd_size_max < FD_BLOCKSTORE_ARCHIVE_MIN_SIZE ) { - FD_LOG_ERR(( "archive file size too small" )); - return NULL; - } - blockstore->shmem->archiver.fd_size_max = fd_size_max; +fd_blockstore_init( fd_blockstore_t * blockstore, fd_slot_bank_t const * slot_bank ) { //build_idx( blockstore, fd ); - lseek( fd, 0, SEEK_END ); - /* initialize fields using slot bank */ ulong smr = slot_bank->slot; @@ -323,8 +317,7 @@ fd_txn_key_hash( fd_txn_key_t const * k, ulong seed ) { return h; } -/* Remove a slot from blockstore. Needs to currently be under a blockstore_write - lock due to txn_map access. */ +/* Remove a slot from blockstore. Shreds are removed as soon as they replay. */ void fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) { FD_LOG_NOTICE(( "[%s] slot: %lu", __func__, slot )); @@ -332,7 +325,6 @@ fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) { /* It is not safe to remove a replaying block. */ fd_block_map_query_t query[1] = { 0 }; ulong parent_slot = FD_SLOT_NULL; - ulong received_idx = 0; int err = FD_MAP_ERR_AGAIN; while( err == FD_MAP_ERR_AGAIN ) { err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 ); @@ -344,7 +336,6 @@ fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) { return; } parent_slot = block_info->parent_slot; - received_idx = block_info->received_idx; err = fd_block_map_query_test( query ); } @@ -365,17 +356,11 @@ fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) { } fd_block_map_publish( query ); - /* Remove buf_shreds. 
*/ - for( uint idx = 0; idx < received_idx; idx++ ) { - fd_blockstore_shred_remove( blockstore, slot, idx ); - } - return; } void fd_blockstore_publish( fd_blockstore_t * blockstore, - int fd FD_PARAM_UNUSED, ulong wmk ) { FD_LOG_NOTICE(( "[%s] wmk %lu => smr %lu", __func__, blockstore->shmem->wmk, wmk )); @@ -494,7 +479,9 @@ fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shr fd_shred_key_t key = { slot, .idx = shred->idx }; - /* Test if the blockstore already contains this shred key. */ + /* Test if the blockstore already contains this shred key. + TODO: remove this section? the fec_resolver is taking care of removing duplicates, so + not sure this'll ever get hit */ if( FD_UNLIKELY( fd_blockstore_shred_test( blockstore, slot, shred->idx ) ) ) { @@ -586,6 +573,10 @@ fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shr fd_block_map_publish( query ); + if( !fd_blockstore_block_info_test( blockstore, slot ) ) { + + FD_LOG_ERR(( "[%s] failed to insert new block map entry, slot %lu, idx %u", __func__, slot, shred->idx )); + } FD_TEST( fd_blockstore_block_info_test( blockstore, slot ) ); } fd_block_map_query_t query[1] = { 0 }; diff --git a/src/flamenco/runtime/fd_blockstore.h b/src/flamenco/runtime/fd_blockstore.h index 74125df40a..66807951d3 100644 --- a/src/flamenco/runtime/fd_blockstore.h +++ b/src/flamenco/runtime/fd_blockstore.h @@ -576,7 +576,7 @@ fd_blockstore_delete( void * shblockstore ); file. */ fd_blockstore_t * -fd_blockstore_init( fd_blockstore_t * blockstore, int fd, ulong fd_size_max, fd_slot_bank_t const * slot_bank ); +fd_blockstore_init( fd_blockstore_t * blockstore, fd_slot_bank_t const * slot_bank ); /* fd_blockstore_fini finalizes a blockstore. @@ -930,7 +930,7 @@ fd_blockstore_block_height_update( fd_blockstore_t * blockstore, ulong slot, ulo function. 
*/ void -fd_blockstore_publish( fd_blockstore_t * blockstore, int fd, ulong wmk ); +fd_blockstore_publish( fd_blockstore_t * blockstore, ulong wmk ); void fd_blockstore_log_block_status( fd_blockstore_t * blockstore, ulong around_slot );