@@ -126,7 +126,8 @@ def test_dask_distributed_write_netcdf_with_dimensionless_variables(
126126
127127@requires_cftime
128128@requires_netCDF4
129- def test_open_mfdataset_can_open_files_with_cftime_index (tmp_path ):
129+ @pytest .mark .parametrize ("parallel" , (True , False ))
130+ def test_open_mfdataset_can_open_files_with_cftime_index (parallel , tmp_path ):
130131 T = xr .cftime_range ("20010101" , "20010501" , calendar = "360_day" )
131132 Lon = np .arange (100 )
132133 data = np .random .random ((T .size , Lon .size ))
@@ -135,9 +136,55 @@ def test_open_mfdataset_can_open_files_with_cftime_index(tmp_path):
135136 da .to_netcdf (file_path )
136137 with cluster () as (s , [a , b ]):
137138 with Client (s ["address" ]):
138- for parallel in (False , True ):
139- with xr .open_mfdataset (file_path , parallel = parallel ) as tf :
140- assert_identical (tf ["test" ], da )
139+ with xr .open_mfdataset (file_path , parallel = parallel ) as tf :
140+ assert_identical (tf ["test" ], da )
141+
142+
@requires_cftime
@requires_netCDF4
@pytest.mark.parametrize("parallel", (True, False))
def test_open_mfdataset_multiple_files_parallel_distributed(parallel, tmp_path):
    """Open many cftime-indexed netCDF files with ``open_mfdataset`` against a
    distributed cluster and check the concatenated result round-trips.

    Parametrized over ``parallel`` so both the serial and the
    dask-parallelized open paths are exercised.
    """
    lon = np.arange(100)
    time = xr.cftime_range("20010101", periods=100, calendar="360_day")
    data = np.random.random((time.size, lon.size))
    da = xr.DataArray(data, coords={"time": time, "lon": lon}, name="test")

    # Split the array into ten 10-step files along the time dimension.
    fnames = []
    for i in range(0, 100, 10):
        fname = tmp_path / f"test_{i}.nc"
        da.isel(time=slice(i, i + 10)).to_netcdf(fname)
        fnames.append(fname)

    with cluster() as (s, [a, b]):
        with Client(s["address"]):
            # Nested combine along "time" must reassemble the original array.
            with xr.open_mfdataset(
                fnames, parallel=parallel, concat_dim="time", combine="nested"
            ) as tf:
                assert_identical(tf["test"], da)
164+
165+
# TODO: move this to test_backends.py
@requires_cftime
@requires_netCDF4
@pytest.mark.parametrize("parallel", (True, False))
def test_open_mfdataset_multiple_files_parallel(parallel, tmp_path):
    """Open many cftime-indexed netCDF files with ``open_mfdataset`` under
    each single-machine dask scheduler and check the result round-trips.

    Iterates over the threaded, multiprocessing, and synchronous schedulers
    (plus ``None`` for the default) rather than a distributed cluster;
    parametrized over ``parallel`` to cover both open paths.
    """
    lon = np.arange(100)
    time = xr.cftime_range("20010101", periods=100, calendar="360_day")
    data = np.random.random((time.size, lon.size))
    da = xr.DataArray(data, coords={"time": time, "lon": lon}, name="test")

    # Split the array into ten 10-step files along the time dimension.
    fnames = []
    for i in range(0, 100, 10):
        fname = tmp_path / f"test_{i}.nc"
        da.isel(time=slice(i, i + 10)).to_netcdf(fname)
        fnames.append(fname)

    for get in [dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync, None]:
        with dask.config.set(scheduler=get):
            # Nested combine along "time" must reassemble the original array.
            with xr.open_mfdataset(
                fnames, parallel=parallel, concat_dim="time", combine="nested"
            ) as tf:
                assert_identical(tf["test"], da)
141188
142189
143190@pytest .mark .parametrize ("engine,nc_format" , ENGINES_AND_FORMATS )
0 commit comments