@@ -1092,15 +1092,17 @@ buf_flush_write_block_low(
1092
1092
#if defined(UNIV_PMEMOBJ_BUF)
1093
1093
1094
1094
// printf("\n [begin pm_buf_write space %zu page %zu==>", bpage->id.space(),bpage->id.page_no());
1095
- // if (!fsp_is_system_temporary(bpage->id.space())){
1096
- #if defined (UNIV_PMEMOBJ_BUF_V2)
1097
- int ret = pm_buf_write_no_free_pool (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1095
+
1096
+ #if defined(UNIV_PMEMOBJ_LSB)
1097
+ int ret = pm_lsb_write (gb_pmw->pop , gb_pmw->plsb , bpage->id , bpage->size , frame, sync );
1098
+ #elif defined (UNIV_PMEMOBJ_BUF_V2)
1099
+ int ret = pm_buf_write_no_free_pool (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1098
1100
#elif defined (UNIV_PMEMOBJ_BUF_FLUSHER)
1099
- int ret = pm_buf_write_with_flusher (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1101
+ int ret = pm_buf_write_with_flusher (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1100
1102
#elif defined (UNIV_PMEMOBJ_BUF_APPEND)
1101
- int ret = pm_buf_write_with_flusher_append (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1103
+ int ret = pm_buf_write_with_flusher_append (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1102
1104
#else
1103
- int ret = pm_buf_write (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1105
+ int ret = pm_buf_write (gb_pmw->pop , gb_pmw->pbuf , bpage->id , bpage->size , frame, sync );
1104
1106
#endif
1105
1107
// printf("END pm_buf_write space %zu page %zu]\n", bpage->id.space(),bpage->id.page_no());
1106
1108
// printf(" END pm_buf_write]");
@@ -3520,7 +3522,12 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(
3520
3522
buf_page_cleaner_is_active = false ;
3521
3523
#if defined (UNIV_PMEMOBJ_BUF_FLUSHER)
3522
3524
printf (" PMEM_DEBUG buf_page_cleaner_is_active = false\n " );
3525
+
3526
+ #if defined (UNIV_PMEMOBJ_LSB)
3527
+ PMEM_FLUSHER* flusher = gb_pmw->plsb ->flusher ;
3528
+ #else
3523
3529
PMEM_FLUSHER* flusher = gb_pmw->pbuf ->flusher ;
3530
+ #endif
3524
3531
os_event_set (flusher->is_req_not_empty );
3525
3532
#endif
3526
3533
@@ -3910,9 +3917,8 @@ FlushObserver::flush()
3910
3917
3911
3918
#if defined (UNIV_PMEMOBJ_BUF_FLUSHER)
3912
3919
// ******************* FLUSHER implementation **********/
3913
- void
3920
+ PMEM_FLUSHER*
3914
3921
pm_flusher_init (
3915
- PMEM_BUF* buf,
3916
3922
const size_t size) {
3917
3923
PMEM_FLUSHER* flusher;
3918
3924
ulint i;
@@ -3935,52 +3941,55 @@ pm_flusher_init(
3935
3941
flusher->n_requested = 0 ;
3936
3942
flusher->is_running = false ;
3937
3943
3938
- // flusher->flush_list_arr = static_cast <PMEM_BUF_BLOCK_LIST*> ( calloc(size, sizeof(PMEM_BUF_BLOCK_LIST*)));
3939
3944
flusher->flush_list_arr = static_cast <PMEM_BUF_BLOCK_LIST**> ( calloc (size, sizeof (PMEM_BUF_BLOCK_LIST*)));
3940
3945
for (i = 0 ; i < size; i++) {
3941
- // flusher->flush_list_arr[i] = static_cast <PMEM_BUF_BLOCK_LIST*> (
3942
- // malloc(sizeof(PMEM_BUF_BLOCK_LIST)));
3943
3946
flusher->flush_list_arr [i] = NULL ;
3944
3947
}
3945
- buf->flusher = flusher;
3948
+ #if defined (UNIV_PMEMOBJ_LSB)
3949
+ flusher->bucket_arr = static_cast <PMEM_LSB_HASH_BUCKET**> ( calloc (size, sizeof (PMEM_LSB_HASH_BUCKET*)));
3950
+ for (i = 0 ; i < size; i++) {
3951
+ flusher->bucket_arr [i] = NULL ;
3952
+ }
3953
+ #endif
3954
+ return flusher;
3946
3955
}
3947
3956
void
3948
3957
pm_buf_flusher_close (
3949
- PMEM_BUF* buf ) {
3958
+ PMEM_FLUSHER* flusher ) {
3950
3959
ulint i;
3951
3960
3952
3961
// wait for all workers finish their work
3953
- while (buf-> flusher ->n_workers > 0 ) {
3962
+ while (flusher->n_workers > 0 ) {
3954
3963
os_thread_sleep (10000 );
3955
3964
}
3956
3965
3957
- for (i = 0 ; i < buf-> flusher ->size ; i++) {
3958
- if (buf-> flusher ->flush_list_arr [i]){
3966
+ for (i = 0 ; i < flusher->size ; i++) {
3967
+ if (flusher->flush_list_arr [i]){
3959
3968
// free(buf->flusher->flush_list_arr[i]);
3960
- buf-> flusher ->flush_list_arr [i] = NULL ;
3969
+ flusher->flush_list_arr [i] = NULL ;
3961
3970
}
3962
3971
3963
3972
}
3964
3973
3965
- if (buf-> flusher ->flush_list_arr ){
3966
- free (buf-> flusher ->flush_list_arr );
3967
- buf-> flusher ->flush_list_arr = NULL ;
3974
+ if (flusher->flush_list_arr ){
3975
+ free (flusher->flush_list_arr );
3976
+ flusher->flush_list_arr = NULL ;
3968
3977
}
3969
3978
// printf("free array ok\n");
3970
3979
3971
- mutex_destroy (&buf-> flusher ->mutex );
3980
+ mutex_destroy (&flusher->mutex );
3972
3981
3973
- os_event_destroy (buf-> flusher ->is_req_not_empty );
3974
- os_event_destroy (buf-> flusher ->is_req_full );
3982
+ os_event_destroy (flusher->is_req_not_empty );
3983
+ os_event_destroy (flusher->is_req_full );
3975
3984
// os_event_destroy(buf->flusher->is_flush_full);
3976
3985
3977
- os_event_destroy (buf-> flusher ->is_all_finished );
3978
- os_event_destroy (buf-> flusher ->is_all_closed );
3986
+ os_event_destroy (flusher->is_all_finished );
3987
+ os_event_destroy (flusher->is_all_closed );
3979
3988
// printf("destroys mutex and events ok\n");
3980
3989
3981
- if (buf-> flusher ){
3982
- buf-> flusher = NULL ;
3983
- free (buf-> flusher );
3990
+ if (flusher){
3991
+ flusher = NULL ;
3992
+ free (flusher);
3984
3993
}
3985
3994
// printf("free flusher ok\n");
3986
3995
}
@@ -4024,7 +4033,12 @@ DECLARE_THREAD(pm_flusher_coordinator)(
4024
4033
}
4025
4034
#endif /* UNIV_LINUX */
4026
4035
4036
+ #if defined (UNIV_PMEMOBJ_LSB)
4037
+ PMEM_FLUSHER* flusher = gb_pmw->plsb ->flusher ;
4038
+ #else
4027
4039
PMEM_FLUSHER* flusher = gb_pmw->pbuf ->flusher ;
4040
+ #endif // UNIV_PMEMOBJ_LSB
4041
+
4028
4042
flusher->is_running = true ;
4029
4043
// ulint ret;
4030
4044
@@ -4066,7 +4080,12 @@ DECLARE_THREAD(pm_flusher_worker)(
4066
4080
{
4067
4081
ulint i;
4068
4082
4083
+ #if defined (UNIV_PMEMOBJ_LSB)
4084
+ PMEM_FLUSHER* flusher = gb_pmw->plsb ->flusher ;
4085
+ #else
4069
4086
PMEM_FLUSHER* flusher = gb_pmw->pbuf ->flusher ;
4087
+ #endif // UNIV_PMEMOBJ_LSB
4088
+
4070
4089
PMEM_BUF_BLOCK_LIST* plist = NULL ;
4071
4090
4072
4091
my_thread_init ();
@@ -4087,6 +4106,25 @@ DECLARE_THREAD(pm_flusher_worker)(
4087
4106
mutex_enter (&flusher->mutex );
4088
4107
if (flusher->n_requested > 0 ) {
4089
4108
4109
+ #if defined (UNIV_PMEMOBJ_LSB)
4110
+ // Case B: Implement of LSB
4111
+ // find the first non-null pointer and do aio flush for the bucket
4112
+ for (i = 0 ; i < flusher->size ; i++) {
4113
+ PMEM_LSB_HASH_BUCKET* bucket = flusher->bucket_arr [i];
4114
+ if (bucket){
4115
+ pm_lsb_flush_bucket (gb_pmw->pop , gb_pmw->plsb , bucket);
4116
+ flusher->n_requested --;
4117
+ os_event_set (flusher->is_req_full );
4118
+ flusher->bucket_arr [i] = NULL ;
4119
+ #if defined (UNIV_PMEMOBJ_LSB_DEBUG)
4120
+ // printf("LSB [2] pm_flusher_worker flusher->size %zu bucket pointer index %zu\n", flusher->size, i);
4121
+ #endif
4122
+ break ;
4123
+ }
4124
+ }
4125
+
4126
+ #else // UNIV_PMEMOBJ_BUF
4127
+ // Case A: Implement of PB-NVM
4090
4128
for (i = 0 ; i < flusher->size ; i++) {
4091
4129
plist = flusher->flush_list_arr [i];
4092
4130
if (plist)
@@ -4106,6 +4144,8 @@ DECLARE_THREAD(pm_flusher_worker)(
4106
4144
break ;
4107
4145
}
4108
4146
}
4147
+ #endif // UNIV_PMEMOBJ_LSB
4148
+
4109
4149
} // end if flusher->n_requested > 0
4110
4150
4111
4151
if (flusher->n_requested == 0 ) {
@@ -4294,6 +4334,76 @@ pm_handle_finished_block_with_flusher(
4294
4334
// the list has some unfinished aio
4295
4335
pmemobj_rwlock_unlock (pop, &pflush_list->lock );
4296
4336
}
4337
+
4338
+ #if defined (UNIV_PMEMOBJ_LSB)
4339
+ /*
4340
+ *Handle finish block in the aio
4341
+ Note that this function may has contention between flush threads
4342
+ * */
4343
+ void
4344
+ pm_lsb_handle_finished_block (
4345
+ PMEMobjpool* pop,
4346
+ PMEM_LSB* lsb,
4347
+ PMEM_BUF_BLOCK* pblock)
4348
+ {
4349
+ PMEM_FLUSHER* flusher;
4350
+ ulint i;
4351
+
4352
+ // (1) handle the lsb_list
4353
+ PMEM_BUF_BLOCK_LIST* plsb_list = D_RW (lsb->lsb_list );
4354
+
4355
+ // Unlike PB-NVM, LSB implement lock the lsb list until all pages finish propagation, so we don't need to lock the list
4356
+ // pmemobj_rwlock_wrlock(pop, &pflush_list->lock);
4357
+ pmemobj_rwlock_wrlock (pop, &lsb->lsb_aio_lock );
4358
+ ++lsb->n_aio_completed ;
4359
+ pmemobj_rwlock_unlock (pop, &lsb->lsb_aio_lock );
4360
+
4361
+ if (lsb->n_aio_completed == plsb_list->cur_pages )
4362
+ // if (lsb->n_aio_completed == lsb->n_aio_submitted)
4363
+ {
4364
+ #if defined (UNIV_PMEMOBJ_LSB_DEBUG)
4365
+ printf (" LSB [5] pm_lsb_handle_finished_block ALL FINISHED lsb->n_aio_completed/n_aio_submitted %zu/%zu cur_pages %zu max_pages %zu \n " , lsb->n_aio_completed , lsb->n_aio_submitted , plsb_list->cur_pages , plsb_list->max_pages );
4366
+ #endif
4367
+ // (0) flush spaces
4368
+ pm_lsb_flush_spaces_in_list (pop, lsb, plsb_list);
4369
+ //
4370
+ // Reset the param_array
4371
+ ulint arr_idx;
4372
+ arr_idx = plsb_list->param_arr_index ;
4373
+ assert (arr_idx >= 0 );
4374
+
4375
+ for (i = 0 ; i < lsb->param_arr_size ; ++i){
4376
+ lsb->param_arrs [i].is_free = true ;
4377
+ }
4378
+ lsb->cur_free_param = 0 ;
4379
+
4380
+ // (1) Reset blocks in the list
4381
+ for (i = 0 ; i < plsb_list->max_pages ; i++) {
4382
+ D_RW (D_RW (plsb_list->arr )[i])->state = PMEM_FREE_BLOCK;
4383
+ D_RW (D_RW (plsb_list->arr )[i])->sync = false ;
4384
+ }
4385
+ plsb_list->cur_pages = 0 ;
4386
+ plsb_list->is_flush = false ;
4387
+
4388
+ // (2) Reset the hashtable
4389
+ pm_lsb_hashtable_reset (pop, lsb);
4390
+ lsb->n_aio_submitted = lsb->n_aio_completed = 0 ;
4391
+
4392
+ // (3) Reset the flusher
4393
+ flusher = lsb->flusher ;
4394
+ mutex_enter (&flusher->mutex );
4395
+ for (i = 0 ; i < flusher->size ; ++i) {
4396
+ flusher->bucket_arr [i] = NULL ;
4397
+ }
4398
+ flusher->n_requested = 0 ;
4399
+ mutex_exit (&flusher->mutex );
4400
+
4401
+ // (4) wakeup the write thread
4402
+ os_event_set (lsb->all_aio_finished );
4403
+ }
4404
+ }
4405
+ #endif // UNIV_PMEMOBJ_LSB
4406
+
4297
4407
#endif // UNIV_PMEMOBJ_BUF_FLUSHER
4298
4408
4299
4409
// ///////////////////////////////////////////////
@@ -4333,24 +4443,19 @@ DECLARE_THREAD(pm_buf_flush_list_cleaner_coordinator)(
4333
4443
pfs_register_thread (pm_list_cleaner_thread_key);
4334
4444
#endif /* UNIV_PFS_THREAD */
4335
4445
4336
- #if defined(UNIV_PMEMOBJ_BUF_FLUSHER)
4337
- PMEM_FLUSHER* flusher = gb_pmw->pbuf ->flusher ;
4338
- #endif
4339
4446
4340
4447
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
4341
4448
// print out each 10s
4342
4449
os_thread_sleep (10000000 );
4343
4450
if (srv_shutdown_state != SRV_SHUTDOWN_NONE) {
4344
4451
break ;
4345
4452
}
4453
+ #if defined (UNIV_PMEMOBJ_LSB)
4454
+ printf (" cur lsb_list cur pages/max_pages = %zu/%zu\n " , D_RW (gb_pmw->plsb ->lsb_list )->cur_pages , D_RW (gb_pmw->plsb ->lsb_list )->max_pages );
4455
+ #else
4346
4456
printf (" cur free list = %zu, cur spec_list = %zu\n " ,
4347
4457
D_RW (gb_pmw->pbuf ->free_pool )->cur_lists ,
4348
4458
D_RW (gb_pmw->pbuf ->spec_list )->cur_pages );
4349
-
4350
- #if defined(UNIV_PMEMOBJ_BUF_FLUSHER)
4351
- // mutex_enter(&flusher->mutex);
4352
- // printf(" n_requested/size %zu/%zu \n", flusher->n_requested, flusher->size);
4353
- // mutex_exit(&flusher->mutex);
4354
4459
#endif
4355
4460
} // end while thread
4356
4461
0 commit comments