Skip to content

replay/arxiv: set up shred archiving tile #4751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/app/firedancer-dev/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern fd_topo_run_tile_t fd_tile_repair;
extern fd_topo_run_tile_t fd_tile_storei;
extern fd_topo_run_tile_t fd_tile_replay;
extern fd_topo_run_tile_t fd_tile_replay_thread;
extern fd_topo_run_tile_t fd_tile_arxiv;
extern fd_topo_run_tile_t fd_tile_execor;
extern fd_topo_run_tile_t fd_tile_batch;
extern fd_topo_run_tile_t fd_tile_batch_thread;
Expand Down Expand Up @@ -109,6 +110,7 @@ fd_topo_run_tile_t * TILES[] = {
&fd_tile_storei,
&fd_tile_replay,
&fd_tile_replay_thread,
&fd_tile_arxiv,
&fd_tile_execor,
&fd_tile_batch,
&fd_tile_batch_thread,
Expand Down
12 changes: 12 additions & 0 deletions src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "voter_gossip" );
fd_topob_wksp( topo, "voter_dedup" );
fd_topob_wksp( topo, "batch_replay" );
fd_topob_wksp( topo, "replay_arxiv" );

fd_topob_wksp( topo, "rstart_gossi" );
fd_topob_wksp( topo, "gossi_rstart" );
Expand Down Expand Up @@ -240,6 +241,7 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "voter" );
fd_topob_wksp( topo, "poh_slot" );
fd_topob_wksp( topo, "eqvoc" );
fd_topob_wksp( topo, "arxiv" );
fd_topob_wksp( topo, "batch" );
fd_topob_wksp( topo, "btpool" );
fd_topob_wksp( topo, "constipate" );
Expand Down Expand Up @@ -295,6 +297,7 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "poh_pack", "replay_poh", 128UL, sizeof(fd_became_leader_t) , 1UL );

