From fd28ff11ac4da8cf86b1b9116aa889aa483791d5 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 23 Aug 2017 10:19:20 -0700 Subject: [PATCH 1/7] dask='parallelized' for apply_ufunc' --- xarray/core/computation.py | 122 +++++++++++++++++++++++++------ xarray/tests/__init__.py | 4 +- xarray/tests/test_computation.py | 118 ++++++++++++++++++++++++++---- 3 files changed, 204 insertions(+), 40 deletions(-) diff --git a/xarray/core/computation.py b/xarray/core/computation.py index 784c722989c..a9c2c34bfed 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -500,6 +500,9 @@ def apply_variable_ufunc(func, *args, **kwargs): signature = kwargs.pop('signature') exclude_dims = kwargs.pop('exclude_dims', _DEFAULT_FROZEN_SET) + dask = kwargs.pop('dask', 'forbidden') + output_dtypes = kwargs.pop('output_dtypes', None) + output_sizes = kwargs.pop('output_sizes', None) if kwargs: raise TypeError('apply_variable_ufunc() got unexpected keyword ' 'arguments: %s' % list(kwargs)) @@ -515,6 +518,22 @@ def apply_variable_ufunc(func, *args, **kwargs): else arg for arg, core_dims in zip(args, signature.input_core_dims)] + if any(isinstance(array, dask_array_type) for array in input_data): + if dask == 'forbidden': + raise ValueError('encountered dask array, but did not set ' + "dask='allowed'") + elif dask == 'parallelized': + input_dims = [broadcast_dims + input_dims + for input_dims in signature.input_core_dims] + numpy_func = func + func = lambda *arrays: _apply_with_dask_atop( + numpy_func, arrays, input_dims, output_dims, signature, + output_dtypes, output_sizes) + elif dask == 'allowed': + pass + else: + raise ValueError('unknown setting for dask array handling: {}' + .format(dask)) result_data = func(*input_data) if signature.n_outputs > 1: @@ -527,24 +546,65 @@ def apply_variable_ufunc(func, *args, **kwargs): return Variable(dims, result_data) +def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, + output_dtypes, output_sizes=None): + import dask.array as da + + if signature.n_outputs > 1: + raise NotImplementedError( + "multiple outputs not yet supported with dask='parallelized'") + + if output_dtypes is None: + raise ValueError( + "output dtypes (output_dtypes) required when using dask='parallelized'") + if len(output_dtypes) != signature.n_outputs: + raise ValueError('wrong number of output dtypes') + (dtype,) = output_dtypes + + if output_sizes is None: + output_sizes = {} + + new_dims = signature.all_output_core_dims - signature.all_input_core_dims + if any(dim not in output_sizes for dim in new_dims): + raise ValueError('output core dimensions not found on inputs must have ' + 'explicitly set sizes with ``output_sizes``: {}' + .format(new_dims)) + + args2 = [] + for data, core_dims in zip(args, signature.input_core_dims): + if isinstance(data, dask_array_type): + # core dimensions cannot span multiple chunks + chunks = {axis: (data.shape[axis],) + for axis, dim in enumerate(core_dims, -len(core_dims))} + data = data.rechunk(chunks) + args2.append(data) + + (out_ind,) = output_dims + atop_args = [ai for a in zip(args2, input_dims) for ai in a] + return da.atop(func, out_ind, *atop_args, dtype=dtype, concatenate=True, + new_axes=output_sizes) + + def apply_array_ufunc(func, *args, **kwargs): - """apply_variable_ufunc(func, *args, dask_array='forbidden') + """apply_array_ufunc(func, *args, dask='forbidden') """ - dask_array = kwargs.pop('dask_array', 'forbidden') + dask = kwargs.pop('dask', 'forbidden') if kwargs: raise TypeError('apply_array_ufunc() got unexpected keyword ' 'arguments: %s' % list(kwargs)) if any(isinstance(arg, dask_array_type) for arg in args): - # TODO: add a mode dask_array='auto' when dask.array gets a function - # for applying arbitrary gufuncs - if dask_array == 'forbidden': + if dask == 'forbidden': raise ValueError('encountered dask array, but did not set ' - "dask_array='allowed'") - elif dask_array != 'allowed': - raise ValueError('unknown setting for dask array handling: %r' - % dask_array) - # fall through + "dask='allowed'") + elif dask == 'parallelized': + raise ValueError("cannot use dask='parallelized' unless at least " + 'one input is an xarray object') + elif dask == 'allowed': + pass + else: + raise ValueError('unknown setting for dask array handling: {}' + .format(dask)) return func(*args) @@ -559,7 +619,9 @@ def apply_ufunc(func, *args, **kwargs): dataset_fill_value : Any = _DEFAULT_FILL_VALUE, keep_attrs : bool = False, kwargs : Mapping = None, - dask_array : str = 'forbidden') + dask_array : str = 'forbidden', + output_dtypes : Optional[Sequence] = None, + output_sizes : Optional[Mapping[Any, int]] = None) Apply a vectorized function for unlabeled arrays on xarray objects. @@ -630,10 +692,20 @@ def apply_ufunc(func, *args, **kwargs): Whether to copy attributes from the first argument to the output. kwargs: dict, optional Optional keyword arguments passed directly on to call ``func``. - dask_array: 'forbidden' or 'allowed', optional - Whether or not to allow applying the ufunc to objects containing lazy - data in the form of dask arrays. By default, this is forbidden, to - avoid implicitly converting lazy data. + dask: 'forbidden', 'allowed' or 'parallelized', optional + How to handle applying to objects containing lazy data in the form of + dask arrays: + - 'forbidden' (default): raise an error if a dask array is encountered. + - 'allowed': pass dask arrays directly on to ``func``. + - 'parallelized': automatically parallelize ``func`` if any of the + inputs are a dask array. If used, the ``otypes`` argument must also be + provided. Multiple output arguments are not yet supported. + output_dtypes : list of dtypes, optional + Optional list of output dtypes. Only used if dask='parallelized'. + output_sizes : dict, optional + Optional mapping from dimension names to sizes for outputs. Only used if + dask='parallelized' and new dimensions (not found on inputs) appear on + outputs. Returns ------- @@ -710,7 +782,9 @@ def stack(objects, dim, new_coord): exclude_dims = kwargs.pop('exclude_dims', frozenset()) dataset_fill_value = kwargs.pop('dataset_fill_value', _DEFAULT_FILL_VALUE) kwargs_ = kwargs.pop('kwargs', None) - dask_array = kwargs.pop('dask_array', 'forbidden') + dask = kwargs.pop('dask', 'forbidden') + output_dtypes = kwargs.pop('output_dtypes', None) + output_sizes = kwargs.pop('output_sizes', None) if kwargs: raise TypeError('apply_ufunc() got unexpected keyword arguments: %s' % list(kwargs)) @@ -727,12 +801,12 @@ def stack(objects, dim, new_coord): if kwargs_: func = functools.partial(func, **kwargs_) - array_ufunc = functools.partial( - apply_array_ufunc, func, dask_array=dask_array) - - variables_ufunc = functools.partial(apply_variable_ufunc, array_ufunc, + variables_ufunc = functools.partial(apply_variable_ufunc, func, signature=signature, - exclude_dims=exclude_dims) + exclude_dims=exclude_dims, + dask=dask, + output_dtypes=output_dtypes, + output_sizes=output_sizes) if any(isinstance(a, GroupBy) for a in args): # kwargs has already been added into func @@ -744,7 +818,7 @@ def stack(objects, dim, new_coord): dataset_join=dataset_join, dataset_fill_value=dataset_fill_value, keep_attrs=keep_attrs, - dask_array=dask_array) + dask=dask) return apply_groupby_ufunc(this_apply, *args) elif any(is_dict_like(a) for a in args): return apply_dataset_ufunc(variables_ufunc, *args, @@ -763,7 +837,7 @@ def stack(objects, dim, new_coord): elif any(isinstance(a, Variable) for a in args): return variables_ufunc(*args) else: - return array_ufunc(*args) + return apply_array_ufunc(func, *args, dask=dask) def where(cond, x, y): @@ -805,4 +879,4 @@ def where(cond, x, y): cond, x, y, join='exact', dataset_join='exact', - dask_array='allowed') + dask='allowed') diff --git a/xarray/tests/__init__.py b/xarray/tests/__init__.py index 424484b438a..33dd4305e9f 100644 --- a/xarray/tests/__init__.py +++ b/xarray/tests/__init__.py @@ -188,8 +188,8 @@ def raises_regex(error, pattern): with pytest.raises(error) as excinfo: yield message = str(excinfo.value) - if not re.match(pattern, message): - raise AssertionError('exception %r did not match pattern %s' + if not re.search(pattern, message): + raise AssertionError('exception %r did not match pattern %r' % (excinfo.value, pattern)) diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index 76d12d51616..c5e0b18c8bf 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -13,7 +13,7 @@ join_dict_keys, ordered_set_intersection, ordered_set_union, unified_dim_sizes, apply_ufunc) -from . import requires_dask +from . import requires_dask, raises_regex def assert_identical(a, b): @@ -344,7 +344,7 @@ def func(*x): output_core_dims=[[dim]], exclude_dims={dim}) if isinstance(result, (xr.Dataset, xr.DataArray)): - # note: this will fail dim is not a coordinate on any input + # note: this will fail if dim is not a coordinate on any input new_coord = np.concatenate([obj.coords[dim] for obj in objects]) result.coords[dim] = new_coord return result @@ -530,25 +530,17 @@ def add(a, b, join, dataset_join): assert_identical(actual, expected) -class _NoCacheVariable(xr.Variable): - """Subclass of Variable for testing that does not cache values.""" - # TODO: remove this class when we change the default behavior for caching - # dask.array objects. - def _data_cached(self): - return np.asarray(self._data) - - @requires_dask def test_apply_dask(): import dask.array as da array = da.ones((2,), chunks=2) - variable = _NoCacheVariable('x', array) + variable = xr.Variable('x', array) coords = xr.DataArray(variable).coords.variables data_array = xr.DataArray(variable, coords, fastpath=True) dataset = xr.Dataset({'y': variable}) - # encountered dask array, but did not set dask_array='allowed' + # encountered dask array, but did not set dask='allowed' with pytest.raises(ValueError): apply_ufunc(identity, array) with pytest.raises(ValueError): @@ -560,10 +552,10 @@ def test_apply_dask(): # unknown setting for dask array handling with pytest.raises(ValueError): - apply_ufunc(identity, array, dask_array='auto') + apply_ufunc(identity, array, dask='unknown') def dask_safe_identity(x): - return apply_ufunc(identity, x, dask_array='allowed') + return apply_ufunc(identity, x, dask='allowed') assert array is dask_safe_identity(array) @@ -580,6 +572,104 @@ def dask_safe_identity(x): assert_identical(dataset, actual) +@requires_dask +def test_apply_dask_parallelized(): + import dask.array as da + + array = da.ones((2, 2), chunks=(1, 1)) + data_array = xr.DataArray(array, dims=('x', 'y')) + + actual = apply_ufunc(identity, data_array, dask='parallelized', + output_dtypes=[float]) + assert isinstance(actual.data, da.Array) + assert actual.data.chunks == array.chunks + assert_identical(data_array, actual) + + # check rechunking of core dimensions + actual = apply_ufunc(identity, data_array, dask='parallelized', + output_dtypes=[float], + input_core_dims=[('y',)], + output_core_dims=[('y',)]) + assert isinstance(actual.data, da.Array) + assert actual.data.chunks == ((1, 1), (2,)) + assert_identical(data_array, actual) + + +@requires_dask +def test_apply_dask_parallelized_errors(): + import dask.array as da + + array = da.ones((2, 2), chunks=(1, 1)) + data_array = xr.DataArray(array, dims=('x', 'y')) + + with pytest.raises(NotImplementedError): + apply_ufunc(identity, data_array, output_core_dims=[['z'], ['z']], + dask='parallelized') + with raises_regex(ValueError, 'dtypes'): + apply_ufunc(identity, data_array, dask='parallelized') + with raises_regex(ValueError, 'wrong number'): + apply_ufunc(identity, data_array, dask='parallelized', + output_dtypes=[float, float]) + with raises_regex(ValueError, 'output_sizes'): + apply_ufunc(identity, data_array, output_core_dims=[['z']], + output_dtypes=[float], dask='parallelized') + with raises_regex(ValueError, 'at least one input is an xarray object'): + apply_ufunc(identity, array, dask='parallelized') + + +@requires_dask +def test_apply_dask_multiple_inputs(): + import dask.array as da + + def covariance(x, y): + return ((x - x.mean(axis=-1, keepdims=True)) + * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1) + + rs = np.random.RandomState(42) + array1 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) + array2 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) + data_array_1 = xr.DataArray(array1, dims=('x', 'y')) + data_array_2 = xr.DataArray(array2, dims=('x', 'y')) + + expected = apply_ufunc( + covariance, data_array_1.compute(), data_array_2.compute(), + input_core_dims=[['y'], ['y']]) + allowed = apply_ufunc( + covariance, data_array_1, data_array_2, input_core_dims=[['y'], ['y']], + dask='allowed') + assert isinstance(allowed.data, da.Array) + xr.testing.assert_allclose(expected, allowed.compute()) + + parallelized = apply_ufunc( + covariance, data_array_1, data_array_2, input_core_dims=[['y'], ['y']], + dask='parallelized', output_dtypes=[float]) + assert isinstance(parallelized.data, da.Array) + xr.testing.assert_allclose(expected, parallelized.compute()) + + +@requires_dask +def test_apply_dask_new_output_dimension(): + import dask.array as da + + array = da.ones((2, 2), chunks=(1, 1)) + data_array = xr.DataArray(array, dims=('x', 'y')) + + def stack_negative(obj): + def func(x): + return xr.core.npcompat.stack([x, -x], axis=-1) + return apply_ufunc(func, obj, output_core_dims=[['sign']], + dask='parallelized', output_dtypes=[obj.dtype], + output_sizes={'sign': 2}) + + expected = stack_negative(data_array.compute()) + + actual = stack_negative(data_array) + assert actual.dims == ('x', 'y', 'sign') + assert actual.shape == (2, 2, 2) + assert isinstance(actual.data, da.Array) + assert_identical(expected, actual) + + def test_where(): cond = xr.DataArray([True, False], dims='x') actual = xr.where(cond, 1, 0) From 22671389026b278772bd92ba8f2e00a339c92623 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 23 Aug 2017 19:47:06 -0700 Subject: [PATCH 2/7] Fix dask_array -> dask --- xarray/core/ops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xarray/core/ops.py b/xarray/core/ops.py index b840f26073c..04f156a2998 100644 --- a/xarray/core/ops.py +++ b/xarray/core/ops.py @@ -148,7 +148,7 @@ def fillna(data, other, join="left", dataset_join="left"): return apply_ufunc(duck_array_ops.fillna, data, other, join=join, - dask_array="allowed", + dask="allowed", dataset_join=dataset_join, dataset_fill_value=np.nan, keep_attrs=True) @@ -176,7 +176,7 @@ def where_method(self, cond, other=dtypes.NA): self, cond, other, join=join, dataset_join=join, - dask_array='allowed', + dask='allowed', keep_attrs=True) From e95d311ca89eb381b33d13e57bc5d2d56d5f137e Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sat, 16 Sep 2017 22:26:36 -0700 Subject: [PATCH 3/7] Delete unused code, better error messages --- xarray/core/computation.py | 108 ++++++++++++++----------------- xarray/tests/test_computation.py | 4 +- 2 files changed, 52 insertions(+), 60 deletions(-) diff --git a/xarray/core/computation.py b/xarray/core/computation.py index a9c2c34bfed..02f8c911ba0 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -68,28 +68,9 @@ def n_inputs(self): return len(self.input_core_dims) @property - def n_outputs(self): + def num_outputs(self): return len(self.output_core_dims) - @classmethod - def default(cls, n_inputs): - return cls([()] * n_inputs, [()]) - - @classmethod - def from_sequence(cls, nested): - if (not isinstance(nested, collections.Sequence) or - not len(nested) == 2 or - any(not isinstance(arg_list, collections.Sequence) - for arg_list in nested) or - any(isinstance(arg, basestring) or - not isinstance(arg, collections.Sequence) - for arg_list in nested for arg in arg_list)): - raise TypeError('functions signatures not provided as a string ' - 'must be a triply nested sequence providing the ' - 'list of core dimensions for each variable, for ' - 'both input and output.') - return cls(*nested) - def __eq__(self, other): try: return (self.input_core_dims == other.input_core_dims and @@ -190,7 +171,7 @@ def apply_dataarray_ufunc(func, *args, **kwargs): data_vars = [getattr(a, 'variable', a) for a in args] result_var = func(*data_vars) - if signature.n_outputs > 1: + if signature.num_outputs > 1: out = tuple(DataArray(variable, coords, name=name, fastpath=True) for variable, coords in zip(result_var, result_coords)) else: @@ -269,10 +250,10 @@ def _as_variables_or_variable(arg): def _unpack_dict_tuples( result_vars, # type: Mapping[Any, Tuple[Variable]] - n_outputs, # type: int + num_outputs, # type: int ): # type: (...) -> Tuple[Dict[Any, Variable]] - out = tuple(OrderedDict() for _ in range(n_outputs)) + out = tuple(OrderedDict() for _ in range(num_outputs)) for name, values in result_vars.items(): for value, results_dict in zip(values, out): results_dict[name] = value @@ -298,8 +279,8 @@ def apply_dict_of_variables_ufunc(func, *args, **kwargs): for name, variable_args in zip(names, grouped_by_name): result_vars[name] = func(*variable_args) - if signature.n_outputs > 1: - return _unpack_dict_tuples(result_vars, signature.n_outputs) + if signature.num_outputs > 1: + return _unpack_dict_tuples(result_vars, signature.num_outputs) else: return result_vars @@ -335,8 +316,8 @@ def apply_dataset_ufunc(func, *args, **kwargs): if (dataset_join not in _JOINS_WITHOUT_FILL_VALUES and fill_value is _DEFAULT_FILL_VALUE): - raise TypeError('To apply an operation to datasets with different ', - 'data variables, you must supply the ', + raise TypeError('to apply an operation to datasets with different ' + 'data variables with apply_ufunc, you must supply the ' 'dataset_fill_value argument.') if kwargs: @@ -353,7 +334,7 @@ def apply_dataset_ufunc(func, *args, **kwargs): func, *args, signature=signature, join=dataset_join, fill_value=fill_value) - if signature.n_outputs > 1: + if signature.num_outputs > 1: out = tuple(_fast_dataset(*args) for args in zip(result_vars, list_of_coords)) else: @@ -388,12 +369,12 @@ def apply_groupby_ufunc(func, *args): from .variable import Variable groupbys = [arg for arg in args if isinstance(arg, GroupBy)] - if not groupbys: - raise ValueError('must have at least one groupby to iterate over') + assert groupbys, 'must have at least one groupby to iterate over' first_groupby = groupbys[0] if any(not first_groupby._group.equals(gb._group) for gb in groupbys[1:]): - raise ValueError('can only perform operations over multiple groupbys ' - 'at once if they are all grouped the same way') + raise ValueError('apply_ufunc can only perform operations over ' + 'multiple GroupBy objets at once if they are all ' + 'grouped the same way') grouped_dim = first_groupby._group.name unique_values = first_groupby._unique_coord.values @@ -430,7 +411,7 @@ def unified_dim_sizes(variables, exclude_dims=frozenset()): for var in variables: if len(set(var.dims)) < len(var.dims): raise ValueError('broadcasting cannot handle duplicate ' - 'dimensions: %r' % list(var.dims)) + 'dimensions on a variable: %r' % list(var.dims)) for dim, size in zip(var.dims, var.shape): if dim not in exclude_dims: if dim not in dim_sizes: @@ -462,15 +443,17 @@ def broadcast_compat_data(variable, broadcast_dims, core_dims): set_old_dims = set(old_dims) missing_core_dims = [d for d in core_dims if d not in set_old_dims] if missing_core_dims: - raise ValueError('operation requires dimensions missing on input ' - 'variable: %r' % missing_core_dims) + raise ValueError('operand to apply_ufunc has required core dimensions ' + '%r, but some of these are missing on the input ' + 'variable: %r' % (list(core_dims), missing_core_dims)) set_new_dims = set(new_dims) unexpected_dims = [d for d in old_dims if d not in set_new_dims] if unexpected_dims: - raise ValueError('operation encountered unexpected dimensions %r ' - 'on input variable: these are core dimensions on ' - 'other input or output variables' % unexpected_dims) + raise ValueError('operand to apply_ufunc encountered unexpected ' + 'dimensions %r on an input variable: these are core ' + 'dimensions on other input or output variables' + % unexpected_dims) # for consistency with numpy, keep broadcast dimensions to the left old_broadcast_dims = tuple(d for d in broadcast_dims if d in set_old_dims) @@ -520,8 +503,11 @@ def apply_variable_ufunc(func, *args, **kwargs): if any(isinstance(array, dask_array_type) for array in input_data): if dask == 'forbidden': - raise ValueError('encountered dask array, but did not set ' - "dask='allowed'") + raise ValueError('apply_ufunc encountered a dask array on an ' + 'argument, but handling for dask arrays has not ' + 'been enabled. Either set the ``dask`` argument ' + 'or load your data into memory first with ' + '``.load()`` or ``.compute()``') elif dask == 'parallelized': input_dims = [broadcast_dims + input_dims for input_dims in signature.input_core_dims] @@ -532,11 +518,11 @@ def apply_variable_ufunc(func, *args, **kwargs): elif dask == 'allowed': pass else: - raise ValueError('unknown setting for dask array handling: {}' - .format(dask)) + raise ValueError('unknown setting for dask array handling in ' + 'apply_ufunc: {}'.format(dask)) result_data = func(*input_data) - if signature.n_outputs > 1: + if signature.num_outputs > 1: output = [] for dims, data in zip(output_dims, result_data): output.append(Variable(dims, data)) @@ -550,15 +536,17 @@ def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, output_dtypes, output_sizes=None): import dask.array as da - if signature.n_outputs > 1: - raise NotImplementedError( - "multiple outputs not yet supported with dask='parallelized'") + if signature.num_outputs > 1: + raise NotImplementedError('multiple outputs from apply_ufunc not yet ' + "supported with dask='parallelized'") if output_dtypes is None: - raise ValueError( - "output dtypes (output_dtypes) required when using dask='parallelized'") - if len(output_dtypes) != signature.n_outputs: - raise ValueError('wrong number of output dtypes') + raise ValueError('output dtypes (output_dtypes) must be supplied to ' + "apply_func when using dask='parallelized'") + if len(output_dtypes) != signature.num_outputs: + raise ValueError('apply_ufunc arguments output_dtypes and ' + 'output_core_dims must have the same length: {} vs {}' + .format(len(output_dtypes), signature.num_outputs)) (dtype,) = output_dtypes if output_sizes is None: @@ -566,7 +554,8 @@ def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, new_dims = signature.all_output_core_dims - signature.all_input_core_dims if any(dim not in output_sizes for dim in new_dims): - raise ValueError('output core dimensions not found on inputs must have ' + raise ValueError("when using dask='parallelized' with apply_ufunc, " + 'output core dimensions not found on inputs must have ' 'explicitly set sizes with ``output_sizes``: {}' .format(new_dims)) @@ -595,11 +584,14 @@ def apply_array_ufunc(func, *args, **kwargs): if any(isinstance(arg, dask_array_type) for arg in args): if dask == 'forbidden': - raise ValueError('encountered dask array, but did not set ' - "dask='allowed'") + raise ValueError('apply_ufunc encountered a dask array on an ' + 'argument, but handling for dask arrays has not ' + 'been enabled. Either set the ``dask`` argument ' + 'or load your data into memory first with ' + '``.load()`` or ``.compute()``') elif dask == 'parallelized': - raise ValueError("cannot use dask='parallelized' unless at least " - 'one input is an xarray object') + raise ValueError("cannot use dask='parallelized' for apply_ufunc " + 'unless at least one input is an xarray object') elif dask == 'allowed': pass else: @@ -619,7 +611,7 @@ def apply_ufunc(func, *args, **kwargs): dataset_fill_value : Any = _DEFAULT_FILL_VALUE, keep_attrs : bool = False, kwargs : Mapping = None, - dask_array : str = 'forbidden', + dask : str = 'forbidden', output_dtypes : Optional[Sequence] = None, output_sizes : Optional[Mapping[Any, int]] = None) @@ -698,8 +690,8 @@ def apply_ufunc(func, *args, **kwargs): - 'forbidden' (default): raise an error if a dask array is encountered. - 'allowed': pass dask arrays directly on to ``func``. - 'parallelized': automatically parallelize ``func`` if any of the - inputs are a dask array. If used, the ``otypes`` argument must also be - provided. Multiple output arguments are not yet supported. + inputs are a dask array. If used, the ``output_dtypes`` argument must + also be provided. Multiple output arguments are not yet supported. output_dtypes : list of dtypes, optional Optional list of output dtypes. Only used if dask='parallelized'. output_sizes : dict, optional diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index c5e0b18c8bf..6978e11f0c5 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -30,8 +30,8 @@ def test_signature_properties(): assert sig.output_core_dims == (('z',),) assert sig.all_input_core_dims == frozenset(['x', 'y']) assert sig.all_output_core_dims == frozenset(['z']) - assert sig.n_inputs == 2 - assert sig.n_outputs == 1 + assert sig.num_inputs == 2 + assert sig.num_outputs == 1 # dimension names matter assert _UFuncSignature([['x']]) != _UFuncSignature([['y']]) From ee95e63585272abced177a7b0b34da4d6116a2c1 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 17 Sep 2017 16:24:06 -0700 Subject: [PATCH 4/7] fix tests --- xarray/core/computation.py | 2 +- xarray/tests/test_computation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/xarray/core/computation.py b/xarray/core/computation.py index 02f8c911ba0..a51ca460381 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -64,7 +64,7 @@ def all_core_dims(self): return self._all_core_dims @property - def n_inputs(self): + def num_inputs(self): return len(self.input_core_dims) @property diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index 6978e11f0c5..c860441dab9 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -607,7 +607,7 @@ def test_apply_dask_parallelized_errors(): dask='parallelized') with raises_regex(ValueError, 'dtypes'): apply_ufunc(identity, data_array, dask='parallelized') - with raises_regex(ValueError, 'wrong number'): + with raises_regex(ValueError, 'must have the same length'): apply_ufunc(identity, data_array, dask='parallelized', output_dtypes=[float, float]) with raises_regex(ValueError, 'output_sizes'): From ce9a0c0cfa76e678171cec17f02b9729f259724e Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 17 Sep 2017 16:47:59 -0700 Subject: [PATCH 5/7] really fix tests --- xarray/tests/test_computation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index c860441dab9..ad81c473a3e 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -275,7 +275,7 @@ def test_apply_output_core_dimension(): def stack_negative(obj): def func(x): - return xr.core.npcompat.stack([x, -x], axis=-1) + return np.stack([x, -x], axis=-1) result = apply_ufunc(func, obj, output_core_dims=[['sign']]) if isinstance(result, (xr.Dataset, xr.DataArray)): result.coords['sign'] = [1, -1] @@ -303,7 +303,7 @@ def func(x): def original_and_stack_negative(obj): def func(x): - return (x, xr.core.npcompat.stack([x, -x], axis=-1)) + return (x, np.stack([x, -x], axis=-1)) result = apply_ufunc(func, obj, output_core_dims=[[], ['sign']]) if isinstance(result[1], (xr.Dataset, xr.DataArray)): result[1].coords['sign'] = [1, -1] @@ -656,7 +656,7 @@ def test_apply_dask_new_output_dimension(): def stack_negative(obj): def func(x): - return xr.core.npcompat.stack([x, -x], axis=-1) + return np.stack([x, -x], axis=-1) return apply_ufunc(func, obj, output_core_dims=[['sign']], dask='parallelized', output_dtypes=[obj.dtype], output_sizes={'sign': 2}) From 70200df0670e54bc0665c6d267ca3da5a39123c4 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Mon, 25 Sep 2017 22:26:04 -0700 Subject: [PATCH 6/7] wip: raise errors --- xarray/core/computation.py | 14 ++++++++++---- xarray/tests/test_computation.py | 13 ++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/xarray/core/computation.py b/xarray/core/computation.py index a51ca460381..b9cfabdb760 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -543,6 +543,9 @@ def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, if output_dtypes is None: raise ValueError('output dtypes (output_dtypes) must be supplied to ' "apply_func when using dask='parallelized'") + if not isinstance(output_dtypes, list): + raise TypeError('output_dtypes must be a list of objects coercible to ' + 'numpy dtypes, got {}'.format(output_dtypes)) if len(output_dtypes) != signature.num_outputs: raise ValueError('apply_ufunc arguments output_dtypes and ' 'output_core_dims must have the same length: {} vs {}' @@ -564,12 +567,15 @@ def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, if isinstance(data, dask_array_type): # core dimensions cannot span multiple chunks chunks = {axis: (data.shape[axis],) - for axis, dim in enumerate(core_dims, -len(core_dims))} + for axis, dim in enumerate(core_dims, -len(core_dims))} data = data.rechunk(chunks) args2.append(data) (out_ind,) = output_dims - atop_args = [ai for a in zip(args2, input_dims) for ai in a] + # skip leading dimensions that we did not insert with broadcast_compat_data + atop_args = [element + for (arg, dims) in zip(args2, input_dims) + for element in (arg, dims[-getattr(arg, 'ndim', 0):])] return da.atop(func, out_ind, *atop_args, dtype=dtype, concatenate=True, new_axes=output_sizes) @@ -635,8 +641,8 @@ def apply_ufunc(func, *args, **kwargs): Mix of labeled and/or unlabeled arrays to which to apply the function. input_core_dims : Sequence[Sequence], optional List of the same length as ``args`` giving the list of core dimensions - on each input argument that should be broadcast. By default, we assume - there are no core dimensions on any input arguments. + on each input argument that should not be broadcast. By default, we + assume there are no core dimensions on any input arguments. For example ,``input_core_dims=[[], ['time']]`` indicates that all dimensions on the first argument and all dimensions other than 'time' diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index ad81c473a3e..b911c7d8197 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -607,6 +607,9 @@ def test_apply_dask_parallelized_errors(): dask='parallelized') with raises_regex(ValueError, 'dtypes'): apply_ufunc(identity, data_array, dask='parallelized') + with raises_regex(TypeError, 'list'): + apply_ufunc(identity, data_array, dask='parallelized', + output_dtypes=float) with raises_regex(ValueError, 'must have the same length'): apply_ufunc(identity, data_array, dask='parallelized', output_dtypes=[float, float]) @@ -628,20 +631,20 @@ def covariance(x, y): rs = np.random.RandomState(42) array1 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) array2 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) - data_array_1 = xr.DataArray(array1, dims=('x', 'y')) - data_array_2 = xr.DataArray(array2, dims=('x', 'y')) + data_array_1 = xr.DataArray(array1, dims=('x', 'z')) + data_array_2 = xr.DataArray(array2, dims=('y', 'z')) expected = apply_ufunc( covariance, data_array_1.compute(), data_array_2.compute(), - input_core_dims=[['y'], ['y']]) + input_core_dims=[['z'], ['z']]) allowed = apply_ufunc( - covariance, data_array_1, data_array_2, input_core_dims=[['y'], ['y']], + covariance, data_array_1, data_array_2, input_core_dims=[['z'], ['z']], dask='allowed') assert isinstance(allowed.data, da.Array) xr.testing.assert_allclose(expected, allowed.compute()) parallelized = apply_ufunc( - covariance, data_array_1, data_array_2, input_core_dims=[['y'], ['y']], + covariance, data_array_1, data_array_2, input_core_dims=[['z'], ['z']], dask='parallelized', output_dtypes=[float]) assert isinstance(parallelized.data, da.Array) xr.testing.assert_allclose(expected, parallelized.compute()) From bdeb875ac7160f4e17c6821413fbb684edd5d483 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Fri, 6 Oct 2017 19:22:44 -0700 Subject: [PATCH 7/7] Raise an error instead of automatic rechunking --- xarray/core/computation.py | 20 +++++++++++++------- xarray/tests/test_computation.py | 19 ++++++++----------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/xarray/core/computation.py b/xarray/core/computation.py index b9cfabdb760..a3838a004ce 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -562,19 +562,25 @@ def _apply_with_dask_atop(func, args, input_dims, output_dims, signature, 'explicitly set sizes with ``output_sizes``: {}' .format(new_dims)) - args2 = [] - for data, core_dims in zip(args, signature.input_core_dims): + for n, (data, core_dims) in enumerate( + zip(args, signature.input_core_dims)): if isinstance(data, dask_array_type): # core dimensions cannot span multiple chunks - chunks = {axis: (data.shape[axis],) - for axis, dim in enumerate(core_dims, -len(core_dims))} - data = data.rechunk(chunks) - args2.append(data) + for axis, dim in enumerate(core_dims, start=-len(core_dims)): + if len(data.chunks[axis]) != 1: + raise ValueError( + 'dimension {!r} on {}th function argument to ' + "apply_ufunc with dask='parallelized' consists of " + 'multiple chunks, but is also a core dimension. To ' + 'fix, rechunk into a single dask array chunk along ' + 'this dimension, i.e., ``.rechunk({})``, but beware ' + 'that this may significantly increase memory usage.' + .format(dim, n, {dim: -1})) (out_ind,) = output_dims # skip leading dimensions that we did not insert with broadcast_compat_data atop_args = [element - for (arg, dims) in zip(args2, input_dims) + for (arg, dims) in zip(args, input_dims) for element in (arg, dims[-getattr(arg, 'ndim', 0):])] return da.atop(func, out_ind, *atop_args, dtype=dtype, concatenate=True, new_axes=output_sizes) diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index b911c7d8197..bb19692cb03 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -585,15 +585,6 @@ def test_apply_dask_parallelized(): assert actual.data.chunks == array.chunks assert_identical(data_array, actual) - # check rechunking of core dimensions - actual = apply_ufunc(identity, data_array, dask='parallelized', - output_dtypes=[float], - input_core_dims=[('y',)], - output_core_dims=[('y',)]) - assert isinstance(actual.data, da.Array) - assert actual.data.chunks == ((1, 1), (2,)) - assert_identical(data_array, actual) - @requires_dask def test_apply_dask_parallelized_errors(): @@ -619,6 +610,12 @@ def test_apply_dask_parallelized_errors(): with raises_regex(ValueError, 'at least one input is an xarray object'): apply_ufunc(identity, array, dask='parallelized') + with raises_regex(ValueError, 'consists of multiple chunks'): + apply_ufunc(identity, data_array, dask='parallelized', + output_dtypes=[float], + input_core_dims=[('y',)], + output_core_dims=[('y',)]) + @requires_dask def test_apply_dask_multiple_inputs(): @@ -629,8 +626,8 @@ def covariance(x, y): * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1) rs = np.random.RandomState(42) - array1 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) - array2 = da.from_array(rs.randn(4, 4), chunks=(2, 2)) + array1 = da.from_array(rs.randn(4, 4), chunks=(2, 4)) + array2 = da.from_array(rs.randn(4, 4), chunks=(2, 4)) data_array_1 = xr.DataArray(array1, dims=('x', 'z')) data_array_2 = xr.DataArray(array2, dims=('y', 'z'))