Skip to content

Commit 08ebe62

Browse files
ptaffet-jumpmmcgee-jump
authored andcommitted
shred: allow additional destination
1 parent 28c34ff commit 08ebe62

File tree

6 files changed

+49
-8
lines changed

6 files changed

+49
-8
lines changed

src/app/fdctl/config/default.toml

+11
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,17 @@ dynamic_port_range = "8900-9000"
11981198
# this one.
11991199
shred_listen_port = 8003
12001200

1201+
# Shreds can also be forwarded to a specific address, for
1202+
# example to run an unstaked RPC or for archiving. If
1203+
# additional_shred_destination is not empty, each new, valid
1204+
# shred that the validator receives will be forwarded to the
1205+
# specified address. additional_shred_destination must be in
1206+
# the form "ip:port", for example "1.2.3.4:5566". Shreds will
1207+
# be sent to this destination first, prior to sending to other
1208+
# validators. This applies to leader slots and non-leader
1209+
# slots.
1210+
additional_shred_destination = ""
1211+
12011212
# The metric tile receives metrics updates published from the rest
12021213
# of the tiles and serves them via. a Prometheus compatible HTTP
12031214
# endpoint.

src/app/fdctl/topology.c

+17
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,23 @@ fd_topo_initialize( config_t * config ) {
440440
tile->shred.expected_shred_version = config->consensus.expected_shred_version;
441441
tile->shred.shred_listen_port = config->tiles.shred.shred_listen_port;
442442
tile->shred.larger_shred_limits_per_block = config->development.bench.larger_shred_limits_per_block;
443+
char adtl_dest[ sizeof("255.255.255.255:65536") ];
444+
memcpy( adtl_dest, config->tiles.shred.additional_shred_destination, sizeof(adtl_dest) );
445+
if( FD_UNLIKELY( strcmp( adtl_dest, "" ) ) ) {
446+
char * ip_end = strchr( adtl_dest, ':' );
447+
if( FD_UNLIKELY( !ip_end ) ) FD_LOG_ERR(( "[tiles.shred.additional_shred_destination] must be empty or in the form ip:port" ));
448+
*ip_end = '\0';
449+
450+
if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( adtl_dest, &(tile->shred.adtl_dest.ip) ) ) ) {
451+
FD_LOG_ERR(( "could not parse IP %s in [tiles.shred.additional_shred_destination]", adtl_dest ));
452+
}
453+
454+
tile->shred.adtl_dest.port = fd_cstr_to_ushort( ip_end+1 );
455+
if( FD_UNLIKELY( !tile->shred.adtl_dest.port ) ) FD_LOG_ERR(( "could not parse port %s in [tiles.shred.additional_shred_destination]", ip_end+1 ));
456+
} else {
457+
tile->shred.adtl_dest.ip = 0U;
458+
tile->shred.adtl_dest.port = 0;
459+
}
443460

444461
} else if( FD_UNLIKELY( !strcmp( tile->name, "store" ) ) ) {
445462
tile->store.disable_blockstore_from_slot = config->development.bench.disable_blockstore_from_slot;

src/app/shared/fd_config.h

+1
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ struct fd_config {
371371
struct {
372372
uint max_pending_shred_sets;
373373
ushort shred_listen_port;
374+
char additional_shred_destination[ sizeof("255.255.255.255:65536") ];
374375
} shred;
375376

376377
struct {

src/app/shared/fd_config_parse.c

+1
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ fdctl_pod_to_cfg( config_t * config,
325325

326326
CFG_POP ( uint, tiles.shred.max_pending_shred_sets );
327327
CFG_POP ( ushort, tiles.shred.shred_listen_port );
328+
CFG_POP ( cstr, tiles.shred.additional_shred_destination );
328329

329330
CFG_POP ( cstr, tiles.metric.prometheus_listen_address );
330331
CFG_POP ( ushort, tiles.metric.prometheus_listen_port );

src/disco/shred/fd_shred_tile.c

+15-8
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ typedef struct {
145145

146146
int skip_frag;
147147

148+
fd_shred_dest_weighted_t adtl_dest[1];
149+
148150
fd_ip4_udp_hdrs_t data_shred_net_hdr [1];
149151
fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];
150152

@@ -509,12 +511,10 @@ during_frag( fd_shred_ctx_t * ctx,
509511
}
510512

511513
static inline void
512-
send_shred( fd_shred_ctx_t * ctx,
513-
fd_shred_t const * shred,
514-
fd_shred_dest_t * sdest,
515-
fd_shred_dest_idx_t dest_idx,
516-
ulong tsorig ) {
517-
fd_shred_dest_weighted_t * dest = fd_shred_dest_idx_to_dest( sdest, dest_idx );
514+
send_shred( fd_shred_ctx_t * ctx,
515+
fd_shred_t const * shred,
516+
fd_shred_dest_weighted_t const * dest,
517+
ulong tsorig ) {
518518

519519
if( FD_UNLIKELY( !dest->ip4 ) ) return;
520520

@@ -646,7 +646,8 @@ after_frag( fd_shred_ctx_t * ctx,
646646
fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
647647
if( FD_UNLIKELY( !dests ) ) break;
648648

649-
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, sdest, dests[ j ], ctx->tsorig );
649+
send_shred( ctx, *out_shred, ctx->adtl_dest, ctx->tsorig );
650+
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ]), ctx->tsorig );
650651
} while( 0 );
651652

652653
if( FD_LIKELY( ctx->blockstore && rv==FD_FEC_RESOLVER_SHRED_OKAY ) ) { /* optimize for the compiler - branch predictor will still be correct */
@@ -763,7 +764,10 @@ after_frag( fd_shred_ctx_t * ctx,
763764
if( FD_UNLIKELY( !dests ) ) return;
764765

765766
/* Send only the ones we didn't receive. */
766-
for( ulong i=0UL; i<k; i++ ) for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], sdest, dests[ j*out_stride+i ], ctx->tsorig );
767+
for( ulong i=0UL; i<k; i++ ) {
768+
send_shred( ctx, new_shreds[ i ], ctx->adtl_dest, ctx->tsorig );
769+
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
770+
}
767771
}
768772

769773
static void
@@ -921,6 +925,9 @@ unprivileged_init( fd_topo_t * topo,
921925
fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr, FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
922926
fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
923927

928+
ctx->adtl_dest->ip4 = tile->shred.adtl_dest.ip;
929+
ctx->adtl_dest->port = tile->shred.adtl_dest.port;
930+
924931
for( ulong i=0UL; i<tile->in_cnt; i++ ) {
925932
fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
926933
fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];

src/disco/topo/fd_topo.h

+4
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ typedef struct {
230230
ushort shred_listen_port;
231231
int larger_shred_limits_per_block;
232232
ulong expected_shred_version;
233+
struct {
234+
uint ip;
235+
ushort port;
236+
} adtl_dest;
233237
} shred;
234238

235239
struct {

0 commit comments

Comments
 (0)