/**/ fd_topob_link( topo, "replay_voter", "replay_voter", 128UL, sizeof(fd_txn_p_t), 1UL );
/**/ fd_topob_link( topo, "replay_arxiv", "replay_arxiv", 128UL, sizeof(ulong), 1UL );
/**/ fd_topob_link( topo, "voter_gossip", "voter_gossip", 128UL, FD_TXN_MTU, 1UL );
/**/ fd_topob_link( topo, "voter_sign", "voter_sign", 128UL, FD_TXN_MTU, 1UL );
/**/ fd_topob_link( topo, "sign_voter", "sign_voter", 128UL, 64UL, 1UL );
Expand Down Expand Up @@ -355,6 +358,7 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile( topo, "sender", "voter", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
/**/ fd_topob_tile( topo, "bhole", "bhole", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
/**/ fd_topob_tile( topo, "eqvoc", "eqvoc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
/**/ fd_topob_tile( topo, "arxiv", "arxiv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );

/**/ fd_topob_tile( topo, "replay", "replay", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
/* These thread tiles must be defined immediately after the replay tile. We subtract one because the replay tile acts as a thread in the tpool as well. */
Expand All @@ -377,6 +381,7 @@ fd_topo_initialize( config_t * config ) {
fd_topo_tile_t * batch_tile = &topo->tiles[ fd_topo_find_tile( topo, "batch" , 0UL ) ];
fd_topo_tile_t * pack_tile = &topo->tiles[ fd_topo_find_tile( topo, "pack" , 0UL ) ];
fd_topo_tile_t * exec_tile = &topo->tiles[ fd_topo_find_tile( topo, "exec" , 0UL ) ];
fd_topo_tile_t * arxiv_tile = &topo->tiles[ fd_topo_find_tile( topo, "arxiv" , 0UL ) ];

/* Create a shared blockstore to be used by store and replay. */
fd_topo_obj_t * blockstore_obj = setup_topo_blockstore( topo,
Expand All @@ -389,6 +394,8 @@ fd_topo_initialize( config_t * config ) {
fd_topob_tile_uses( topo, store_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_uses( topo, replay_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_uses( topo, repair_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
fd_topob_tile_uses( topo, arxiv_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );

if( enable_rpc ) {
fd_topo_tile_t * rpcserv_tile = &topo->tiles[ fd_topo_find_tile( topo, "rpcsrv", 0UL ) ];
fd_topob_tile_uses( topo, rpcserv_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
Expand Down Expand Up @@ -554,6 +561,7 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "pack_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "batch_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_voter", 0UL );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_arxiv", 0UL );
FOR(bank_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_poh", i );
FOR(exec_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_exec", i ); /* TODO check order in fd_replay.c macros*/
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "shred_replay", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
Expand Down Expand Up @@ -589,6 +597,7 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_sign", 0UL );
/**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "sign_repair", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
/**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_repla", 0UL );
/**/ fd_topob_tile_in( topo, "arxiv", 0UL, "metric_in", "replay_arxiv", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_repair", 0UL );

FOR(shred_tile_cnt) fd_topob_tile_in( topo, "eqvoc", 0UL, "metric_in", "shred_net", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
Expand Down Expand Up @@ -761,6 +770,9 @@ fd_topo_initialize( config_t * config ) {
tile->replay.full_interval = config->tiles.batch.full_interval;
tile->replay.incremental_interval = config->tiles.batch.incremental_interval;

} else if( FD_UNLIKELY( !strcmp( tile->name, "arxiv" ) ) ) {
strncpy( tile->arxiv.blockstore_file, config->blockstore.file, sizeof(tile->arxiv.blockstore_file) );

} else if( FD_UNLIKELY( !strcmp( tile->name, "bhole" ) ) ) {

} else if( FD_UNLIKELY( !strcmp( tile->name, "sign" ) ) ) {
Expand Down
2 changes: 1 addition & 1 deletion src/app/ledger/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ replay( fd_ledger_args_t * args ) {

fd_ledger_main_setup( args );

fd_blockstore_init( args->blockstore, -1, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &args->slot_ctx->slot_bank );
fd_blockstore_init( args->blockstore, &args->slot_ctx->slot_bank );
fd_buf_shred_pool_reset( args->blockstore->shred_pool, 0 );

FD_LOG_WARNING(( "setup done" ));
Expand Down
16 changes: 16 additions & 0 deletions src/disco/fd_disco_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ FD_FN_CONST static inline uint fd_disco_repair_replay_sig_data_cnt ( ulong
FD_FN_CONST static inline ushort fd_disco_repair_replay_sig_parent_off ( ulong sig ) { return (ushort)fd_ulong_extract ( sig, 1, 16 ); }
FD_FN_CONST static inline int fd_disco_repair_replay_sig_slot_complete( ulong sig ) { return fd_ulong_extract_bit( sig, 0 ); }

FD_FN_CONST static inline ulong
fd_disco_replay_arxiv_sig( ulong slot, uint start_idx, uint end_idx ){
/*
| slot (32) | start_idx (16) | end_idx (16) |
| [32, 63] | [16, 31] | [0, 15] |
*/
ulong slot_ul = fd_ulong_min( slot, (ulong)UINT_MAX );
ulong start_idx_ul = (ulong)start_idx;
ulong end_idx_ul = (ulong)end_idx;
return slot_ul << 32 | start_idx_ul << 16 | end_idx_ul;
}

FD_FN_CONST static inline ulong fd_disco_replay_arxiv_sig_slot ( ulong sig ) { return fd_ulong_extract( sig, 32, 63 ); }
FD_FN_CONST static inline uint fd_disco_replay_arxiv_sig_start_idx( ulong sig ) { return (uint)fd_ulong_extract( sig, 16, 31 ); }
FD_FN_CONST static inline uint fd_disco_replay_arxiv_sig_end_idx ( ulong sig ) { return (uint)fd_ulong_extract( sig, 0, 15 ); }

FD_FN_PURE static inline ulong
fd_disco_compact_chunk0( void * wksp ) {
return (((struct fd_wksp_private *)wksp)->gaddr_lo) >> FD_CHUNK_LG_SZ;
Expand Down
4 changes: 4 additions & 0 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ typedef struct {
char identity_key_path[ PATH_MAX ];
} eqvoc;

struct {
char blockstore_file[ PATH_MAX ];
} arxiv;

struct {
ushort rpc_port;
ushort tpu_port;
Expand Down
1 change: 1 addition & 0 deletions src/disco/topo/fd_topob.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ fd_topob_auto_layout( fd_topo_t * topo ) {
"gossip", /* FIREDANCER only */
"repair", /* FIREDANCER only */
"replay", /* FIREDANCER only */
"arxiv", /* FIREDANCER only */
"exec", /* FIREDANCER only */
"rtpool", /* FIREDANCER only */
"sender", /* FIREDANCER only */
Expand Down
6 changes: 6 additions & 0 deletions src/discof/arxiv/Local.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ifdef FD_HAS_INT128
ifdef FD_HAS_SSE
$(call add-objs,fd_shred_arxiv fd_arxiv_tile,fd_discof)
$(call make-unit-test,test_arxiv,test_arxiv,fd_discof fd_flamenco fd_ballet fd_util)
endif
endif
205 changes: 205 additions & 0 deletions src/discof/arxiv/fd_arxiv_tile.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#include <stdlib.h>
#define _GNU_SOURCE
#include "../../disco/tiles.h"
#include "generated/fd_arxiv_tile_seccomp.h"
#include "fd_shred_arxiv.h"
#include "../../disco/topo/fd_pod_format.h"

#define REPLAY_IN_IDX 0UL

struct fd_arxiv_tile_ctx {
fd_shred_arxiver_t * arxiver;
int arxiver_fd; /* file descriptor for the archive file */
fd_blockstore_t blockstore_ljoin;
fd_blockstore_t * blockstore;

void * replay_in_mem;
ulong replay_in_chunk0;
ulong replay_in_wmark;
};
typedef struct fd_arxiv_tile_ctx fd_arxiv_tile_ctx_t;

FD_FN_CONST static inline ulong
scratch_align( void ) {
return 128UL;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
/* clang-format off */
ulong l = FD_LAYOUT_INIT;
l = FD_LAYOUT_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
l = FD_LAYOUT_APPEND( l, fd_shred_arxiv_align(), fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ) );
return FD_LAYOUT_FINI( l, scratch_align() );
/* clang-format on */
}

static void
during_frag( fd_arxiv_tile_ctx_t * ctx,
ulong in_idx,
ulong seq FD_PARAM_UNUSED,
ulong sig FD_PARAM_UNUSED,
ulong chunk,
ulong sz,
ulong ctl FD_PARAM_UNUSED ) {

if( in_idx == REPLAY_IN_IDX ) {
if( FD_UNLIKELY( chunk<ctx->replay_in_chunk0 || chunk>ctx->replay_in_wmark || sz>USHORT_MAX ) ) {
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->replay_in_chunk0, ctx->replay_in_wmark ));
}
ulong slot = fd_disco_replay_arxiv_sig_slot( sig );
uint start_idx = fd_disco_replay_arxiv_sig_start_idx( sig );
uint end_idx = fd_disco_replay_arxiv_sig_end_idx( sig );

fd_shreds_checkpt( ctx->arxiver, ctx->blockstore, slot, start_idx, end_idx );
}

(void)chunk;
(void)sz;
}

static void
after_frag( fd_arxiv_tile_ctx_t * ctx,
ulong in_idx FD_PARAM_UNUSED,
ulong seq FD_PARAM_UNUSED,
ulong sig FD_PARAM_UNUSED,
ulong sz FD_PARAM_UNUSED,
ulong tsorig FD_PARAM_UNUSED,
ulong tspub FD_PARAM_UNUSED,
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
(void)ctx;
/* Let's test for fun that the shred was written to file properly */
if( in_idx == REPLAY_IN_IDX ) {
FD_LOG_WARNING(( "replay in idx %lu", in_idx ));
ulong slot = fd_disco_replay_arxiv_sig_slot( sig );
uint end_idx = fd_disco_replay_arxiv_sig_end_idx( sig );

fd_shred_idx_t * idx = fd_shred_idx_query( ctx->arxiver->shred_idx, slot << 32 | end_idx, NULL );
uchar shred_buf[FD_SHRED_MIN_SZ];
int err = fd_shred_restore( ctx->arxiver, idx, shred_buf, FD_SHRED_MIN_SZ );
FD_TEST( err == 0 );
const fd_shred_t * shred = fd_shred_parse( shred_buf, FD_SHRED_MIN_SZ );
FD_TEST( shred->slot == slot );
FD_TEST( shred->idx == end_idx );
}

}

static void
privileged_init( fd_topo_t * topo FD_PARAM_UNUSED,
fd_topo_tile_t * tile FD_PARAM_UNUSED ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
memset( ctx, 0, sizeof(fd_arxiv_tile_ctx_t) );

ctx->arxiver_fd = open( tile->arxiv.blockstore_file, O_RDWR|O_CREAT, 0666 );
if( FD_UNLIKELY( ctx->arxiver_fd==-1 ) ) {
FD_LOG_ERR(( "failed to open arxiver fd" ));
}
}

static void
unprivileged_init( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
void * arxiver = FD_SCRATCH_ALLOC_APPEND( l, fd_shred_arxiv_align(), fd_shred_arxiv_footprint( FD_SHRED_ARXIV_MIN_SIZE ) );
ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
if( FD_UNLIKELY( scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ) ) ) {
FD_LOG_ERR(( "scratch footprint mismatch" ));
}

ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX,"blockstore" );
FD_TEST( blockstore_obj_id!=ULONG_MAX );
ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );

FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );

ctx->arxiver = fd_shred_arxiv_join( fd_shred_arxiv_new( arxiver, FD_SHRED_ARXIV_MIN_SIZE ) );
ctx->arxiver->fd = ctx->arxiver_fd;

/**********************************************************************/
/* links */
/**********************************************************************/

/* Setup replay tile input */
fd_topo_link_t * replay_in_link = &topo->links[ tile->in_link_id[ REPLAY_IN_IDX ] ];
ctx->replay_in_mem = topo->workspaces[ topo->objs[ replay_in_link->dcache_obj_id ].wksp_id ].wksp;
ctx->replay_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_in_mem, replay_in_link->dcache );
ctx->replay_in_wmark = fd_dcache_compact_wmark( ctx->replay_in_mem, replay_in_link->dcache, replay_in_link->mtu );

}

static void
after_credit( fd_arxiv_tile_ctx_t * ctx,
fd_stem_context_t * stem FD_PARAM_UNUSED,
int * opt_poll_in FD_PARAM_UNUSED,
int * charge_busy FD_PARAM_UNUSED ) {
(void)ctx;
}

static ulong
populate_allowed_seccomp( fd_topo_t const * topo,
fd_topo_tile_t const * tile,
ulong out_cnt,
struct sock_filter * out ) {

void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_arxiv_tile_ctx_t) );

populate_sock_filter_policy_fd_arxiv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->arxiver_fd );
return sock_filter_policy_fd_arxiv_tile_instr_cnt;
}

static ulong
populate_allowed_fds( fd_topo_t const * topo,
fd_topo_tile_t const * tile,
ulong out_fds_cnt,
int * out_fds ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_arxiv_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_arxiv_tile_ctx_t), sizeof(fd_arxiv_tile_ctx_t) );
FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_arxiv_tile_ctx_t) );

if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));

ulong out_cnt = 0UL;
out_fds[ out_cnt++ ] = 2; /* stderr */
if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
out_fds[ out_cnt++ ] = ctx->arxiver_fd; /* shred store fd */
return out_cnt;
}


#define STEM_BURST (1UL)

#define STEM_CALLBACK_CONTEXT_TYPE fd_arxiv_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_arxiv_tile_ctx_t)

#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_AFTER_CREDIT after_credit

#include "../../disco/stem/fd_stem.c"

fd_topo_run_tile_t fd_tile_arxiv = {
.name = "arxiv",
.loose_footprint = 0UL,
.populate_allowed_seccomp = populate_allowed_seccomp,
.populate_allowed_fds = populate_allowed_fds,
.scratch_align = scratch_align,
.scratch_footprint = scratch_footprint,
.privileged_init = privileged_init,
.unprivileged_init = unprivileged_init,
.run = stem_run,
};
25 changes: 25 additions & 0 deletions src/discof/arxiv/fd_arxiv_tile.seccomppolicy
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# logfile_fd: It can be disabled by configuration, but typically tiles
# will open a log file on boot and write all messages there.
unsigned int logfile_fd, unsigned int blockstore_fd

# logging: all log messages are written to a file and/or pipe
#
# 'WARNING' and above are written to the STDERR pipe, while all messages
# are always written to the log file.
#
# arg 0 is the file descriptor to write to. The boot process ensures
# that descriptor 2 is always STDERR.
write: (or (eq (arg 0) 2)
(eq (arg 0) logfile_fd)
(eq (arg 0) blockstore_fd))

# logging: 'WARNING' and above fsync the logfile to disk immediately
#
# arg 0 is the file descriptor to fsync.
fsync: (eq (arg 0) logfile_fd)

# blockstore: read archival file
read: (eq (arg 0) blockstore_fd)

# blockstore: lseek archival file
lseek: (eq (arg 0) blockstore_fd)
Loading
Loading