diff --git a/wormpose/commands/postprocess_results.py b/wormpose/commands/postprocess_results.py index 89ac2dc..6a9f966 100644 --- a/wormpose/commands/postprocess_results.py +++ b/wormpose/commands/postprocess_results.py @@ -14,7 +14,7 @@ import h5py import numpy as np import numpy.ma as ma -from scipy.interpolate import UnivariateSpline +from scipy.interpolate import interp1d from scipy.signal import savgol_filter from wormpose.commands import _log_parameters @@ -75,61 +75,32 @@ def _get_valid_segments(is_valid_series: np.ndarray, max_gap_size: int, min_segm class _SplineInterpolation(object): - def __init__(self, frames_around: int = 3, spline_degree: int = 3): - - self.spline_degree = spline_degree - self.frames_around = frames_around - self.slope = np.linspace(0, 0.5, frames_around + 1) - - def interpolate_tseries( - self, tseries: np.ndarray, segments_boundaries: Sequence, std_fraction: float - ) -> np.ndarray: - - weight = 1 / (std_fraction * np.nanstd(tseries)) + def interpolate_tseries(self, tseries: np.ndarray, segments_boundaries: Sequence) -> np.ndarray: tseries[~np.isnan(tseries)] = np.unwrap(tseries[~np.isnan(tseries)]) new_tseries = np.full_like(tseries, np.nan) for t0, tf in segments_boundaries: - new_tseries[t0:tf] = self._interpolate_segment(tseries[t0:tf], weight) + new_tseries[t0:tf] = self._interpolate_segment(tseries[t0:tf]) return new_tseries - def _interpolate_segment(self, tseries: np.ndarray, weight: float) -> np.ndarray: + def _interpolate_segment(self, tseries: np.ndarray) -> np.ndarray: new_tseries = np.copy(tseries) - nan_y = np.isnan(new_tseries) - indices_nan = np.any(nan_y, axis=1) - series_len = len(new_tseries) - x = np.arange(series_len) - new_tseries[nan_y] = 0.0 - - w = self.build_weights(indices_nan, series_len, weight) - # perform spline interpolation separately for each dimension for dim in range(new_tseries.shape[1]): - y = new_tseries[:, dim] - spl = UnivariateSpline(x, y, w=w, k=self.spline_degree, s=len(x)) - new_x = x - new_weighted_y = spl(new_x) - new_tseries[:, dim] = new_weighted_y + y0 = new_tseries[:, dim] + xn = np.arange(len(new_tseries)) + sel = ~np.isnan(y0) + x = xn[sel] + y = y0[sel] + f = interp1d(x, y, kind="cubic") + yn = f(xn) + new_tseries[:, dim] = yn return new_tseries - def build_weights(self, indices_nan: np.ndarray, series_len: int, weight: float) -> np.ndarray: - # setup weights: lower the weights closer to the edges - w = np.full(series_len, weight) - where_nan = np.where(indices_nan)[0] - if len(where_nan) == 0: - return w - - for idx in range(series_len): - closest_nan_distance = np.min(np.abs(where_nan - idx)) - if closest_nan_distance <= self.frames_around: - w[idx] = self.slope[closest_nan_distance] * weight - - return w - def _smooth_tseries( tseries: np.ndarray, @@ -211,15 +182,13 @@ def _parse_arguments(dataset_path: str, kwargs: dict): if kwargs.get("work_dir") is None: kwargs["work_dir"] = default_paths.WORK_DIR if kwargs.get("max_gap_size") is None: - kwargs["max_gap_size"] = 3 + kwargs["max_gap_size"] = 4 if kwargs.get("min_segment_size") is None: - kwargs["min_segment_size"] = 11 + kwargs["min_segment_size"] = 8 if kwargs.get("smoothing_window") is None: - kwargs["smoothing_window"] = 7 + kwargs["smoothing_window"] = 8 if kwargs.get("poly_order") is None: kwargs["poly_order"] = 3 - if kwargs.get("std_fraction") is None: - kwargs["std_fraction"] = 0.001 if kwargs.get("eigenworms_matrix_path") is None: kwargs["eigenworms_matrix_path"] = None if kwargs.get("num_process") is None: @@ -294,9 +263,7 @@ def post_process(dataset_path: str, **kwargs): min_segment_size=args.min_segment_size, ) # interpolate and smooth in angles space - thetas_interp = spline_interpolation.interpolate_tseries( - results_raw.theta, segments_boundaries, args.std_fraction - ) + thetas_interp = spline_interpolation.interpolate_tseries(results_raw.theta, segments_boundaries) results_interp = _calculate_skeleton(thetas_interp, args, dataset, video_name) thetas_smooth = _smooth_tseries( @@ -322,6 +289,8 @@ def post_process(dataset_path: str, **kwargs): setattr(results_interp, "modes", _thetas_to_modes(results_interp.theta, eigenworms_matrix)) setattr(results_smooth, "modes", _thetas_to_modes(results_smooth.theta, eigenworms_matrix)) + frame_rate = features.frame_rate + # save results results_saver = ResultsSaver( temp_dir=args.temp_dir, results_root_dir=results_root_dir, results_filename=POSTPROCESSED_RESULTS_FILENAME @@ -332,8 +301,8 @@ def post_process(dataset_path: str, **kwargs): "min_segment_size": args.min_segment_size, "smoothing_window": args.smoothing_window, "poly_order": args.poly_order, - "std_fraction": args.std_fraction, "dorsal_ventral_flip": flipped, + "frame_rate": frame_rate, } results_saver.save( @@ -369,11 +338,6 @@ def main(): help="Only segments of valid values of length greater than min_segment_size (frames)" "will be interpolated and smoothed", ) - parser.add_argument( - "--std_fraction", - type=float, - help="The higher the guessed noise to signal ratio is, the smoother the interpolation will be", - ) parser.add_argument("--smoothing_window", type=int, help="smoothing window in frames") parser.add_argument("--poly_order", type=int, help="polynomial order in smoothing") parser.add_argument("--temp_dir", type=str, help="Where to store temporary intermediate results")