From e6bbd6e1cb1854a1ae992b878a1b75b94398d5dc Mon Sep 17 00:00:00 2001 From: misi9170 <39596329+misi9170@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:06:38 -0500 Subject: [PATCH] Add parallel solve for `sample_flow_at_points` (#1059) * Running sample_flow_at_points in all interface options. * Reducing code repetitions. * Reduce code repetition in run() call. * Add test for sample_flow_at_points --- floris/par_floris_model.py | 165 +++++++++++++++++++--------- tests/par_floris_model_unit_test.py | 26 +++++ 2 files changed, 139 insertions(+), 52 deletions(-) diff --git a/floris/par_floris_model.py b/floris/par_floris_model.py index bfb60b300..8c3e5dce8 100644 --- a/floris/par_floris_model.py +++ b/floris/par_floris_model.py @@ -8,6 +8,9 @@ from floris.core import State from floris.floris_model import FlorisModel +from floris.type_dec import ( + NDArrayFloat, +) class ParFlorisModel(FlorisModel): @@ -123,81 +126,111 @@ def run(self) -> None: t0 = timerpc() super().run() t1 = timerpc() - elif self.interface == "multiprocessing": + self._print_timings(t0, t1, None, None) + else: t0 = timerpc() self.core.initialize_domain() parallel_run_inputs = self._preprocessing() t1 = timerpc() - if self.return_turbine_powers_only: - with self._PoolExecutor(self.max_workers) as p: - self._turbine_powers_split = p.starmap( - _parallel_run_powers_only, + if self.interface == "multiprocessing": + if self.return_turbine_powers_only: + with self._PoolExecutor(self.max_workers) as p: + self._turbine_powers_split = p.starmap( + _parallel_run_powers_only, + parallel_run_inputs + ) + else: + with self._PoolExecutor(self.max_workers) as p: + self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs) + elif self.interface == "pathos": + if self.return_turbine_powers_only: + self._turbine_powers_split = self.pathos_pool.map( + _parallel_run_powers_only_map, parallel_run_inputs ) - else: - with self._PoolExecutor(self.max_workers) as p: - self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs) + else: + self._fmodels_split = self.pathos_pool.map( + _parallel_run_map, + parallel_run_inputs + ) + elif self.interface == "concurrent": + if self.return_turbine_powers_only: + with self._PoolExecutor(self.max_workers) as p: + self._turbine_powers_split = p.map( + _parallel_run_powers_only_map, + parallel_run_inputs + ) + self._turbine_powers_split = list(self._turbine_powers_split) + else: + with self._PoolExecutor(self.max_workers) as p: + self._fmodels_split = p.map( + _parallel_run_map, + parallel_run_inputs + ) + self._fmodels_split = list(self._fmodels_split) t2 = timerpc() self._postprocessing() self.core.farm.finalize(self.core.grid.unsorted_indices) self.core.state = State.USED t3 = timerpc() - elif self.interface == "pathos": + self._print_timings(t0, t1, t2, t3) + + def sample_flow_at_points(self, x: NDArrayFloat, y: NDArrayFloat, z: NDArrayFloat): + """ + Sample the flow field at specified points. + + Args: + x: The x-coordinates of the points. + y: The y-coordinates of the points. + z: The z-coordinates of the points. + + Returns: + NDArrayFloat: The wind speeds at the specified points. + """ + if self.return_turbine_powers_only: + raise NotImplementedError( + "Sampling flow at points is not supported when " + "return_turbine_powers_only is set to True on ParFlorisModel." + ) + + if self.interface is None: t0 = timerpc() - self.core.initialize_domain() - parallel_run_inputs = self._preprocessing() + sampled_wind_speeds = super().sample_flow_at_points(x, y, z) t1 = timerpc() - if self.return_turbine_powers_only: - self._turbine_powers_split = self.pathos_pool.map( - _parallel_run_powers_only_map, - parallel_run_inputs - ) - else: - self._fmodels_split = self.pathos_pool.map( - _parallel_run_map, - parallel_run_inputs - ) - t2 = timerpc() - self._postprocessing() - self.core.farm.finalize(self.core.grid.unsorted_indices) - self.core.state = State.USED - t3 = timerpc() - elif self.interface == "concurrent": + self._print_timings(t0, t1, None, None) + else: t0 = timerpc() self.core.initialize_domain() parallel_run_inputs = self._preprocessing() + parallel_sample_flow_at_points_inputs = [ + (fmodel_dict, control_setpoints, x, y, z) + for fmodel_dict, control_setpoints in parallel_run_inputs + ] t1 = timerpc() - if self.return_turbine_powers_only: + if self.interface == "multiprocessing": with self._PoolExecutor(self.max_workers) as p: - self._turbine_powers_split = p.map( - _parallel_run_powers_only_map, - parallel_run_inputs + sampled_wind_speeds_p = p.starmap( + _parallel_sample_flow_at_points, + parallel_sample_flow_at_points_inputs ) - self._turbine_powers_split = list(self._turbine_powers_split) - else: + elif self.interface == "pathos": + sampled_wind_speeds_p = self.pathos_pool.map( + _parallel_sample_flow_at_points_map, + parallel_sample_flow_at_points_inputs + ) + elif self.interface == "concurrent": with self._PoolExecutor(self.max_workers) as p: - self._fmodels_split = p.map( - _parallel_run_map, - parallel_run_inputs + sampled_wind_speeds_p = p.map( + _parallel_sample_flow_at_points_map, + parallel_sample_flow_at_points_inputs ) - self._fmodels_split = list(self._fmodels_split) + sampled_wind_speeds_p = list(sampled_wind_speeds_p) t2 = timerpc() - self._postprocessing() - self.core.farm.finalize(self.core.grid.unsorted_indices) - self.core.state = State.USED + sampled_wind_speeds = np.concatenate(sampled_wind_speeds_p, axis=0) t3 = timerpc() - if self.print_timings: - print("===============================================================================") - if self.interface is None: - print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s") - else: - print( - "Total time spent for parallel calculation " - f"({self.max_workers} workers): {t3-t0:.3f} s" - ) - print(f" Time spent in parallel preprocessing: {t1-t0:.3f} s") - print(f" Time spent in parallel loop execution: {t2-t1:.3f} s.") - print(f" Time spent in parallel postprocessing: {t3-t2:.3f} s") + self._print_timings(t0, t1, t2, t3) + + return sampled_wind_speeds def _preprocessing(self): """ @@ -278,6 +311,23 @@ def _postprocessing(self): axis=0 ) + def _print_timings(self, t0, t1, t2, t3): + """ + Print the timings for the parallel execution. + """ + if self.print_timings: + print("===============================================================================") + if self.interface is None: + print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s") + else: + print( + "Total time spent for parallel calculation " + f"({self.max_workers} workers): {t3-t0:.3f} s" + ) + print(f" Time spent in parallel preprocessing: {t1-t0:.3f} s") + print(f" Time spent in parallel loop execution: {t2-t1:.3f} s.") + print(f" Time spent in parallel postprocessing: {t3-t2:.3f} s") + def _get_turbine_powers(self): """ Calculates the power at each turbine in the wind farm. @@ -364,3 +414,14 @@ def _parallel_run_powers_only_map(x): Wrapper for unpacking inputs to _parallel_run_powers_only() for use with map(). """ return _parallel_run_powers_only(*x) + +def _parallel_sample_flow_at_points(fmodel_dict, set_kwargs, x, y, z): + fmodel = FlorisModel(fmodel_dict) + fmodel.set(**set_kwargs) + return fmodel.sample_flow_at_points(x, y, z) + +def _parallel_sample_flow_at_points_map(x): + """ + Wrapper for unpacking inputs to _parallel_sample_flow_at_points() for use with map(). + """ + return _parallel_sample_flow_at_points(*x) diff --git a/tests/par_floris_model_unit_test.py b/tests/par_floris_model_unit_test.py index 9e56e4d8c..9f5669a3c 100644 --- a/tests/par_floris_model_unit_test.py +++ b/tests/par_floris_model_unit_test.py @@ -286,3 +286,29 @@ def test_control_setpoints(sample_inputs_fixture): assert powers_fmodel.shape == powers_pfmodel.shape assert np.allclose(powers_fmodel, powers_pfmodel) + +def test_sample_flow_at_points(sample_inputs_fixture): + + sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL + sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL + + fmodel = FlorisModel(sample_inputs_fixture.core) + + wind_speeds = np.array([8.0, 8.0, 9.0]) + wind_directions = np.array([270.0, 270.0, 270.0]) + fmodel.set( + wind_directions=wind_speeds.flatten(), + wind_speeds=wind_directions.flatten(), + turbulence_intensities=0.06 * np.ones_like(wind_speeds.flatten()), + ) + + x_test = np.array([500.0, 750.0, 1000.0, 1250.0, 1500.0]) + y_test = np.array([0.0, 0.0, 0.0, 0.0, 0.0]) + z_test = np.array([90.0, 90.0, 90.0, 90.0, 90.0]) + + ws_base = fmodel.sample_flow_at_points(x_test, y_test, z_test) + + for interface in ["multiprocessing", "pathos", "concurrent"]: + pfmodel = ParFlorisModel(fmodel, max_workers=2, interface=interface) + ws_test = pfmodel.sample_flow_at_points(x_test, y_test, z_test) + assert np.allclose(ws_base, ws_test)