Skip to content

Commit a5a7732

Browse files
fix
1 parent 9adade4 commit a5a7732

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

src/funkier/fd_funkier_rec.c

+14
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
#define MAP_IMPL_STYLE 2
2323
#include "../util/tmpl/fd_map_para.c"
2424

25+
#include "../flamenco/fd_rwlock.h"
26+
static fd_rwlock_t _pool_lock = {0};
27+
2528
fd_funkier_rec_t const *
2629
fd_funkier_rec_query_try( fd_funkier_t * funk,
2730
fd_funkier_txn_t const * txn,
@@ -203,7 +206,9 @@ fd_funkier_rec_prepare( fd_funkier_t * funk,
203206
prepare->funk = funk;
204207
prepare->wksp = fd_funkier_wksp( funk );
205208
fd_funkier_rec_pool_t rec_pool = fd_funkier_rec_pool( funk, prepare->wksp );
209+
fd_rwlock_write(&_pool_lock);
206210
fd_funkier_rec_t * rec = prepare->rec = fd_funkier_rec_pool_acquire( &rec_pool, NULL, 1, opt_err );
211+
fd_rwlock_unwrite(&_pool_lock);
207212
if( opt_err && *opt_err == FD_POOL_ERR_CORRUPT ) {
208213
FD_LOG_ERR(( "corrupt element returned from funkier rec pool" ));
209214
}
@@ -254,6 +259,7 @@ fd_funkier_rec_publish( fd_funkier_rec_prepare_t * prepare ) {
254259
/* We need to hold the pool lock whilst inserting and removing from the map, to make the updates
255260
to the prev_idx/next_idx pointers atomic. */
256261
fd_funkier_rec_pool_lock( &rec_pool, 1 );
262+
fd_rwlock_write(&_pool_lock);
257263
rec->prev_idx = rec_prev_idx;
258264
if( fd_funkier_rec_idx_is_null( rec_prev_idx ) ) {
259265
*rec_head_idx = rec_idx;
@@ -264,14 +270,17 @@ fd_funkier_rec_publish( fd_funkier_rec_prepare_t * prepare ) {
264270
if( fd_funkier_rec_map_insert( &rec_map, rec, FD_MAP_FLAG_BLOCKING ) ) {
265271
FD_LOG_CRIT(( "fd_funkier_rec_map_insert failed" ));
266272
}
273+
fd_rwlock_unwrite(&_pool_lock);
267274
fd_funkier_rec_pool_unlock( &rec_pool );
268275
}
269276

270277
void
271278
fd_funkier_rec_cancel( fd_funkier_rec_prepare_t * prepare ) {
272279
fd_funkier_val_flush( prepare->rec, fd_funkier_alloc( prepare->funk, prepare->wksp ), prepare->wksp );
273280
fd_funkier_rec_pool_t rec_pool = fd_funkier_rec_pool( prepare->funk, prepare->wksp );
281+
fd_rwlock_write(&_pool_lock);
274282
fd_funkier_rec_pool_release( &rec_pool, prepare->rec, 1 );
283+
fd_rwlock_unwrite(&_pool_lock);
275284
}
276285

277286
fd_funkier_rec_t *
@@ -335,13 +344,15 @@ fd_funkier_rec_hard_remove( fd_funkier_t * funk,
335344
fd_funkier_rec_key_copy( pair->key, key );
336345

337346
fd_funkier_rec_pool_lock( &rec_pool, 1 );
347+
fd_rwlock_write(&_pool_lock);
338348

339349
fd_funkier_rec_t * rec = NULL;
340350
for(;;) {
341351
fd_funkier_rec_map_query_t rec_query[1];
342352
int err = fd_funkier_rec_map_remove( &rec_map, pair, NULL, rec_query, FD_MAP_FLAG_BLOCKING );
343353
if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
344354
if( err == FD_MAP_ERR_KEY ) {
355+
fd_rwlock_unwrite(&_pool_lock);
345356
fd_funkier_rec_pool_unlock( &rec_pool );
346357
return;
347358
}
@@ -363,10 +374,13 @@ fd_funkier_rec_hard_remove( fd_funkier_t * funk,
363374
if( fd_funkier_rec_idx_is_null( next_idx ) ) txn->rec_tail_idx = prev_idx;
364375
else rec_pool.ele[ next_idx ].prev_idx = prev_idx;
365376
}
377+
fd_rwlock_unwrite(&_pool_lock);
366378
fd_funkier_rec_pool_unlock( &rec_pool );
367379

368380
fd_funkier_val_flush( rec, alloc, wksp );
381+
fd_rwlock_write(&_pool_lock);
369382
fd_funkier_rec_pool_release( &rec_pool, rec, 1 );
383+
fd_rwlock_unwrite(&_pool_lock);
370384
}
371385

372386
int

0 commit comments

Comments
 (0)