diff --git a/adaptive/learner/average_learner1D.py b/adaptive/learner/average_learner1D.py index ad015793f..4f91bb0eb 100644 --- a/adaptive/learner/average_learner1D.py +++ b/adaptive/learner/average_learner1D.py @@ -3,7 +3,17 @@ from collections import defaultdict from copy import deepcopy from math import hypot -from typing import Callable, DefaultDict, Dict, List, Optional, Sequence, Set, Tuple +from typing import ( + Callable, + DefaultDict, + Dict, + Iterable, + List, + Optional, + Sequence, + Set, + Tuple, +) import numpy as np import scipy.stats @@ -356,7 +366,7 @@ def _update_losses_resampling(self, x: Real, real=True) -> None: if (b is not None) and right_loss_is_unknown: self.losses_combined[x, b] = float("inf") - def _calc_error_in_mean(self, ys: Sequence[Real], y_avg: Real, n: int) -> float: + def _calc_error_in_mean(self, ys: Iterable[Real], y_avg: Real, n: int) -> float: variance_in_mean = sum((y - y_avg) ** 2 for y in ys) / (n - 1) t_student = scipy.stats.t.ppf(1 - self.alpha, df=n - 1) return t_student * (variance_in_mean / n) ** 0.5 diff --git a/adaptive/learner/base_learner.py b/adaptive/learner/base_learner.py index f7e3212c9..e79261a16 100644 --- a/adaptive/learner/base_learner.py +++ b/adaptive/learner/base_learner.py @@ -5,14 +5,14 @@ from adaptive.utils import _RequireAttrsABCMeta, load, save -def uses_nth_neighbors(n): +def uses_nth_neighbors(n: int): """Decorator to specify how many neighboring intervals the loss function uses. Wraps loss functions to indicate that they expect intervals together with ``n`` nearest neighbors The loss function will then receive the data of the N nearest neighbors - (``nth_neighbors``) aling with the data of the interval itself in a dict. + (``nth_neighbors``) along with the data of the interval itself in a dict. The `~adaptive.Learner1D` will also make sure that the loss is updated whenever one of the ``nth_neighbors`` changes. diff --git a/adaptive/learner/learner1D.py b/adaptive/learner/learner1D.py index 1f521a028..9dd3aae4d 100644 --- a/adaptive/learner/learner1D.py +++ b/adaptive/learner/learner1D.py @@ -1,22 +1,54 @@ +import collections.abc import itertools import math -from collections.abc import Iterable from copy import deepcopy +from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union import cloudpickle import numpy as np -import sortedcollections -import sortedcontainers +from sortedcollections.recipes import ItemSortedDict +from sortedcontainers.sorteddict import SortedDict from adaptive.learner.base_learner import BaseLearner, uses_nth_neighbors from adaptive.learner.learnerND import volume from adaptive.learner.triangulation import simplex_volume_in_embedding from adaptive.notebook_integration import ensure_holoviews +from adaptive.types import Float, Int, Real from adaptive.utils import cache_latest +# -- types -- + +# Commonly used types +Interval = Union[Tuple[float, float], Tuple[float, float, int]] +NeighborsType = Dict[float, List[Union[float, None]]] + +# Types for loss_per_interval functions +NoneFloat = Union[Float, None] +NoneArray = Union[np.ndarray, None] +XsType0 = Tuple[Float, Float] +YsType0 = Union[Tuple[Float, Float], Tuple[np.ndarray, np.ndarray]] +XsType1 = Tuple[NoneFloat, NoneFloat, NoneFloat, NoneFloat] +YsType1 = Union[ + Tuple[NoneFloat, NoneFloat, NoneFloat, NoneFloat], + Tuple[NoneArray, NoneArray, NoneArray, NoneArray], +] +XsTypeN = Tuple[NoneFloat, ...] 
+YsTypeN = Union[Tuple[NoneFloat, ...], Tuple[NoneArray, ...]] + + +__all__ = [ + "uniform_loss", + "default_loss", + "abs_min_log_loss", + "triangle_loss", + "resolution_loss_function", + "curvature_loss_function", + "Learner1D", +] + @uses_nth_neighbors(0) -def uniform_loss(xs, ys): +def uniform_loss(xs: XsType0, ys: YsType0) -> Float: """Loss function that samples the domain uniformly. Works with `~adaptive.Learner1D` only. @@ -36,7 +68,7 @@ def uniform_loss(xs, ys): @uses_nth_neighbors(0) -def default_loss(xs, ys): +def default_loss(xs: XsType0, ys: YsType0) -> Float: """Calculate loss on a single interval. Currently returns the rescaled length of the interval. If one of the @@ -44,23 +76,24 @@ def default_loss(xs, ys): never touched. This behavior should be improved later. """ dx = xs[1] - xs[0] - if isinstance(ys[0], Iterable): - dy = [abs(a - b) for a, b in zip(*ys)] - return np.hypot(dx, dy).max() + if isinstance(ys[0], collections.abc.Iterable): + dy_vec = np.array([abs(a - b) for a, b in zip(*ys)]) + return np.hypot(dx, dy_vec).max() else: dy = ys[1] - ys[0] return np.hypot(dx, dy) @uses_nth_neighbors(0) -def abs_min_log_loss(xs, ys): +def abs_min_log_loss(xs: XsType0, ys: YsType0) -> Float: """Calculate loss of a single interval that prioritizes the absolute minimum.""" - ys = [np.log(np.abs(y).min()) for y in ys] + ys = tuple(np.log(np.abs(y).min()) for y in ys) return default_loss(xs, ys) @uses_nth_neighbors(1) -def triangle_loss(xs, ys): +def triangle_loss(xs: XsType1, ys: YsType1) -> Float: + assert len(xs) == 4 xs = [x for x in xs if x is not None] ys = [y for y in ys if y is not None] @@ -68,7 +101,7 @@ def triangle_loss(xs, ys): return xs[1] - xs[0] N = len(xs) - 2 # number of constructed triangles - if isinstance(ys[0], Iterable): + if isinstance(ys[0], collections.abc.Iterable): pts = [(x, *y) for x, y in zip(xs, ys)] vol = simplex_volume_in_embedding else: @@ -77,7 +110,9 @@ def triangle_loss(xs, ys): return sum(vol(pts[i : i + 3]) for i in range(N)) / N -def resolution_loss_function(min_length=0, max_length=1): +def resolution_loss_function( + min_length: Real = 0, max_length: Real = 1 +) -> Callable[[XsType0, YsType0], Float]: """Loss function that is similar to the `default_loss` function, but you can set the maximum and minimum size of an interval. 
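The aliases above give user-defined loss functions a signature that typeguard can verify. A minimal sketch of a custom zero-neighbor loss written against them (not part of this patch; `slope_loss` is a hypothetical example and, unlike `default_loss`, it handles scalar output only):

    import numpy as np

    from adaptive import Learner1D
    from adaptive.learner.base_learner import uses_nth_neighbors
    from adaptive.learner.learner1D import XsType0, YsType0
    from adaptive.types import Float


    @uses_nth_neighbors(0)
    def slope_loss(xs: XsType0, ys: YsType0) -> Float:
        # Inflate the loss of an interval where the function is steep.
        dx = xs[1] - xs[0]
        dy = ys[1] - ys[0]
        return dx * (1 + abs(dy / dx))


    learner = Learner1D(np.sin, bounds=(0, 2), loss_per_interval=slope_loss)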
@@ -100,7 +135,7 @@ def resolution_loss_function(min_length=0, max_length=1): """ @uses_nth_neighbors(0) - def resolution_loss(xs, ys): + def resolution_loss(xs: XsType0, ys: YsType0) -> Float: loss = uniform_loss(xs, ys) if loss < min_length: # Return zero such that this interval won't be chosen again @@ -114,10 +149,12 @@ def resolution_loss(xs, ys): return resolution_loss -def curvature_loss_function(area_factor=1, euclid_factor=0.02, horizontal_factor=0.02): +def curvature_loss_function( + area_factor: Real = 1, euclid_factor: Real = 0.02, horizontal_factor: Real = 0.02 +) -> Callable[[XsType1, YsType1], Float]: # XXX: add a doc-string @uses_nth_neighbors(1) - def curvature_loss(xs, ys): + def curvature_loss(xs: XsType1, ys: YsType1) -> Float: xs_middle = xs[1:3] ys_middle = ys[1:3] @@ -133,7 +170,7 @@ def curvature_loss(xs, ys): return curvature_loss -def linspace(x_left, x_right, n): +def linspace(x_left: Real, x_right: Real, n: Int) -> List[Float]: """This is equivalent to 'np.linspace(x_left, x_right, n, endpoint=False)[1:]', but it is 15-30 times faster for small 'n'.""" @@ -145,17 +182,19 @@ def linspace(x_left, x_right, n): return [x_left + step * i for i in range(1, n)] -def _get_neighbors_from_list(xs): +def _get_neighbors_from_array(xs: np.ndarray) -> NeighborsType: xs = np.sort(xs) xs_left = np.roll(xs, 1).tolist() xs_right = np.roll(xs, -1).tolist() xs_left[0] = None xs_right[-1] = None neighbors = {x: [x_L, x_R] for x, x_L, x_R in zip(xs, xs_left, xs_right)} - return sortedcontainers.SortedDict(neighbors) + return SortedDict(neighbors) -def _get_intervals(x, neighbors, nth_neighbors): +def _get_intervals( + x: float, neighbors: NeighborsType, nth_neighbors: int +) -> List[Tuple[float, float]]: nn = nth_neighbors i = neighbors.index(x) start = max(0, i - nn - 1) @@ -208,8 +247,13 @@ class Learner1D(BaseLearner): decorator for more information. """ - def __init__(self, function, bounds, loss_per_interval=None): - self.function = function + def __init__( + self, + function: Callable[[Real], Union[Float, np.ndarray]], + bounds: Tuple[Real, Real], + loss_per_interval: Optional[Callable[[XsTypeN, YsTypeN], Float]] = None, + ): + self.function = function # type: ignore if hasattr(loss_per_interval, "nth_neighbors"): self.nth_neighbors = loss_per_interval.nth_neighbors @@ -223,13 +267,13 @@ def __init__(self, function, bounds, loss_per_interval=None): # the learners behavior in the tests. self._recompute_losses_factor = 2 - self.data = {} - self.pending_points = set() + self.data: Dict[Real, Real] = {} + self.pending_points: Set[Real] = set() # A dict {x_n: [x_{n-1}, x_{n+1}]} for quick checking of local # properties. - self.neighbors = sortedcontainers.SortedDict() - self.neighbors_combined = sortedcontainers.SortedDict() + self.neighbors: NeighborsType = SortedDict() + self.neighbors_combined: NeighborsType = SortedDict() # Bounding box [[minx, maxx], [miny, maxy]]. self._bbox = [list(bounds), [np.inf, -np.inf]] @@ -247,10 +291,10 @@ def __init__(self, function, bounds, loss_per_interval=None): self.bounds = list(bounds) - self._vdim = None + self._vdim: Optional[int] = None @property - def vdim(self): + def vdim(self) -> int: """Length of the output of ``learner.function``. If the output is unsized (when it's a scalar) then `vdim = 1`. 
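Because `__init__` above reads `loss_per_interval.nth_neighbors` (set by the `@uses_nth_neighbors` decorator), a factory-made loss plugs straight into the annotated constructor. A usage sketch, not part of this patch, with `f` as a stand-in function:

    import numpy as np

    from adaptive import Learner1D
    from adaptive.learner.learner1D import curvature_loss_function


    def f(x):
        return np.tanh(20 * x)


    loss = curvature_loss_function()  # defaults as annotated above
    learner = Learner1D(f, bounds=(-1, 1), loss_per_interval=loss)
    assert learner.nth_neighbors == 1  # picked up from @uses_nth_neighbors(1)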
@@ -275,35 +319,37 @@ def to_numpy(self): return np.array([(x, *np.atleast_1d(y)) for x, y in sorted(self.data.items())]) @property - def npoints(self): + def npoints(self) -> int: """Number of evaluated points.""" return len(self.data) @cache_latest - def loss(self, real=True): + def loss(self, real: bool = True) -> float: losses = self.losses if real else self.losses_combined if not losses: return np.inf max_interval, max_loss = losses.peekitem(0) return max_loss - def _scale_x(self, x): + def _scale_x(self, x: Optional[Float]) -> Optional[Float]: if x is None: return None return x / self._scale[0] - def _scale_y(self, y): + def _scale_y( + self, y: Union[Float, np.ndarray, None] + ) -> Union[Float, np.ndarray, None]: if y is None: return None y_scale = self._scale[1] or 1 return y / y_scale - def _get_point_by_index(self, ind): + def _get_point_by_index(self, ind: int) -> Optional[float]: if ind < 0 or ind >= len(self.neighbors): return None return self.neighbors.keys()[ind] - def _get_loss_in_interval(self, x_left, x_right): + def _get_loss_in_interval(self, x_left: float, x_right: float) -> float: assert x_left is not None and x_right is not None if x_right - x_left < self._dx_eps: @@ -323,7 +369,9 @@ def _get_loss_in_interval(self, x_left, x_right): # we need to compute the loss for this interval return self.loss_per_interval(xs_scaled, ys_scaled) - def _update_interpolated_loss_in_interval(self, x_left, x_right): + def _update_interpolated_loss_in_interval( + self, x_left: float, x_right: float + ) -> None: if x_left is None or x_right is None: return @@ -339,7 +387,7 @@ def _update_interpolated_loss_in_interval(self, x_left, x_right): self.losses_combined[a, b] = (b - a) * loss / dx a = b - def _update_losses(self, x, real=True): + def _update_losses(self, x: float, real: bool = True) -> None: """Update all losses that depend on x""" # When we add a new point x, we should update the losses # (x_left, x_right) are the "real" neighbors of 'x'. @@ -382,7 +430,7 @@ def _update_losses(self, x, real=True): self.losses_combined[x, b] = float("inf") @staticmethod - def _find_neighbors(x, neighbors): + def _find_neighbors(x: float, neighbors: NeighborsType) -> Any: if x in neighbors: return neighbors[x] pos = neighbors.bisect_left(x) @@ -391,14 +439,14 @@ def _find_neighbors(x, neighbors): x_right = keys[pos] if pos != len(neighbors) else None return x_left, x_right - def _update_neighbors(self, x, neighbors): + def _update_neighbors(self, x: float, neighbors: NeighborsType) -> None: if x not in neighbors: # The point is new x_left, x_right = self._find_neighbors(x, neighbors) neighbors[x] = [x_left, x_right] neighbors.get(x_left, [None, None])[1] = x neighbors.get(x_right, [None, None])[0] = x - def _update_scale(self, x, y): + def _update_scale(self, x: float, y: Union[Float, np.ndarray]) -> None: """Update the scale with which the x and y-values are scaled. 
For a learner where the function returns a single scalar the scale @@ -425,7 +473,7 @@ def _update_scale(self, x, y): self._bbox[1][1] = max(self._bbox[1][1], y) self._scale[1] = self._bbox[1][1] - self._bbox[1][0] - def tell(self, x, y): + def tell(self, x: float, y: Union[Float, Sequence[Float], np.ndarray]) -> None: if x in self.data: # The point is already evaluated before return @@ -460,7 +508,7 @@ def tell(self, x, y): self._oldscale = deepcopy(self._scale) - def tell_pending(self, x): + def tell_pending(self, x: float) -> None: if x in self.data: # The point is already evaluated before return @@ -468,7 +516,17 @@ def tell_pending(self, x): self._update_neighbors(x, self.neighbors_combined) self._update_losses(x, real=False) - def tell_many(self, xs, ys, *, force=False): + def tell_many( + self, + xs: Sequence[Float], + ys: Union[ + Sequence[Float], + Sequence[Sequence[Float]], + Sequence[np.ndarray], + ], + *, + force: bool = False + ) -> None: if not force and not (len(xs) > 0.5 * len(self.data) and len(xs) > 2): # Only run this more efficient method if there are # at least 2 points and the amount of points added are @@ -488,8 +546,8 @@ def tell_many(self, xs, ys, *, force=False): points_combined = np.hstack([points_pending, points]) # Generate neighbors - self.neighbors = _get_neighbors_from_list(points) - self.neighbors_combined = _get_neighbors_from_list(points_combined) + self.neighbors = _get_neighbors_from_array(points) + self.neighbors_combined = _get_neighbors_from_array(points_combined) # Update scale self._bbox[0] = [points_combined.min(), points_combined.max()] @@ -536,7 +594,7 @@ def tell_many(self, xs, ys, *, force=False): # have an inf loss. self._update_interpolated_loss_in_interval(*ival) - def ask(self, n, tell_pending=True): + def ask(self, n: int, tell_pending: bool = True) -> Tuple[List[float], List[float]]: """Return 'n' points that are expected to maximally reduce the loss.""" points, loss_improvements = self._ask_points_without_adding(n) @@ -546,7 +604,7 @@ def ask(self, n, tell_pending=True): return points, loss_improvements - def _ask_points_without_adding(self, n): + def _ask_points_without_adding(self, n: int) -> Tuple[List[float], List[float]]: """Return 'n' points that are expected to maximally reduce the loss. Without altering the state of the learner""" # Find out how to divide the n points over the intervals @@ -573,7 +631,8 @@ def _ask_points_without_adding(self, n): # Add bound intervals to quals if bounds were missing. if len(self.data) + len(self.pending_points) == 0: # We don't have any points, so return a linspace with 'n' points. - return np.linspace(*self.bounds, n).tolist(), [np.inf] * n + a, b = self.bounds + return np.linspace(a, b, n).tolist(), [np.inf] * n quals = loss_manager(self._scale[0]) if len(missing_bounds) > 0: @@ -624,11 +683,13 @@ def _ask_points_without_adding(self, n): return points, loss_improvements - def _loss(self, mapping, ival): + def _loss( + self, mapping: Dict[Interval, float], ival: Interval + ) -> Tuple[float, Interval]: loss = mapping[ival] return finite_loss(ival, loss, self._scale[0]) - def plot(self, *, scatter_or_line="scatter"): + def plot(self, *, scatter_or_line: str = "scatter"): """Returns a plot of the evaluated data. 
Parameters @@ -663,22 +724,23 @@ def plot(self, *, scatter_or_line="scatter"): return p.redim(x=dict(range=plot_bounds)) - def remove_unfinished(self): + def remove_unfinished(self) -> None: self.pending_points = set() self.losses_combined = deepcopy(self.losses) self.neighbors_combined = deepcopy(self.neighbors) - def _get_data(self): + def _get_data(self) -> Dict[float, float]: return self.data - def _set_data(self, data): + def _set_data(self, data: Dict[float, float]) -> None: if data: - self.tell_many(*zip(*data.items())) + xs, ys = zip(*data.items()) + self.tell_many(xs, ys) def __getstate__(self): return ( cloudpickle.dumps(self.function), - self.bounds, + tuple(self.bounds), self.loss_per_interval, dict(self.losses), # SortedDict cannot be pickled dict(self.losses_combined), # ItemSortedDict cannot be pickled @@ -694,17 +756,17 @@ def __setstate__(self, state): self.losses_combined.update(losses_combined) -def loss_manager(x_scale): +def loss_manager(x_scale: float) -> Dict[Interval, float]: def sort_key(ival, loss): loss, ival = finite_loss(ival, loss, x_scale) return -loss, ival - sorted_dict = sortedcollections.ItemSortedDict(sort_key) + sorted_dict = ItemSortedDict(sort_key) return sorted_dict -def finite_loss(ival, loss, x_scale): - """Get the socalled finite_loss of an interval in order to be able to +def finite_loss(ival: Interval, loss: float, x_scale: float) -> Tuple[float, Interval]: + """Get the so-called finite_loss of an interval in order to be able to sort intervals that have infinite loss.""" # If the loss is infinite we return the # distance between the two points. diff --git a/adaptive/tests/test_average_learner.py b/adaptive/tests/test_average_learner.py index f35794a39..5de3ced45 100644 --- a/adaptive/tests/test_average_learner.py +++ b/adaptive/tests/test_average_learner.py @@ -7,8 +7,12 @@ from adaptive.runner import simple +def f_unused(seed): + raise NotImplementedError("This function shouldn't be used.") + + def test_only_returns_new_points(): - learner = AverageLearner(lambda x: x, atol=None, rtol=0.01) + learner = AverageLearner(f_unused, atol=None, rtol=0.01) # Only tell it n = 5...10 for i in range(5, 10): @@ -25,7 +29,7 @@ def test_only_returns_new_points(): @flaky.flaky(max_runs=5) def test_avg_std_and_npoints(): - learner = AverageLearner(lambda x: x, atol=None, rtol=0.01) + learner = AverageLearner(f_unused, atol=None, rtol=0.01) for i in range(300): # This will add 5000 points at random values of n. 
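These tests drive the learner purely through `tell`, so its function is never evaluated: a named, module-level `f_unused` satisfies the `Callable` annotation once typeguard instruments the package (a bare `None` does not) and fails loudly if a refactor ever causes it to be called. The same pattern in miniature (a sketch, not one of the tests in this patch):

    from adaptive import AverageLearner


    def f_unused(seed):
        raise NotImplementedError("This function shouldn't be used.")


    learner = AverageLearner(f_unused, atol=None, rtol=0.01)
    learner.tell(0, 1.0)  # data is fed in by hand ...
    learner.tell(1, 0.5)
    assert learner.npoints == 2  # ... so f_unused is never called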
@@ -63,7 +67,7 @@ def constant_function(seed):
 
 def test_zero_mean():
     # see https://github.com/python-adaptive/adaptive/issues/275
-    learner = AverageLearner(None, rtol=0.01)
+    learner = AverageLearner(f_unused, rtol=0.01)
     learner.tell(0, -1)
     learner.tell(1, 1)
     learner.loss()
diff --git a/adaptive/tests/test_average_learner1d.py b/adaptive/tests/test_average_learner1d.py
index 4123241a7..4286f55b9 100644
--- a/adaptive/tests/test_average_learner1d.py
+++ b/adaptive/tests/test_average_learner1d.py
@@ -16,8 +16,8 @@ def almost_equal_dicts(a, b):
 
 def test_tell_many_at_point():
     f = generate_random_parametrization(noisy_peak)
-    learner = AverageLearner1D(f, bounds=[-2, 2])
-    control = AverageLearner1D(f, bounds=[-2, 2])
+    learner = AverageLearner1D(f, bounds=(-2, 2))
+    control = AverageLearner1D(f, bounds=(-2, 2))
     learner._recompute_losses_factor = 1
     control._recompute_losses_factor = 1
     simple_run(learner, 100)
diff --git a/adaptive/tests/test_learners.py b/adaptive/tests/test_learners.py
index 50b3b6264..43390f400 100644
--- a/adaptive/tests/test_learners.py
+++ b/adaptive/tests/test_learners.py
@@ -164,7 +164,7 @@ def gaussian(n):
     return random.gauss(1, 1)
 
 
-@learn_with(AverageLearner1D, bounds=[-2, 2])
+@learn_with(AverageLearner1D, bounds=(-2, 2))
 def noisy_peak(
     seed_x,
     sigma: uniform(1.5, 2.5),
@@ -271,8 +271,8 @@ def test_uniform_sampling2D(learner_type, f, learner_kwargs):
     "learner_type, bounds",
     [
         (Learner1D, (-1, 1)),
-        (Learner2D, [(-1, 1), (-1, 1)]),
-        (LearnerND, [(-1, 1), (-1, 1), (-1, 1)]),
+        (Learner2D, ((-1, 1), (-1, 1))),
+        (LearnerND, ((-1, 1), (-1, 1), (-1, 1))),
     ],
 )
 def test_learner_accepts_lists(learner_type, bounds):
@@ -480,7 +480,9 @@ def test_learner_performance_is_invariant_under_scaling(
     yscale = 1000 * random.random()
 
     l_kwargs = dict(learner_kwargs)
-    l_kwargs["bounds"] = xscale * np.array(l_kwargs["bounds"])
+    bounds = xscale * np.array(l_kwargs["bounds"])
+    bounds = tuple(bounds.tolist())  # to satisfy typeguard tests
+    l_kwargs["bounds"] = bounds
 
     def scale_x(x):
         if isinstance(learner, AverageLearner1D):
diff --git a/setup.py b/setup.py
index 510119486..144835f32 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@ def get_version_and_cmdclass(package_name):
         "pytest-timeout",
         "pre_commit",
         "pandas",
+        "typeguard",
     ],
     "other": [
         "dill",
diff --git a/tox.ini b/tox.ini
index 5b87ac41b..92ae4b0d0 100644
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
 testpaths = adaptive
 addopts =
     --durations=5
    --cov --cov-append --cov-fail-under=70 -vvv --cov-report=
+    --typeguard-packages adaptive
 norecursedirs =
     docs
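The new `typeguard` test dependency, together with the `--typeguard-packages adaptive` pytest flag, instruments every annotated function in the package at import time and checks arguments and return values while the test suite runs. That is what the list-to-tuple changes above satisfy: `[-2, 2]` no longer passes where `Tuple[Real, Real]` is annotated, and it is presumably also why `Learner1D.__getstate__` now stores `tuple(self.bounds)`, since `__setstate__` feeds the state back through the annotated `__init__`. The plugin's check can be reproduced manually with the `@typechecked` decorator; a sketch with a hypothetical `midpoint` function (raising `TypeError` is typeguard 2.x behavior):

    from typing import Tuple

    from typeguard import typechecked


    @typechecked
    def midpoint(bounds: Tuple[float, float]) -> float:
        return (bounds[0] + bounds[1]) / 2


    midpoint((-2.0, 2.0))  # passes
    midpoint([-2.0, 2.0])  # raises TypeError: a list is not a Tuple[float, float]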