diff --git a/src/api/include/pdc_client_connect.h b/src/api/include/pdc_client_connect.h index 52208588b..8ebbd562e 100644 --- a/src/api/include/pdc_client_connect.h +++ b/src/api/include/pdc_client_connect.h @@ -226,7 +226,12 @@ perr_t PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_serv int PDC_Client_get_var_type_size(pdc_var_type_t dtype); perr_t PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id, - char *bulk_buf, hg_size_t bulk_size, uint64_t *metadata_id); + char *bulk_buf, hg_size_t bulk_size, uint64_t *metadata_id, +#ifdef ENABLE_MPI + MPI_Comm comm); +#else + int comm); +#endif perr_t PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, int n_objs, uint32_t metadata_server_id, uint8_t is_write, diff --git a/src/api/pdc_client_connect.c b/src/api/pdc_client_connect.c index a6d05dc35..6af623396 100644 --- a/src/api/pdc_client_connect.c +++ b/src/api/pdc_client_connect.c @@ -3214,7 +3214,12 @@ PDC_Client_flush_obj_all() perr_t PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id, char *bulk_buf, - hg_size_t bulk_size, uint64_t *metadata_id) + hg_size_t bulk_size, uint64_t *metadata_id, +#ifdef ENABLE_MPI + MPI_Comm comm) +#else + int comm) +#endif { perr_t ret_value = SUCCEED; hg_return_t hg_ret = HG_SUCCESS; @@ -3278,7 +3283,8 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d #endif #ifdef ENABLE_MPI - MPI_Barrier(MPI_COMM_WORLD); + if (comm != 0) + MPI_Barrier(comm); #endif PDC_Client_transfer_pthread_create(); @@ -3297,11 +3303,6 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d PGOTO_ERROR(FAIL, "PDC_Client_send_transfer_request_all(): Could not start HG_Forward() @ line %d\n", __LINE__); - /* if (hg_progress_flag_g == -1) { */ - /* pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); */ - /* hg_progress_flag_g = 0; */ - /* } */ - /* 
PDC_Client_check_response(&send_context_g); */ PDC_Client_wait_pthread_progress(); @@ -3312,7 +3313,8 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d #endif #ifdef ENABLE_MPI - MPI_Barrier(MPI_COMM_WORLD); + if (comm != 0) + MPI_Barrier(comm); #endif #ifdef PDC_TIMING diff --git a/src/api/pdc_obj/pdc_obj.c b/src/api/pdc_obj/pdc_obj.c index 9b307e5fa..7457e4d79 100644 --- a/src/api/pdc_obj/pdc_obj.c +++ b/src/api/pdc_obj/pdc_obj.c @@ -393,23 +393,23 @@ PDC_obj_close(struct _pdc_obj_info *op) perr_t ret_value = SUCCEED; pdcid_t * transfer_request_id; pdc_local_transfer_request *temp, *previous; - int i; + int i, n; FUNC_ENTER(NULL); if (op->local_transfer_request_size) { transfer_request_id = (pdcid_t *)malloc(sizeof(pdcid_t) * op->local_transfer_request_size); temp = op->local_transfer_request_head; - i = 0; + n = 0; while (temp != NULL) { - transfer_request_id[i] = temp->local_id; + transfer_request_id[n] = temp->local_id; previous = temp; temp = temp->next; free(previous); - ++i; + ++n; } - PDCregion_transfer_wait_all(transfer_request_id, op->local_transfer_request_size); - for (i = 0; i < op->local_transfer_request_size; ++i) { + PDCregion_transfer_wait_all(transfer_request_id, n); + for (i = 0; i < n; ++i) { PDCregion_transfer_close(transfer_request_id[i]); } free(transfer_request_id); diff --git a/src/api/pdc_region/include/pdc_region.h b/src/api/pdc_region/include/pdc_region.h index f30c2e595..9a534e9b9 100644 --- a/src/api/pdc_region/include/pdc_region.h +++ b/src/api/pdc_region/include/pdc_region.h @@ -27,6 +27,9 @@ #include "pdc_public.h" #include "pdc_obj.h" +#ifdef ENABLE_MPI +#include "mpi.h" +#endif /**************************/ /* Library Public Struct */ @@ -97,30 +100,101 @@ perr_t PDCregion_close(pdcid_t region_id); */ void PDCregion_free(struct pdc_region_info *region); -pdcid_t PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, pdcid_t local_reg, - pdcid_t remote_reg); /** - * Start a 
region transfer from local region to remote region for an object on buf. + * Create a region transfer request (asynchronously) * * \param buf [IN] Start point of an application buffer - * \param obj_id [IN] ID of the target object - * \param data_type [IN] Data type of data in memory + * \param access_type [IN] Read or write operation + * \param obj_id [IN] Object ID * \param local_reg [IN] ID of the source region * \param remote_reg [IN] ID of the target region * + * \return ID of the newly created region transfer request + */ +pdcid_t PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, pdcid_t local_reg, + pdcid_t remote_reg); + +/** + * Start a region transfer from local region to remote region for an object on buf. + * + * \param transfer_request_id [IN] ID of the region transfer request + * * \return Non-negative on success/Negative on failure */ perr_t PDCregion_transfer_start(pdcid_t transfer_request_id); +/** + * Start several region transfer requests (asynchronously), can be for different objects. + * + * \param transfer_request_id [IN] ID pointer array of the region transfer requests + * \param size [IN] Number of requests in transfer_request_id + * + * \return Non-negative on success/Negative on failure + */ perr_t PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size); +#ifdef ENABLE_MPI +/** + * Start a region transfer request (asynchronously), MPI collective version for better performance at scale. + * + * \param transfer_request_id [IN] ID of the region transfer request + * \param comm [IN] MPI communicator + * + * \return Non-negative on success/Negative on failure + */ +perr_t PDCregion_transfer_start_mpi(pdcid_t transfer_request_id, MPI_Comm comm); + +/** + * Start several region transfer requests (asynchronously), MPI collective version for better performance at + * scale. 
+ * + * \param transfer_request_id [IN] ID pointer array of the region transfer requests + * \param size [IN] Number of requests in transfer_request_id + * \param comm [IN] MPI communicator + * + * \return Non-negative on success/Negative on failure + */ +perr_t PDCregion_transfer_start_all_mpi(pdcid_t *transfer_request_id, int size, MPI_Comm comm); +#endif + +/** + * Retrieve the status of a region transfer request + * + * \param transfer_request_id [IN] ID of the region transfer request + * \param completed [OUT] Result + * + * \return Non-negative on success/Negative on failure + */ perr_t PDCregion_transfer_status(pdcid_t transfer_request_id, pdc_transfer_status_t *completed); +/** + * Block and wait for a region transfer request to finish + * + * \param transfer_request_id [IN] ID of the region transfer request + * + * \return Non-negative on success/Negative on failure + */ perr_t PDCregion_transfer_wait(pdcid_t transfer_request_id); +/** + * Block and wait for several region transfer requests to finish + * + * \param transfer_request_id [IN] ID pointer array of the region transfer requests + * \param size [IN] Number of requests in transfer_request_id + * + * \return Non-negative on success/Negative on failure + */ perr_t PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size); +/** + * Close a transfer request, free internal resources + * + * \param transfer_request_id [IN] ID of the region transfer request + * + * \return Non-negative on success/Negative on failure + */ perr_t PDCregion_transfer_close(pdcid_t transfer_request_id); + /** * Map an application buffer to an object * diff --git a/src/api/pdc_region/pdc_region_transfer.c b/src/api/pdc_region/pdc_region_transfer.c index 2d2cacf5b..f343e40f9 100644 --- a/src/api/pdc_region/pdc_region_transfer.c +++ b/src/api/pdc_region/pdc_region_transfer.c @@ -44,12 +44,14 @@ #include "pdc_analysis_pkg.h" #include +#define PDC_MERGE_TRANSFER_MIN_COUNT 50 /* #define TANG_DEBUG 1 */ // pdc region transfer class. 
Contains essential information for performing non-blocking PDC client I/O // perations. typedef struct pdc_transfer_request { pdcid_t obj_id; + pdcid_t local_obj_id; // Data server ID for sending data to, used by object static only. uint32_t data_server_id; // Metadata server ID for sending data to, used by region_dynamic only. @@ -104,6 +106,9 @@ typedef struct pdc_transfer_request { uint64_t *obj_dims; // Pointer to object info, can be useful sometimes. We do not want to go through PDC ID list many times. struct _pdc_obj_info *obj_pointer; + // Tang: for merging transfer requests with transfer start_all/wait_all + pdcid_t merged_request_id; + int is_done; } pdc_transfer_request; // We pack all arguments for a start_all call to the same data server in a single structure, so we do not need @@ -213,6 +218,7 @@ PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, p p = (pdc_transfer_request *)PDC_malloc(sizeof(pdc_transfer_request)); p->obj_pointer = obj2; p->mem_type = obj2->obj_pt->obj_prop_pub->type; + p->local_obj_id = obj_id; p->obj_id = obj2->obj_info_pub->meta_id; p->access_type = access_type; p->buf = buf; @@ -228,6 +234,8 @@ PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, p p->metadata_server_id = obj2->obj_info_pub->metadata_server_id; p->unit = PDC_get_var_type_size(p->mem_type); p->consistency = obj2->obj_pt->obj_prop_pub->consistency; + p->merged_request_id = 0; + p->is_done = 0; unit = p->unit; /* @@ -289,9 +297,9 @@ PDCregion_transfer_close(pdcid_t transfer_request_id) goto done; } transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); - if (transfer_request->metadata_id == NULL) { - goto done; - } + /* if (transfer_request->metadata_id == NULL) { */ + /* goto done; */ + /* } */ // Check for consistency /* @@ -301,9 +309,12 @@ PDCregion_transfer_close(pdcid_t transfer_request_id) PDCregion_transfer_wait(transfer_request_id); } */ - free(transfer_request->local_region_offset); - 
free(transfer_request->metadata_id); - free(transfer_request); + if (transfer_request->local_region_offset) + free(transfer_request->local_region_offset); + if (transfer_request->metadata_id) + free(transfer_request->metadata_id); + if (transfer_request) + free(transfer_request); /* When the reference count reaches zero the resources are freed */ if (PDC_dec_ref(transfer_request_id) < 0) @@ -934,11 +945,13 @@ prepare_start_all_requests(pdcid_t *transfer_request_id, int size, *posix_transfer_request_id_ptr = (pdcid_t *)malloc(sizeof(pdcid_t) * size); for (i = 0; i < size; ++i) { - transferinfo = PDC_find_id(transfer_request_id[i]); + transferinfo = PDC_find_id(transfer_request_id[i]); + if (NULL == transferinfo) + continue; transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (transfer_request->metadata_id != NULL) { - printf("PDC Client %d: %s attempt to start existing transfer request @ line %d\n", - pdc_client_mpi_rank_g, __func__, __LINE__); + printf("==PDC_CLIENT[%d]: %s cannot start transfer request @ line %d\n", pdc_client_mpi_rank_g, + __func__, __LINE__); return FAIL; } if (transfer_request->consistency == PDC_CONSISTENCY_POSIX) { @@ -1233,7 +1246,7 @@ PDC_Client_pack_all_requests(int n_objs, pdc_transfer_request_start_all_pkg **tr } static perr_t -PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requests, int size) +PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requests, int size, MPI_Comm comm) { perr_t ret_value = SUCCEED; int index, i, j; @@ -1273,7 +1286,7 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ * transfer_requests[index]->data_server_id); */ PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, - metadata_id + index); + metadata_id + index, comm); // printf("transfer request towards data server %d\n", 
transfer_requests[index]->data_server_id); for (j = index; j < i; ++j) { // All requests share the same bulk buffer, reference counter is also shared among all @@ -1310,7 +1323,7 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ * transfer_requests[index]->data_server_id); */ PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, - metadata_id + index); + metadata_id + index, comm); // printf("transfer request towards data server %d\n", transfer_requests[index]->data_server_id); for (j = index; j < size; ++j) { @@ -1338,13 +1351,124 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ FUNC_LEAVE(ret_value); } +// Try to merge smaller requests to a large one, currently only merge write requests on same object and +// contiguous +static perr_t +merge_transfer_request_ids(pdcid_t *transfer_request_id, int size, pdcid_t *merged_request_id, + int *merged_size) +{ + struct _pdc_id_info * transferinfo; + pdcid_t obj_id, new_local_reg, new_remote_reg; + int flag = 0, i; + void * new_buf; + pdc_access_t access_type; + pdc_transfer_request **all_transfer_request; + uint64_t new_buf_size = 0, copy_off = 0; + uint64_t offset_local[3], size_local[3], offset_remote[3], size_remote[3]; + + all_transfer_request = (pdc_transfer_request **)PDC_calloc(size, sizeof(pdc_transfer_request *)); + + for (i = 0; i < size; ++i) { + transferinfo = PDC_find_id(transfer_request_id[i]); + if (NULL == transferinfo) { + printf("==PDC_CLIENT[%d]: %s cannot find transfer request info @ line %d\n", + pdc_client_mpi_rank_g, __func__, __LINE__); + return FAIL; + } + all_transfer_request[i] = (pdc_transfer_request *)(transferinfo->obj_ptr); + if (NULL == all_transfer_request[i]) { + printf("==PDC_CLIENT[%d]: %s transfer request is NULL @ line %d\n", pdc_client_mpi_rank_g, + __func__, __LINE__); + return FAIL; + } + + // Check if 
every requests are REGION_LOCAL + if (all_transfer_request[i]->region_partition != PDC_REGION_LOCAL) { + flag = 1; + break; + } + + /* // Check if every requests are write operations */ + /* if (all_transfer_request[i]->access_type != PDC_WRITE) { */ + /* flag = 1; */ + /* break; */ + /* } */ + + // Check if every requests are on the same object + if (i == 0) + obj_id = all_transfer_request[i]->local_obj_id; + else { + if (all_transfer_request[i]->local_obj_id != obj_id) { + flag = 1; + break; + } + // Check for contiguous + if (all_transfer_request[i]->local_region_ndim == 1) { + if (all_transfer_request[i]->remote_region_offset[0] != + all_transfer_request[i - 1]->remote_region_offset[0] + + all_transfer_request[i - 1]->remote_region_size[0]) { + flag = 1; + break; + } + } + else { + // TODO: currently only check for 1D + flag = 1; + break; + } + } + + new_buf_size += all_transfer_request[i]->total_data_size; + } + + if (flag == 0) { + // Copy data to merged new_buf + new_buf = (void *)PDC_malloc(new_buf_size); + + if (all_transfer_request[0]->local_region_ndim == 1) { + offset_local[0] = all_transfer_request[0]->local_region_offset[0]; + size_local[0] = new_buf_size; + new_local_reg = PDCregion_create(1, offset_local, size_local); + + offset_remote[0] = all_transfer_request[0]->remote_region_offset[0]; + size_remote[0] = new_buf_size; + new_remote_reg = PDCregion_create(1, offset_remote, size_remote); + + copy_off = offset_local[0]; + for (i = 0; i < size; ++i) { + memcpy(new_buf + copy_off, all_transfer_request[i]->buf, + all_transfer_request[i]->total_data_size); + copy_off += all_transfer_request[i]->total_data_size; + // Mark the original requests a done + all_transfer_request[i]->is_done = 1; + } + } + + /* *merged_request_id = */ + /* PDCregion_transfer_create(new_buf, PDC_WRITE, obj_id, new_local_reg, new_remote_reg); */ + *merged_request_id = PDCregion_transfer_create(new_buf, all_transfer_request[0]->access_type, obj_id, + new_local_reg, 
new_remote_reg); + *merged_size = 1; + // Add new xfer id to the first request for later wait_all use + all_transfer_request[0]->merged_request_id = *merged_request_id; + } + + free(all_transfer_request); + + return SUCCEED; +} + perr_t -PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) +#ifdef ENABLE_MPI +PDCregion_transfer_start_all_common(pdcid_t *transfer_request_id, int size, MPI_Comm comm) +#else +PDCregion_transfer_start_all_common(pdcid_t *transfer_request_id, int size, int comm) +#endif { perr_t ret_value = SUCCEED; - int write_size = 0, read_size = 0, posix_size = 0; + int write_size = 0, read_size = 0, posix_size = 0, merged_size = 0; pdc_transfer_request_start_all_pkg **write_transfer_requests = NULL, **read_transfer_requests = NULL; - pdcid_t * posix_transfer_request_id; + pdcid_t * posix_transfer_request_id, *merged_request_id; FUNC_ENTER(NULL); @@ -1354,6 +1478,16 @@ PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) printf("%s PDC_CLIENT[%d] enter %s\n", cur_time, pdc_client_mpi_rank_g, __func__); #endif + // Merge the transfer_request_ids when they are operating on the same obj and have contiguous off, len + if (size > PDC_MERGE_TRANSFER_MIN_COUNT) { + merged_request_id = PDC_malloc(sizeof(pdcid_t)); + merge_transfer_request_ids(transfer_request_id, size, merged_request_id, &merged_size); + if (merged_size == 1) { + size = merged_size; + transfer_request_id = merged_request_id; + } + } + // Split write and read requests. Handle them separately. 
// printf("%s: checkpoint %d\n", __func__, __LINE__); // [Tang] NOTE: prepare_start_all_requests include several metadata RPC operations @@ -1370,26 +1504,22 @@ PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) } */ PDC_Client_transfer_pthread_cnt_add(size); - /* PDC_Client_transfer_pthread_create(); */ #ifdef ENABLE_MPI - // [Tang] TODO: change to user provided comm - /* MPI_Comm world_comm; */ - /* MPI_Comm_dup(MPI_COMM_WORLD, &world_comm); */ - /* MPI_Barrier(world_comm); */ - MPI_Barrier(MPI_COMM_WORLD); + if (comm != 0) + MPI_Barrier(comm); #endif // Start write requests if (write_size > 0) - PDC_Client_start_all_requests(write_transfer_requests, write_size); + PDC_Client_start_all_requests(write_transfer_requests, write_size, comm); // printf("%s: checkpoint %d\n", __func__, __LINE__); // Start read requests if (read_size > 0) - PDC_Client_start_all_requests(read_transfer_requests, read_size); + PDC_Client_start_all_requests(read_transfer_requests, read_size, comm); /* fprintf(stderr, "%s: checkpoint %d\n", __func__, __LINE__); - MPI_Barrier(MPI_COMM_WORLD); + MPI_Barrier(comm); */ // For POSIX consistency, we block here until the data is received by the server @@ -1404,14 +1534,39 @@ PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) // fprintf(stderr, "%s: checkpoint %d\n", __func__, __LINE__); #ifdef ENABLE_MPI - MPI_Barrier(MPI_COMM_WORLD); - /* MPI_Barrier(world_comm); */ - /* MPI_Comm_free(&world_comm); */ + if (comm != 0) + MPI_Barrier(comm); #endif FUNC_LEAVE(ret_value); } +perr_t +PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + ret_value = PDCregion_transfer_start_all_common(transfer_request_id, size, 0); + + FUNC_LEAVE(ret_value); +} + +#ifdef ENABLE_MPI +perr_t +PDCregion_transfer_start_all_mpi(pdcid_t *transfer_request_id, int size, MPI_Comm comm) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + ret_value = 
PDCregion_transfer_start_all_common(transfer_request_id, size, comm); + + FUNC_LEAVE(ret_value); +} +#endif + /** * Input: Sorted arrays * Output: A single array that is sorted, and the union of sorted arrays. @@ -1469,8 +1624,14 @@ static int sorted_array_unions(const int **array, const int *input_size, int n_a return 0; } #endif + perr_t -PDCregion_transfer_start(pdcid_t transfer_request_id) +PDCregion_transfer_start_common(pdcid_t transfer_request_id, +#ifdef ENABLE_MPI + MPI_Comm comm) +#else + int comm) +#endif { perr_t ret_value = SUCCEED; struct _pdc_id_info * transferinfo; @@ -1481,12 +1642,13 @@ PDCregion_transfer_start(pdcid_t transfer_request_id) FUNC_ENTER(NULL); transferinfo = PDC_find_id(transfer_request_id); + if (NULL == transferinfo) + goto done; transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (transfer_request->metadata_id != NULL) { - printf("PDC Client PDCregion_transfer_start attempt to start existing transfer request @ line %d\n", - __LINE__); + printf("PDC_Client %s attempt to start existing transfer request @ line %d\n", __func__, __LINE__); ret_value = FAIL; goto done; } @@ -1495,12 +1657,15 @@ PDCregion_transfer_start(pdcid_t transfer_request_id) // Aggregated method will take care of this type of operation. 
if (transfer_request->region_partition == PDC_REGION_DYNAMIC || transfer_request->region_partition == PDC_REGION_LOCAL) { +#ifdef ENABLE_MPI + PDCregion_transfer_start_all_mpi(&transfer_request_id, 1, comm); +#else PDCregion_transfer_start_all(&transfer_request_id, 1); +#endif goto done; } PDC_Client_transfer_pthread_cnt_add(1); - /* PDC_Client_transfer_pthread_create(); */ attach_local_transfer_request(transfer_request->obj_pointer, transfer_request_id); @@ -1582,6 +1747,32 @@ PDCregion_transfer_start(pdcid_t transfer_request_id) FUNC_LEAVE(ret_value); } +perr_t +PDCregion_transfer_start(pdcid_t transfer_request_id) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + ret_value = PDCregion_transfer_start_common(transfer_request_id, 0); + + FUNC_LEAVE(ret_value); +} + +#ifdef ENABLE_MPI +perr_t +PDCregion_transfer_start_mpi(pdcid_t transfer_request_id, MPI_Comm comm) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + ret_value = PDCregion_transfer_start_common(transfer_request_id, comm); + + FUNC_LEAVE(ret_value); +} +#endif + static perr_t release_region_buffer(char *buf, uint64_t *obj_dims, int local_ndim, uint64_t *local_offset, uint64_t *local_size, size_t unit, pdc_access_t access_type, int bulk_buf_size, @@ -1652,7 +1843,12 @@ PDCregion_transfer_status(pdcid_t transfer_request_id, pdc_transfer_status_t *co FUNC_ENTER(NULL); - transferinfo = PDC_find_id(transfer_request_id); + transferinfo = PDC_find_id(transfer_request_id); + if (NULL == transferinfo) { + *completed = PDC_TRANSFER_STATUS_COMPLETE; + goto done; + } + transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (transfer_request->metadata_id != NULL) { unit = transfer_request->unit; @@ -1733,6 +1929,7 @@ PDCregion_transfer_status(pdcid_t transfer_request_id, pdc_transfer_status_t *co } free(transfer_request->metadata_id); transfer_request->metadata_id = NULL; + transfer_request->is_done = 1; remove_local_transfer_request(transfer_request->obj_pointer, 
transfer_request_id); } else { @@ -1747,34 +1944,59 @@ perr_t PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) { perr_t ret_value = SUCCEED; - int index, i, j; + int index, i, j, merged_xfer = 0, ori_size = size, is_first = 1; size_t unit; int total_requests, n_objs; - uint64_t * metadata_ids; + uint64_t * metadata_ids, merge_off = 0, cur_off = 0; pdc_transfer_request_wait_all_pkg **transfer_requests, *transfer_request_head, *transfer_request_end, *temp; - struct _pdc_id_info * transferinfo; - pdc_transfer_request *transfer_request; + struct _pdc_id_info **transferinfo; + pdc_transfer_request *transfer_request, *merged_request; + pdcid_t * my_transfer_request_id = transfer_request_id; + + double t0, t1; FUNC_ENTER(NULL); if (!size) { goto done; } + transferinfo = (struct _pdc_id_info **)PDC_malloc(size * sizeof(struct _pdc_id_info *)); + +#ifdef ENABLE_MPI + t0 = MPI_Wtime(); +#endif + + // Check if we merged the previous request + if (size > PDC_MERGE_TRANSFER_MIN_COUNT) { + transferinfo[0] = PDC_find_id(transfer_request_id[0]); + transfer_request = (pdc_transfer_request *)(transferinfo[0]->obj_ptr); + if (transfer_request->merged_request_id != 0) { + my_transfer_request_id = &transfer_request->merged_request_id; + size = 1; + merged_xfer = 1; + } + } + // printf("entered %s @ line %d\n", __func__, __LINE__); total_requests = 0; transfer_request_head = NULL; for (i = 0; i < size; ++i) { - transferinfo = PDC_find_id(transfer_request_id[i]); - transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); + transferinfo[i] = PDC_find_id(my_transfer_request_id[i]); + if (NULL == transferinfo[i]) + continue; + transfer_request = (pdc_transfer_request *)(transferinfo[i]->obj_ptr); + if (1 == transfer_request->is_done) + continue; if (!transfer_request->metadata_id) { fprintf(stderr, - "%s [rank %d] @ line %d: Attempt to wait for a transfer request " - "that has not been started.\n", - __func__, pdc_client_mpi_rank_g, __LINE__); - ret_value = 
FAIL; - goto done; + "PDCregion_transfer_wait_all [rank %d] @ line %d: Attempt to wait for a transfer request " + "that has not been started. %d/%d\n", + pdc_client_mpi_rank_g, __LINE__, i, size); + /* ret_value = FAIL; */ + continue; + /* goto done; */ } total_requests += transfer_request->n_obj_servers; for (j = 0; j < transfer_request->n_obj_servers; ++j) { @@ -1801,6 +2023,12 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) transfer_request_end->index = j; } } + + /* #ifdef ENABLE_MPI */ + /* t1 = MPI_Wtime(); */ + /* fprintf(stderr, "Rank %d, Part 1 took %.6f\n", pdc_client_mpi_rank_g, t1 - t0); */ + /* #endif */ + transfer_requests = (pdc_transfer_request_wait_all_pkg **)malloc( sizeof(pdc_transfer_request_wait_all_pkg *) * total_requests); temp = transfer_request_head; @@ -1813,10 +2041,15 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) qsort(transfer_requests, total_requests, sizeof(pdc_transfer_request_wait_all_pkg *), sort_by_data_server_wait_all); - for (i = 0; i < total_requests; ++i) { - // printf("checkpoint %d, data_server_id = %u, metadata_id = %lu\n", __LINE__, - // transfer_requests[i]->data_server_id, transfer_requests[i]->metadata_id); - } + /* for (i = 0; i < total_requests; ++i) { */ + // printf("checkpoint %d, data_server_id = %u, metadata_id = %lu\n", __LINE__, + // transfer_requests[i]->data_server_id, transfer_requests[i]->metadata_id); + /* } */ + + /* #ifdef ENABLE_MPI */ + /* t0 = MPI_Wtime(); */ + /* fprintf(stderr, "Rank %d, Part 2 took %.6f\n", pdc_client_mpi_rank_g, t0 - t1); */ + /* #endif */ metadata_ids = (uint64_t *)malloc(sizeof(uint64_t) * total_requests); index = 0; @@ -1861,6 +2094,12 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) index = i; } } + + /* #ifdef ENABLE_MPI */ + /* t1 = MPI_Wtime(); */ + /* fprintf(stderr, "Rank %d, Part 3 took %.6f\n", pdc_client_mpi_rank_g, t1 - t0); */ + /* #endif */ + if (total_requests) { // Freed at the wait operation 
(inside PDC_client_connect call) n_objs = total_requests - index; @@ -1906,10 +2145,39 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) } } + /* #ifdef ENABLE_MPI */ + /* t0 = MPI_Wtime(); */ + /* fprintf(stderr, "Rank %d, Part 4 took %.6f\n", pdc_client_mpi_rank_g, t0 - t1); */ + /* #endif */ + + // Deal with merged read requests, need to copy a large buffer to each of the original request buf + // TODO: Currently only supports 1D merging, so only consider 1D for now + if (merged_xfer == 1) { + merged_request = (pdc_transfer_request *)(PDC_find_id(my_transfer_request_id[0])->obj_ptr); + for (i = 0; i < ori_size; ++i) { + transfer_request = (pdc_transfer_request *)(PDC_find_id(transfer_request_id[i])->obj_ptr); + if (transfer_request->access_type == PDC_READ) { + if (is_first == 1) + merge_off = transfer_request->remote_region_offset[0]; + cur_off = transfer_request->remote_region_offset[0] - merge_off; + if (!transfer_request->new_buf) + transfer_request->new_buf = PDC_malloc(transfer_request->total_data_size); + + memcpy(transfer_request->new_buf, merged_request->read_bulk_buf[0] + merge_off, + transfer_request->total_data_size); + + is_first = 0; + } + } + } + for (i = 0; i < size; ++i) { - transferinfo = PDC_find_id(transfer_request_id[i]); - transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); - unit = transfer_request->unit; + if (NULL == transferinfo[i]) + continue; + transfer_request = (pdc_transfer_request *)(transferinfo[i]->obj_ptr); + if (1 == transfer_request->is_done) + continue; + unit = transfer_request->unit; if (transfer_request->region_partition == PDC_OBJ_STATIC && transfer_request->access_type == PDC_READ) { @@ -1936,14 +2204,21 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) } free(transfer_request->metadata_id); transfer_request->metadata_id = NULL; + transfer_request->is_done = 1; remove_local_transfer_request(transfer_request->obj_pointer, transfer_request_id[i]); } + /* #ifdef 
ENABLE_MPI */ + /* t1 = MPI_Wtime(); */ + /* fprintf(stderr, "Rank %d, Part 5 took %.6f\n", pdc_client_mpi_rank_g, t1 - t0); */ + /* #endif */ + for (i = 0; i < total_requests; ++i) { free(transfer_requests[i]); } free(transfer_requests); free(metadata_ids); + free(transferinfo); /* for (i = 0; i < size; ++i) { PDCregion_transfer_wait(transfer_request_id[i]); @@ -1965,7 +2240,10 @@ PDCregion_transfer_wait(pdcid_t transfer_request_id) FUNC_ENTER(NULL); - transferinfo = PDC_find_id(transfer_request_id); + transferinfo = PDC_find_id(transfer_request_id); + if (NULL == transferinfo) + goto done; + transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (transfer_request->metadata_id != NULL) { // For region dynamic case, it is implemented in the aggregated version for portability. @@ -2045,6 +2323,7 @@ PDCregion_transfer_wait(pdcid_t transfer_request_id) } free(transfer_request->metadata_id); transfer_request->metadata_id = NULL; + transfer_request->is_done = 1; remove_local_transfer_request(transfer_request->obj_pointer, transfer_request_id); } else { diff --git a/src/server/pdc_server_metadata.c b/src/server/pdc_server_metadata.c index 02380fcc8..11e80e2b4 100644 --- a/src/server/pdc_server_metadata.c +++ b/src/server/pdc_server_metadata.c @@ -1200,11 +1200,8 @@ PDC_insert_metadata_to_hash_table(gen_obj_id_in_t *in, gen_obj_id_out_t *out) perr_t ret_value = SUCCEED; pdc_metadata_t *metadata; uint32_t * hash_key, i; -#ifdef ENABLE_MULTITHREAD - // Obtain lock for hash table - int unlocked = 0; - hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); -#endif + int unlocked = 0; + // DEBUG int debug_flag = 0; @@ -1265,15 +1262,13 @@ PDC_insert_metadata_to_hash_table(gen_obj_id_in_t *in, gen_obj_id_out_t *out) pdc_hash_table_entry_head *lookup_value; pdc_metadata_t * found_identical; + if (debug_flag == 1) + printf("checking hash table with key=%d\n", *hash_key); + #ifdef ENABLE_MULTITHREAD // Obtain lock for hash table - unlocked = 0; 
hg_thread_mutex_lock(&pdc_metadata_hash_table_mutex_g); #endif - - if (debug_flag == 1) - printf("checking hash table with key=%d\n", *hash_key); - if (metadata_hash_table_g != NULL) { // lookup lookup_value = hash_table_lookup(metadata_hash_table_g, hash_key); diff --git a/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c b/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c index 4e41132f8..357bd8424 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer_metadata_query.c @@ -371,7 +371,7 @@ transfer_request_metadata_query_lookup_query_buf(uint64_t query_id, char **buf_p pdc_metadata_query_buf *metadata_query, *previous; perr_t ret_value = SUCCEED; FUNC_ENTER(NULL); - pthread_mutex_lock(&metadata_query_mutex); + /* pthread_mutex_lock(&metadata_query_mutex); */ previous = NULL; int i = 0; @@ -398,7 +398,7 @@ transfer_request_metadata_query_lookup_query_buf(uint64_t query_id, char **buf_p } *buf_ptr = NULL; done: - pthread_mutex_unlock(&metadata_query_mutex); + /* pthread_mutex_unlock(&metadata_query_mutex); */ fflush(stdout); FUNC_LEAVE(ret_value); } @@ -421,7 +421,7 @@ transfer_request_metadata_query_parse(int32_t n_objs, char *buf, uint8_t is_writ pdc_obj_region_metadata *region_metadata; FUNC_ENTER(NULL); - pthread_mutex_lock(&metadata_query_mutex); + /* pthread_mutex_lock(&metadata_query_mutex); */ region_metadata = (pdc_obj_region_metadata *)malloc(sizeof(pdc_obj_region_metadata) * n_objs); @@ -451,7 +451,7 @@ transfer_request_metadata_query_parse(int32_t n_objs, char *buf, uint8_t is_writ free(region_metadata); // printf("transfer_request_metadata_query_parse: checkpoint %d\n", __LINE__); - pthread_mutex_unlock(&metadata_query_mutex); + /* pthread_mutex_unlock(&metadata_query_mutex); */ fflush(stdout); FUNC_LEAVE(query_id); } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 30667ddcd..1f8eaddf4 100644 
--- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -133,6 +133,7 @@ set(PROGRAMS #query_vpic_exyz_nopreload #query_vpic_exyz_preload query_data + producer_waitall ) # TODO: Check if import_vpic.c is needed. If yes, we have to add the following : diff --git a/src/tests/producer_waitall.c b/src/tests/producer_waitall.c new file mode 100644 index 000000000..379b8a255 --- /dev/null +++ b/src/tests/producer_waitall.c @@ -0,0 +1,209 @@ +#include +#include +#include +#include + +#include "mpi.h" +#include "pdc.h" + +/** + * write -> read -> wait_all() + * + */ + +int mpi_rank, mpi_size; +MPI_Comm mpi_comm; + +void +write_read_wait_all(pdcid_t obj_id, int iterations) +{ + pdcid_t region_local, region_remote; + pdcid_t transfer_request; + perr_t ret; + + int ndim = 1; + uint64_t offset_local = 0; + uint64_t offset_remote = 0; + uint64_t chunk_size = 2880; + + char *data_out = (char *)malloc(chunk_size * sizeof(char)); + memset(data_out, 'a', chunk_size * sizeof(char)); + + double stime = MPI_Wtime(); + + pdcid_t *tids = (pdcid_t *)malloc(sizeof(pdcid_t) * (iterations)); + for (int i = 0; i < iterations; i++) { + region_local = PDCregion_create(ndim, &offset_local, &chunk_size); + region_remote = PDCregion_create(ndim, &offset_remote, &chunk_size); + offset_remote += chunk_size; + tids[i] = PDCregion_transfer_create(data_out, PDC_WRITE, obj_id, region_local, region_remote); + if (tids[i] == 0) + printf("transfer request creation failed\n"); + /* ret = PDCregion_transfer_start(tids[i]); */ + /* if (ret != SUCCEED) */ + /* printf("Failed to start transfer\n"); */ + } + + /* printf("rank %d call wait_all on tids.\n", mpi_rank); */ + fprintf(stderr, "Rank %4d: create took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + MPI_Barrier(MPI_COMM_WORLD); + + stime = MPI_Wtime(); + + ret = PDCregion_transfer_start_all(tids, iterations); + if (ret != SUCCEED) + printf("Failed to start transfer\n"); + + fprintf(stderr, "Rank %4d: start all tids took %.6f\n", mpi_rank, 
MPI_Wtime() - stime); + + stime = MPI_Wtime(); + + MPI_Barrier(MPI_COMM_WORLD); + + ret = PDCregion_transfer_wait_all(tids, iterations); + if (ret != SUCCEED) + printf("Failed to wait all transfer\n"); + + /* printf("rank %d read before wait_all()\n", mpi_rank); */ + fprintf(stderr, "Rank %4d: wait all tids took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + MPI_Barrier(MPI_COMM_WORLD); + + char *data_in = (char *)malloc(chunk_size * sizeof(char)); + offset_local = 0; + offset_remote = 0; + region_local = PDCregion_create(ndim, &offset_local, &chunk_size); + region_remote = PDCregion_create(ndim, &offset_remote, &chunk_size); + pdcid_t read_tid = PDCregion_transfer_create(data_in, PDC_READ, obj_id, region_local, region_remote); + ret = PDCregion_transfer_start(read_tid); + ret = PDCregion_transfer_wait(read_tid); + ret = PDCregion_transfer_close(read_tid); + /* printf("rank %d read success!\n", mpi_rank); */ + fprintf(stderr, "Rank %4d: create wait read took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + // Write more + stime = MPI_Wtime(); + int N = 10; + pdcid_t *tids2 = (pdcid_t *)malloc(sizeof(pdcid_t) * N); + offset_local = 0; + offset_remote = iterations * chunk_size; // start from the end of the last write + for (int i = 0; i < N; i++) { + region_local = PDCregion_create(ndim, &offset_local, &chunk_size); + region_remote = PDCregion_create(ndim, &offset_remote, &chunk_size); + offset_remote += chunk_size; + tids2[i] = PDCregion_transfer_create(data_out, PDC_WRITE, obj_id, region_local, region_remote); + if (tids2[i] == 0) + printf("transfer request creation failed\n"); + /* ret = PDCregion_transfer_start(tids2[i]); */ + /* if (ret != SUCCEED) */ + /* printf("Failed to start transfer\n"); */ + } + fprintf(stderr, "Rank %4d: create tids2 took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); + + ret = PDCregion_transfer_start_all(tids2, N); + if (ret != SUCCEED) + printf("Failed to start transfer\n"); + + 
fprintf(stderr, "Rank %4d: start tids2 took %.6f\n", mpi_rank, MPI_Wtime() - stime); + /* ret = PDCregion_transfer_wait_all(tids, (iterations)); */ + /* if (ret != SUCCEED) */ + /* printf("Failed to transfer wait\n"); */ + + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); + /* printf("rank %d call wait_all on tids2.\n", mpi_rank); */ + ret = PDCregion_transfer_wait_all(tids2, (N)); + if (ret != SUCCEED) + printf("Failed to transfer wait\n"); + + fprintf(stderr, "Rank %4d: wait all tids2 took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + MPI_Barrier(MPI_COMM_WORLD); + stime = MPI_Wtime(); + + for (int i = 0; i < iterations; i++) { + ret = PDCregion_transfer_close(tids[i]); + if (ret != SUCCEED) + printf("region transfer close failed\n"); + } + for (int i = 0; i < N; i++) { + ret = PDCregion_transfer_close(tids2[i]); + if (ret != SUCCEED) + printf("region transfer close failed\n"); + } + + fprintf(stderr, "Rank %4d: close all took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + free(data_in); + free(data_out); + free(tids); + free(tids2); +} + +int +main(int argc, char **argv) +{ + + pdcid_t pdc_id, cont_prop, cont_id; + pdcid_t obj_prop, obj_id; + + uint64_t dims[1] = {PDC_SIZE_UNLIMITED}; + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm); + + // create a pdc + pdc_id = PDCinit("pdc"); + + // create a container property + cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc_id); + if (cont_prop <= 0) { + printf("Fail to create container property @ line %d!\n", __LINE__); + } + // create a container + cont_id = PDCcont_create_col("c1", cont_prop); + if (cont_id <= 0) { + printf("Fail to create container @ line %d!\n", __LINE__); + } + + // create an object property + obj_prop = PDCprop_create(PDC_OBJ_CREATE, pdc_id); + PDCprop_set_obj_dims(obj_prop, 1, dims); + PDCprop_set_obj_type(obj_prop, PDC_CHAR); + PDCprop_set_obj_time_step(obj_prop, 0); + 
PDCprop_set_obj_user_id(obj_prop, getuid()); + PDCprop_set_obj_app_name(obj_prop, "producer"); + PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); + + char obj_name[100] = {0}; + sprintf(obj_name, "obj-var-%d", mpi_rank); + PDCprop_set_obj_tags(obj_prop, obj_name); + obj_id = PDCobj_create(cont_id, obj_name, obj_prop); + + MPI_Barrier(MPI_COMM_WORLD); + double stime = MPI_Wtime(); + write_read_wait_all(obj_id, 1000); + + MPI_Barrier(MPI_COMM_WORLD); + fprintf(stderr, "Rank %4d: Write read wait all took %.6f\n", mpi_rank, MPI_Wtime() - stime); + + if (PDCobj_close(obj_id) < 0) { + printf("fail to close obj_id\n"); + } + + if (PDCprop_close(cont_prop) < 0) { + printf("Fail to close property @ line %d\n", __LINE__); + } + + if (PDCclose(pdc_id) < 0) { + printf("fail to close PDC\n"); + } + + MPI_Finalize(); +} diff --git a/src/tests/region_transfer_all.c b/src/tests/region_transfer_all.c index 2ee7efaef..4f7390c13 100644 --- a/src/tests/region_transfer_all.c +++ b/src/tests/region_transfer_all.c @@ -329,8 +329,6 @@ main(int argc, char **argv) } } - MPI_Barrier(MPI_COMM_WORLD); - // close object for (i = 0; i < OBJ_NUM; ++i) { if (PDCobj_close(obj[i]) < 0) { diff --git a/src/tests/run_checkpoint_restart_test.sh b/src/tests/run_checkpoint_restart_test.sh index 08b868e19..cb44f5bb4 100755 --- a/src/tests/run_checkpoint_restart_test.sh +++ b/src/tests/run_checkpoint_restart_test.sh @@ -5,7 +5,7 @@ # Cori CI needs srun even for serial tests run_cmd="" -if [[ "$NERSC_HOST" == "perlmutter" ]]; then +if [[ "$SUPERCOMPUTER" == "perlmutter" ]]; then run_cmd="srun -n 1 --mem=25600 --cpu_bind=cores --overlap" fi diff --git a/src/tests/run_multiple_test.sh b/src/tests/run_multiple_test.sh index e6044745b..9e0914ce8 100755 --- a/src/tests/run_multiple_test.sh +++ b/src/tests/run_multiple_test.sh @@ -5,7 +5,7 @@ # Cori CI needs srun even for serial tests run_cmd="" -if [[ "$NERSC_HOST" == "perlmutter" ]]; then +if [[ "$SUPERCOMPUTER" == "perlmutter" ]]; then 
run_cmd="srun -n 1 --mem=25600 --cpu_bind=cores --overlap" fi diff --git a/src/tests/run_test.sh b/src/tests/run_test.sh index 3dcd165ac..67a1e38bc 100755 --- a/src/tests/run_test.sh +++ b/src/tests/run_test.sh @@ -5,7 +5,7 @@ # Cori CI needs srun even for serial tests run_cmd="" -if [[ "$NERSC_HOST" == "perlmutter" ]]; then +if [[ "$SUPERCOMPUTER" == "perlmutter" ]]; then run_cmd="srun -n 1 --mem=25600 --cpu_bind=cores --overlap" fi diff --git a/src/tests/vpicio_mts.c b/src/tests/vpicio_mts.c index 102891ff0..acbed21fa 100644 --- a/src/tests/vpicio_mts.c +++ b/src/tests/vpicio_mts.c @@ -269,7 +269,11 @@ main(int argc, char **argv) printf("[%s] Transfer create time: %.5e\n", cur_time, t0 - t1); #endif +#ifdef ENABLE_MPI + if (PDCregion_transfer_start_all_mpi(transfer_requests, 8, MPI_COMM_WORLD) != SUCCEED) { +#else if (PDCregion_transfer_start_all(transfer_requests, 8) != SUCCEED) { +#endif printf("Failed to start transfer requests\n"); return FAIL; }