Skip to content

Commit f69e38f

Browse files
Merge pull request #33 from coecms/beta
Combined branch for recent pull requests
2 parents 937d1d5 + f6eca64 commit f69e38f

File tree

6 files changed

+340
-55
lines changed

6 files changed

+340
-55
lines changed

.travis.yml

+1-1
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ before_install:
55
- bash Miniconda3-latest-Linux-x86_64.sh -b -p ~/conda
66
- source ~/conda/bin/activate
77
- conda create --yes -c conda-forge -n mppnccombine-fast-build gcc openmpi 'hdf5>=1.10.2' libnetcdf nomkl
8-
- conda create --yes -c conda-forge -n mppnccombine-fast-test openmpi pytest xarray nomkl
8+
- conda create --yes -c conda-forge -n mppnccombine-fast-test openmpi pytest xarray nomkl netcdf4
99

1010
script:
1111
- make CONDA_BUILD=1 mppnccombine-fast

Makefile

+1-1
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ endif
2525

2626
all: mppnccombine-fast
2727

28-
test: mppnccombine-fast
28+
test check: mppnccombine-fast
2929
${TEST_ENV} pytest -v --capture=no test.py
3030

3131
mppnccombine-fast: async.o error.o read_chunked.o

async.c

+48-6
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
#define TAG_VAR_INFO 22
4848

4949
#define MAX_VARIABLES 100
50+
#define MAX_DIMS 10
5051

5152
typedef struct {
5253
hid_t file_id;
@@ -55,6 +56,7 @@ typedef struct {
5556
hid_t var_id;
5657
char varname[NC_MAX_NAME+1];
5758
size_t refcount;
59+
hsize_t chunk_shape[MAX_DIMS]; // Set by get_var_info
5860
} vars[MAX_VARIABLES];
5961

6062
int total_vars;
@@ -78,6 +80,34 @@ varid_t open_variable_async(
7880
return out;
7981
}
8082

83+
// Get a dataspace, making sure it can fit the field
84+
hid_t get_resized_space(async_state_t * state, int idx, int ndims, hsize_t offset[], hsize_t shape[]) {
85+
hid_t data_space = H5Dget_space(state->vars[idx].var_id);
86+
H5ERR(data_space);
87+
88+
// Check if we need to extend the dataset
89+
hsize_t file_size[ndims];
90+
hsize_t max_size[ndims];
91+
H5ERR(H5Sget_simple_extent_dims(data_space, file_size, max_size));
92+
bool needs_resize = false;
93+
for (int d=0;d<ndims;++d) {
94+
if (offset[d] + shape[d] > file_size[d]) {
95+
needs_resize = true;
96+
file_size[d] = offset[d] + shape[d];
97+
if ((max_size[d] != H5S_UNLIMITED) && (file_size[d] > max_size[d])) {
98+
file_size[d] = max_size[d];
99+
}
100+
}
101+
}
102+
if (needs_resize) {
103+
H5ERR(H5Dset_extent(state->vars[idx].var_id, file_size));
104+
// Re-open
105+
data_space = H5Dget_space(state->vars[idx].var_id);
106+
H5ERR(data_space);
107+
}
108+
return data_space;
109+
}
110+
81111
void close_variable_async(
82112
varid_t varid,
83113
int async_writer_rank
@@ -172,6 +202,7 @@ void receive_variable_info_async(
172202
H5ERR(plist);
173203

174204
H5ERR(H5Pget_chunk(plist, ndims, varinfo + 0));
205+
H5ERR(H5Pget_chunk(plist, ndims, state->vars[idx].chunk_shape));
175206

176207
int nfilters = H5Pget_nfilters(plist);
177208
H5ERR(nfilters);
@@ -317,8 +348,8 @@ static size_t receive_write_uncompressed_async(
317348
hid_t mem_space = H5Screate_simple(ndims, shape, NULL);
318349
H5ERR(mem_space);
319350

320-
hid_t data_space = H5Dget_space(state->vars[idx].var_id);
321-
H5ERR(data_space);
351+
hid_t data_space = get_resized_space(state, idx, ndims, offset, shape);
352+
322353
H5ERR(H5Sselect_hyperslab(data_space, H5S_SELECT_SET,
323354
offset, NULL, shape, NULL));
324355

@@ -419,6 +450,9 @@ static size_t receive_write_chunk_async(
419450
hsize_t offset_[ndims];
420451
for (size_t d=0; d<ndims; ++d) {offset_[d] = offset[d];}
421452

453+
hid_t data_space = get_resized_space(state, idx, ndims, offset_, state->vars[idx].chunk_shape);
454+
H5ERR(H5Sclose(data_space));
455+
422456
int err = (H5DOwrite_chunk(state->vars[idx].var_id, H5P_DEFAULT, filter_mask,
423457
offset_, buffer_size, buffer));
424458
if (err < 0) {
@@ -509,9 +543,8 @@ static void receive_close_variable_async(
509543
void close_async(
510544
int async_writer_rank
511545
) {
512-
int buffer = 0;
513546
log_message(LOG_DEBUG, "SEND close file");
514-
MPI_Send(&buffer, 1, MPI_INT, async_writer_rank, TAG_CLOSE, MPI_COMM_WORLD);
547+
MPI_Send(NULL, 0, MPI_INT, async_writer_rank, TAG_CLOSE, MPI_COMM_WORLD);
515548
}
516549

517550
// Async runner to accept writes
@@ -545,19 +578,28 @@ size_t run_async_writer(
545578
receive_variable_info_async(&state, status);
546579
break;
547580
case (TAG_WRITE_CHUNK):
548-
receive_write_chunk_async(&state, status);
581+
total_size += receive_write_chunk_async(&state, status);
549582
break;
550583
case (TAG_WRITE_FILTER):
551-
receive_write_uncompressed_async(&state, status);
584+
total_size += receive_write_uncompressed_async(&state, status);
552585
break;
553586
case (TAG_CLOSE_VARIABLE):
554587
receive_close_variable_async(&state, status);
555588
break;
556589
case (TAG_CLOSE):
557590
--open_senders;
591+
MPI_Recv(NULL, 0, MPI_INT, status.MPI_SOURCE, TAG_CLOSE, MPI_COMM_WORLD,
592+
MPI_STATUS_IGNORE);
558593
break;
559594
}
560595
}
596+
597+
for (int v=0; v<state.total_vars; ++v) {
598+
H5ERR(H5Dclose(state.vars[v].var_id));
599+
}
600+
561601
H5ERR(H5Fclose(state.file_id));
562602
log_message(LOG_DEBUG, "DONE run_async_writer");
603+
604+
return total_size;
563605
}

0 commit comments

Comments
 (0)