Skip to content

Commit 2124f0d

Browse files
committed
feat(repair): implement fec_set_chainer for repair tile
1 parent 164b1b7 commit 2124f0d

File tree

4 files changed

+883
-0
lines changed

4 files changed

+883
-0
lines changed

src/discof/repair/Local.mk

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
ifdef FD_HAS_SSE
22
$(call add-objs,fd_repair_tile,fd_discof)
3+
$(call add-objs,fd_fec_chainer,fd_discof)
4+
$(call make-unit-test,test_fec_chainer,test_fec_chainer,fd_discof fd_flamenco fd_ballet fd_util)
35
endif

src/discof/repair/fd_fec_chainer.c

+312
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
#include "fd_fec_chainer.h"
2+
3+
void *
4+
fd_fec_chainer_new( void * shmem, ulong fec_max, ulong seed ) {
5+
6+
if( FD_UNLIKELY( !shmem ) ) {
7+
FD_LOG_WARNING(( "NULL mem" ));
8+
return NULL;
9+
}
10+
11+
if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_fec_chainer_align() ) ) ) {
12+
FD_LOG_WARNING(( "misaligned mem" ));
13+
return NULL;
14+
}
15+
16+
ulong footprint = fd_fec_chainer_footprint( fec_max );
17+
if( FD_UNLIKELY( !footprint ) ) {
18+
FD_LOG_WARNING(( "bad fec_max (%lu)", fec_max ));
19+
return NULL;
20+
}
21+
22+
fd_wksp_t * wksp = fd_wksp_containing( shmem );
23+
if( FD_UNLIKELY( !wksp ) ) {
24+
FD_LOG_WARNING(( "shmem must be part of a workspace" ));
25+
return NULL;
26+
}
27+
28+
fd_memset( shmem, 0, footprint );
29+
30+
fd_fec_chainer_t * chainer;
31+
int lg_fec_max = fd_ulong_find_msb( fd_ulong_pow2_up( fec_max ) ) + 2; /* +2 for fill ratio <= 0.25 */
32+
33+
FD_SCRATCH_ALLOC_INIT( l, shmem );
34+
chainer = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_chainer_align(), sizeof( fd_fec_chainer_t ) );
35+
void * ancestry = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_ancestry_align(), fd_fec_ancestry_footprint( fec_max ) );
36+
void * frontier = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_frontier_align(), fd_fec_frontier_footprint( fec_max ) );
37+
void * orphaned = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_orphaned_align(), fd_fec_orphaned_footprint( fec_max ) );
38+
void * pool = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_pool_align(), fd_fec_pool_footprint( fec_max ) );
39+
void * parents = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_parents_align(), fd_fec_parents_footprint( lg_fec_max ) );
40+
void * children = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_children_align(), fd_fec_children_footprint( lg_fec_max ) );
41+
void * queue = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_queue_align(), fd_fec_queue_footprint( fec_max ) );
42+
void * out = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_out_align(), fd_fec_out_footprint( fec_max ) );
43+
FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_fec_chainer_align() ) == (ulong)shmem + footprint );
44+
45+
chainer->ancestry = fd_fec_ancestry_new( ancestry, fec_max, seed );
46+
chainer->frontier = fd_fec_frontier_new( frontier, fec_max, seed );
47+
chainer->orphaned = fd_fec_orphaned_new( orphaned, fec_max, seed );
48+
chainer->pool = fd_fec_pool_new ( pool, fec_max );
49+
chainer->parents = fd_fec_parents_new ( parents, lg_fec_max );
50+
chainer->children = fd_fec_children_new( children, lg_fec_max );
51+
chainer->queue = fd_fec_queue_new ( queue, fec_max );
52+
chainer->out = fd_fec_out_new ( out, fec_max );
53+
54+
return shmem;
55+
}
56+
57+
fd_fec_chainer_t *
58+
fd_fec_chainer_join( void * shfec_chainer ) {
59+
fd_fec_chainer_t * chainer = (fd_fec_chainer_t *)shfec_chainer;
60+
61+
if( FD_UNLIKELY( !chainer ) ) {
62+
FD_LOG_WARNING(( "NULL chainer" ));
63+
return NULL;
64+
}
65+
66+
chainer->ancestry = fd_fec_ancestry_join( chainer->ancestry );
67+
chainer->frontier = fd_fec_frontier_join( chainer->frontier );
68+
chainer->orphaned = fd_fec_orphaned_join( chainer->orphaned );
69+
chainer->pool = fd_fec_pool_join ( chainer->pool );
70+
chainer->parents = fd_fec_parents_join ( chainer->parents );
71+
chainer->children = fd_fec_children_join( chainer->children );
72+
chainer->queue = fd_fec_queue_join ( chainer->queue );
73+
chainer->out = fd_fec_out_join ( chainer->out );
74+
75+
return chainer;
76+
}
77+
78+
void *
79+
fd_fec_chainer_leave( fd_fec_chainer_t * chainer ) {
80+
81+
if( FD_UNLIKELY( !chainer ) ) {
82+
FD_LOG_WARNING(( "NULL chainer" ));
83+
return NULL;
84+
}
85+
86+
return (void *)chainer;
87+
}
88+
89+
void *
90+
fd_fec_chainer_delete( void * shchainer ) {
91+
fd_fec_chainer_t * chainer = (fd_fec_chainer_t *)shchainer;
92+
93+
if( FD_UNLIKELY( !chainer ) ) {
94+
FD_LOG_WARNING(( "NULL chainer" ));
95+
return NULL;
96+
}
97+
98+
if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)chainer, fd_fec_chainer_align() ) ) ) {
99+
FD_LOG_WARNING(( "misaligned chainer" ));
100+
return NULL;
101+
}
102+
103+
return chainer;
104+
}
105+
106+
fd_fec_ele_t *
107+
fd_fec_chainer_init( fd_fec_chainer_t * chainer, ulong slot, uchar merkle_root[static FD_SHRED_MERKLE_ROOT_SZ] ) {
108+
FD_TEST( fd_fec_pool_free( chainer->pool ) );
109+
fd_fec_ele_t * root = fd_fec_pool_ele_acquire( chainer->pool );
110+
FD_TEST( root );
111+
root->key = slot << 32 | ( UINT_MAX-1 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele
112+
root->slot = slot;
113+
root->fec_set_idx = UINT_MAX-1;
114+
root->data_cnt = 0;
115+
root->data_complete = 1;
116+
root->slot_complete = 1;
117+
root->parent_off = 0;
118+
memcpy( root->merkle_root, merkle_root, FD_SHRED_MERKLE_ROOT_SZ );
119+
memset( root->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
120+
121+
/* For the next slot that chains off the init slot, it will use the
122+
parent_map and key with slot | UINT_MAX to look for its parent, so
123+
we need to provide the artificial parent link between the last
124+
fec_set_idx and UINT_MAX. This way it can query for
125+
UINT_MAX -> UINT_MAX-1 -> get_pool_ele(UINT_MAX-1)*/
126+
127+
fd_fec_parent_t * p = fd_fec_parents_insert( chainer->parents, slot << 32 | UINT_MAX );
128+
p->parent_key = (slot << 32) | ( UINT_MAX - 1 );
129+
130+
fd_fec_frontier_ele_insert( chainer->frontier, root, chainer->pool );
131+
return root;
132+
}
133+
134+
void *
135+
fd_fec_chainer_fini( fd_fec_chainer_t * chainer ) {
136+
return (void *)chainer;
137+
}
138+
139+
FD_FN_PURE fd_fec_ele_t *
140+
fd_fec_chainer_query( fd_fec_chainer_t * chainer, ulong slot, uint fec_set_idx ) {
141+
ulong key = slot << 32 | fec_set_idx;
142+
fd_fec_ele_t * fec;
143+
fec = fd_fec_frontier_ele_query( chainer->frontier, &key, NULL, chainer->pool );
144+
fec = fd_ptr_if( !fec, fd_fec_ancestry_ele_query( chainer->ancestry, &key, NULL, chainer->pool ), fec );
145+
fec = fd_ptr_if( !fec, fd_fec_orphaned_ele_query( chainer->orphaned, &key, NULL, chainer->pool ), fec );
146+
return fec;
147+
}
148+
149+
static int
150+
is_last_fec( ulong key ){
151+
return ( (uint)fd_ulong_extract( key, 0, 31 ) & UINT_MAX ) == UINT_MAX; // lol fix
152+
}
153+
154+
/* chain performs a BFS using chainer->deque to advance the frontier and
155+
connect orphaned FECs to the tree. */
156+
157+
static void
158+
chain( fd_fec_chainer_t * chainer ) {
159+
while( FD_LIKELY( !fd_fec_queue_empty( chainer->queue ) ) ) {
160+
ulong key = fd_fec_queue_pop_head( chainer->queue );
161+
fd_fec_ele_t * ele = fd_fec_orphaned_ele_query( chainer->orphaned, &key, NULL, chainer->pool );
162+
163+
if( FD_LIKELY( !ele ) ) continue;
164+
165+
/* Query for the parent_key. */
166+
167+
fd_fec_parent_t * parent_key = fd_fec_parents_query( chainer->parents, key, NULL );
168+
if( FD_UNLIKELY( !parent_key ) ) continue; /* still orphaned */
169+
170+
/* If the parent is in the frontier, remove the parent and insert
171+
into ancestry. Otherwise check for parent in ancestry. */
172+
173+
if( FD_UNLIKELY( is_last_fec( parent_key->parent_key ) ) ) {
174+
/* If the parent was the last fec of the previous slot, the parent_key
175+
will be UINT_MAX. Need to query for the actual fec_set_idx of the
176+
last FEC. This is the double query */
177+
parent_key = fd_fec_parents_query( chainer->parents, parent_key->parent_key, NULL );
178+
if( !parent_key ) continue; /* still orphaned */
179+
}
180+
181+
fd_fec_ele_t * parent = fd_fec_frontier_ele_remove( chainer->frontier, &parent_key->parent_key, NULL, chainer->pool );
182+
if( FD_LIKELY( parent ) ) fd_fec_ancestry_ele_insert( chainer->ancestry, parent, chainer->pool );
183+
else parent = fd_fec_ancestry_ele_query( chainer->ancestry, &parent_key->parent_key, NULL, chainer->pool );
184+
185+
/* If the parent is not in frontier or ancestry, ele is still
186+
orphaned. Note it is possible to have inserted ele's parent but
187+
have ele still be orphaned, because parent is also orphaned. */
188+
189+
if( FD_UNLIKELY( !parent ) ) continue;
190+
191+
/* Remove ele from orphaned. */
192+
193+
fd_fec_ele_t * remove = fd_fec_orphaned_ele_remove( chainer->orphaned, &ele->key, NULL, chainer->pool );
194+
FD_TEST( remove == ele );
195+
196+
/* Verify the chained merkle root. */
197+
198+
if ( FD_UNLIKELY( memcmp( ele->chained_merkle_root, parent->merkle_root, FD_SHRED_MERKLE_ROOT_SZ ) ) ) {
199+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .fec_set_idx = ele->fec_set_idx, .err = FD_FEC_CHAINER_ERR_MERKLE } );
200+
continue;
201+
}
202+
203+
/* Insert into frontier (ele is either advancing a fork or starting
204+
a new fork) and deliver to `out`. */
205+
206+
fd_fec_frontier_ele_insert( chainer->frontier, ele, chainer->pool );
207+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .fec_set_idx = ele->fec_set_idx, .err = FD_FEC_CHAINER_SUCCESS } );
208+
209+
/* Check whether any of ele's children are orphaned and can be
210+
chained into the frontier. */
211+
212+
/* TODO this BFS can be structured differently without using any
213+
additional memory by reusing ele->next. */
214+
215+
if( FD_UNLIKELY( ele->slot_complete ) ) {
216+
fd_fec_children_t * fec_children = fd_fec_children_query( chainer->children, ele->slot, NULL );
217+
if( FD_UNLIKELY( !fec_children ) ) continue;
218+
for( ulong off = fd_slot_child_offs_const_iter_init( fec_children->child_offs );
219+
!fd_slot_child_offs_const_iter_done( off );
220+
off = fd_slot_child_offs_const_iter_next( fec_children->child_offs, off ) ) {
221+
ulong child_key = (ele->slot + off) << 32; /* always fec_set_idx 0 */
222+
fd_fec_ele_t * child = fd_fec_orphaned_ele_query( chainer->orphaned, &child_key, NULL, chainer->pool );
223+
if( FD_LIKELY( child ) ) {
224+
fd_fec_queue_push_tail( chainer->queue, child_key );
225+
}
226+
}
227+
} else {
228+
ulong child_key = (ele->slot << 32) | (ele->key + ele->data_cnt);
229+
fd_fec_queue_push_tail( chainer->queue, child_key );
230+
}
231+
}
232+
}
233+
234+
fd_fec_ele_t *
235+
fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
236+
ulong slot,
237+
uint fec_set_idx,
238+
uint data_cnt,
239+
int data_complete,
240+
int slot_complete,
241+
ushort parent_off,
242+
uchar const merkle_root[static FD_SHRED_MERKLE_ROOT_SZ],
243+
uchar const chained_merkle_root[static FD_SHRED_MERKLE_ROOT_SZ] ) {
244+
ulong key = slot << 32 | fec_set_idx;
245+
246+
if( FD_UNLIKELY( fd_fec_chainer_query( chainer, slot, fec_set_idx ) ) ) {
247+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ slot, fec_set_idx, .err = FD_FEC_CHAINER_ERR_UNIQUE } );
248+
return NULL;
249+
}
250+
251+
# if FD_FEC_CHAINER_USE_HANDHOLDING
252+
FD_TEST( fd_fec_pool_free( chainer->pool ) ); /* FIXME lru? */
253+
# endif
254+
255+
fd_fec_ele_t * ele = fd_fec_pool_ele_acquire( chainer->pool );
256+
ele->key = key;
257+
ele->slot = slot;
258+
ele->fec_set_idx = fec_set_idx;
259+
ele->data_cnt = data_cnt;
260+
ele->data_complete = data_complete;
261+
ele->slot_complete = slot_complete;
262+
ele->parent_off = parent_off;
263+
memcpy( ele->merkle_root, merkle_root, FD_SHRED_MERKLE_ROOT_SZ );
264+
memcpy( ele->chained_merkle_root, chained_merkle_root, FD_SHRED_MERKLE_ROOT_SZ );
265+
266+
/* If it is the first FEC set, derive and insert parent_key->key into
267+
the parents map and parent_slot->slot into the children map. */
268+
269+
if ( FD_UNLIKELY( fec_set_idx == 0 ) ) {
270+
ulong parent_slot = slot - parent_off;
271+
272+
fd_fec_parent_t * parent_key = fd_fec_parents_insert( chainer->parents, key );
273+
parent_key->parent_key = (parent_slot << 32) | UINT_MAX;
274+
275+
fd_fec_children_t * fec_children = fd_fec_children_query( chainer->children, parent_slot, NULL );
276+
if( FD_LIKELY( !fec_children ) ) {
277+
fec_children = fd_fec_children_insert( chainer->children, parent_slot );
278+
fd_slot_child_offs_null( fec_children->child_offs );
279+
}
280+
fd_slot_child_offs_insert( fec_children->child_offs, parent_off );
281+
}
282+
283+
/* Derive and insert the child_key->key into the parents map. */
284+
285+
ulong child_key = ( slot << 32 ) | ( fec_set_idx + data_cnt );
286+
if( FD_UNLIKELY( slot_complete ) ) {
287+
288+
/* This is the last FEC set. There is a special case to add
289+
a UINT_MAX -> fec_set_idx link because the child slots will chain
290+
off of UINT_MAX, but the pool_ele will be keyed on fec_set_idx. */
291+
292+
fd_fec_parent_t * parent = fd_fec_parents_insert( chainer->parents, slot << 32 | UINT_MAX );
293+
parent->parent_key = key;
294+
295+
} else {
296+
297+
/* This is not the last FEC set. */
298+
/* Key the child to point to ele (child's parent). */
299+
300+
fd_fec_parent_t * parent = fd_fec_parents_insert( chainer->parents, child_key );
301+
parent->parent_key = key;
302+
}
303+
304+
/* Push ele into the BFS deque and the orphaned map for processing. */
305+
306+
fd_fec_queue_push_tail( chainer->queue, key );
307+
fd_fec_orphaned_ele_insert( chainer->orphaned, ele, chainer->pool );
308+
309+
chain( chainer );
310+
311+
return ele;
312+
}

0 commit comments

Comments
 (0)