Add parallel solve for sample_flow_at_points (#1059)
* Running sample_flow_at_points in all interface options.

* Reducing code repetitions.

* Reduce code repetition in run() call.

* Add test for sample_flow_at_points
misi9170 authored Feb 5, 2025
1 parent a14193b commit e6bbd6e
Showing 2 changed files with 139 additions and 52 deletions.
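
For orientation, a minimal usage sketch of the capability this commit adds, based on the API exercised in the diff and test below; the input file path is hypothetical and not part of the commit:

    import numpy as np
    from floris.floris_model import FlorisModel
    from floris.par_floris_model import ParFlorisModel

    if __name__ == "__main__":
        # Hypothetical FLORIS input file; any valid configuration would do.
        fmodel = FlorisModel("inputs/gch.yaml")
        pfmodel = ParFlorisModel(fmodel, max_workers=2, interface="multiprocessing")

        # Sample points downstream of the turbines at hub height.
        x = np.array([500.0, 1000.0, 1500.0])
        y = np.zeros_like(x)
        z = 90.0 * np.ones_like(x)

        # Wind conditions are split across workers and the sampled
        # wind speeds are concatenated along axis 0.
        ws = pfmodel.sample_flow_at_points(x, y, z)
        print(ws.shape)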
165 changes: 113 additions & 52 deletions floris/par_floris_model.py
@@ -8,6 +8,9 @@
 
 from floris.core import State
 from floris.floris_model import FlorisModel
+from floris.type_dec import (
+    NDArrayFloat,
+)
 
 
 class ParFlorisModel(FlorisModel):
@@ -123,81 +126,111 @@ def run(self) -> None:
             t0 = timerpc()
             super().run()
             t1 = timerpc()
-        elif self.interface == "multiprocessing":
+            self._print_timings(t0, t1, None, None)
+        else:
             t0 = timerpc()
             self.core.initialize_domain()
             parallel_run_inputs = self._preprocessing()
             t1 = timerpc()
-            if self.return_turbine_powers_only:
-                with self._PoolExecutor(self.max_workers) as p:
-                    self._turbine_powers_split = p.starmap(
-                        _parallel_run_powers_only,
-                        parallel_run_inputs
-                    )
-            else:
-                with self._PoolExecutor(self.max_workers) as p:
-                    self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs)
-            t2 = timerpc()
-            self._postprocessing()
-            self.core.farm.finalize(self.core.grid.unsorted_indices)
-            self.core.state = State.USED
-            t3 = timerpc()
-        elif self.interface == "pathos":
-            t0 = timerpc()
-            self.core.initialize_domain()
-            parallel_run_inputs = self._preprocessing()
-            t1 = timerpc()
-            if self.return_turbine_powers_only:
-                self._turbine_powers_split = self.pathos_pool.map(
-                    _parallel_run_powers_only_map,
-                    parallel_run_inputs
-                )
-            else:
-                self._fmodels_split = self.pathos_pool.map(
-                    _parallel_run_map,
-                    parallel_run_inputs
-                )
-            t2 = timerpc()
-            self._postprocessing()
-            self.core.farm.finalize(self.core.grid.unsorted_indices)
-            self.core.state = State.USED
-            t3 = timerpc()
-        elif self.interface == "concurrent":
-            t0 = timerpc()
-            self.core.initialize_domain()
-            parallel_run_inputs = self._preprocessing()
-            t1 = timerpc()
-            if self.return_turbine_powers_only:
-                with self._PoolExecutor(self.max_workers) as p:
-                    self._turbine_powers_split = p.map(
-                        _parallel_run_powers_only_map,
-                        parallel_run_inputs
-                    )
-                    self._turbine_powers_split = list(self._turbine_powers_split)
-            else:
-                with self._PoolExecutor(self.max_workers) as p:
-                    self._fmodels_split = p.map(
-                        _parallel_run_map,
-                        parallel_run_inputs
-                    )
-                    self._fmodels_split = list(self._fmodels_split)
-            t2 = timerpc()
-            self._postprocessing()
-            self.core.farm.finalize(self.core.grid.unsorted_indices)
-            self.core.state = State.USED
-            t3 = timerpc()
-
-        if self.print_timings:
-            print("===============================================================================")
-            if self.interface is None:
-                print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s")
-            else:
-                print(
-                    "Total time spent for parallel calculation "
-                    f"({self.max_workers} workers): {t3-t0:.3f} s"
-                )
-                print(f"  Time spent in parallel preprocessing: {t1-t0:.3f} s")
-                print(f"  Time spent in parallel loop execution: {t2-t1:.3f} s.")
-                print(f"  Time spent in parallel postprocessing: {t3-t2:.3f} s")
+            if self.interface == "multiprocessing":
+                if self.return_turbine_powers_only:
+                    with self._PoolExecutor(self.max_workers) as p:
+                        self._turbine_powers_split = p.starmap(
+                            _parallel_run_powers_only,
+                            parallel_run_inputs
+                        )
+                else:
+                    with self._PoolExecutor(self.max_workers) as p:
+                        self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs)
+            elif self.interface == "pathos":
+                if self.return_turbine_powers_only:
+                    self._turbine_powers_split = self.pathos_pool.map(
+                        _parallel_run_powers_only_map,
+                        parallel_run_inputs
+                    )
+                else:
+                    self._fmodels_split = self.pathos_pool.map(
+                        _parallel_run_map,
+                        parallel_run_inputs
+                    )
+            elif self.interface == "concurrent":
+                if self.return_turbine_powers_only:
+                    with self._PoolExecutor(self.max_workers) as p:
+                        self._turbine_powers_split = p.map(
+                            _parallel_run_powers_only_map,
+                            parallel_run_inputs
+                        )
+                        self._turbine_powers_split = list(self._turbine_powers_split)
+                else:
+                    with self._PoolExecutor(self.max_workers) as p:
+                        self._fmodels_split = p.map(
+                            _parallel_run_map,
+                            parallel_run_inputs
+                        )
+                        self._fmodels_split = list(self._fmodels_split)
+            t2 = timerpc()
+            self._postprocessing()
+            self.core.farm.finalize(self.core.grid.unsorted_indices)
+            self.core.state = State.USED
+            t3 = timerpc()
+            self._print_timings(t0, t1, t2, t3)
+
+    def sample_flow_at_points(self, x: NDArrayFloat, y: NDArrayFloat, z: NDArrayFloat):
+        """
+        Sample the flow field at specified points.
+
+        Args:
+            x: The x-coordinates of the points.
+            y: The y-coordinates of the points.
+            z: The z-coordinates of the points.
+
+        Returns:
+            NDArrayFloat: The wind speeds at the specified points.
+        """
+        if self.return_turbine_powers_only:
+            raise NotImplementedError(
+                "Sampling flow at points is not supported when "
+                "return_turbine_powers_only is set to True on ParFlorisModel."
+            )
+
+        if self.interface is None:
+            t0 = timerpc()
+            sampled_wind_speeds = super().sample_flow_at_points(x, y, z)
+            t1 = timerpc()
+            self._print_timings(t0, t1, None, None)
+        else:
+            t0 = timerpc()
+            self.core.initialize_domain()
+            parallel_run_inputs = self._preprocessing()
+            parallel_sample_flow_at_points_inputs = [
+                (fmodel_dict, control_setpoints, x, y, z)
+                for fmodel_dict, control_setpoints in parallel_run_inputs
+            ]
+            t1 = timerpc()
+            if self.interface == "multiprocessing":
+                with self._PoolExecutor(self.max_workers) as p:
+                    sampled_wind_speeds_p = p.starmap(
+                        _parallel_sample_flow_at_points,
+                        parallel_sample_flow_at_points_inputs
+                    )
+            elif self.interface == "pathos":
+                sampled_wind_speeds_p = self.pathos_pool.map(
+                    _parallel_sample_flow_at_points_map,
+                    parallel_sample_flow_at_points_inputs
+                )
+            elif self.interface == "concurrent":
+                with self._PoolExecutor(self.max_workers) as p:
+                    sampled_wind_speeds_p = p.map(
+                        _parallel_sample_flow_at_points_map,
+                        parallel_sample_flow_at_points_inputs
+                    )
+                    sampled_wind_speeds_p = list(sampled_wind_speeds_p)
+            t2 = timerpc()
+            sampled_wind_speeds = np.concatenate(sampled_wind_speeds_p, axis=0)
+            t3 = timerpc()
+            self._print_timings(t0, t1, t2, t3)
+
+        return sampled_wind_speeds
 
     def _preprocessing(self):
         """
@@ -278,6 +311,23 @@ def _postprocessing(self):
                 axis=0
             )
 
+    def _print_timings(self, t0, t1, t2, t3):
+        """
+        Print the timings for the parallel execution.
+        """
+        if self.print_timings:
+            print("===============================================================================")
+            if self.interface is None:
+                print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s")
+            else:
+                print(
+                    "Total time spent for parallel calculation "
+                    f"({self.max_workers} workers): {t3-t0:.3f} s"
+                )
+                print(f"  Time spent in parallel preprocessing: {t1-t0:.3f} s")
+                print(f"  Time spent in parallel loop execution: {t2-t1:.3f} s.")
+                print(f"  Time spent in parallel postprocessing: {t3-t2:.3f} s")
+
     def _get_turbine_powers(self):
         """
         Calculates the power at each turbine in the wind farm.
@@ -364,3 +414,14 @@ def _parallel_run_powers_only_map(x):
     Wrapper for unpacking inputs to _parallel_run_powers_only() for use with map().
     """
     return _parallel_run_powers_only(*x)
+
+def _parallel_sample_flow_at_points(fmodel_dict, set_kwargs, x, y, z):
+    fmodel = FlorisModel(fmodel_dict)
+    fmodel.set(**set_kwargs)
+    return fmodel.sample_flow_at_points(x, y, z)
+
+def _parallel_sample_flow_at_points_map(x):
+    """
+    Wrapper for unpacking inputs to _parallel_sample_flow_at_points() for use with map().
+    """
+    return _parallel_sample_flow_at_points(*x)
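
The `*_map` wrapper above mirrors the existing run()/powers-only wrappers: the pathos and concurrent interfaces dispatch through a map-style call that hands each packed argument tuple to the worker as a single object, whereas multiprocessing's starmap unpacks the tuple itself. A self-contained sketch of that distinction, using only the standard library (illustration only, not code from this commit):

    from multiprocessing import Pool

    def add(a, b):
        return a + b

    def add_map(x):
        # map() delivers the packed tuple as one argument, so unpack it here.
        return add(*x)

    if __name__ == "__main__":
        inputs = [(1, 2), (3, 4)]
        with Pool(2) as p:
            print(p.starmap(add, inputs))  # starmap unpacks: add(1, 2), add(3, 4)
            print(p.map(add_map, inputs))  # map passes the tuple whole: add_map((1, 2)), ...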
26 changes: 26 additions & 0 deletions tests/par_floris_model_unit_test.py
@@ -286,3 +286,29 @@ def test_control_setpoints(sample_inputs_fixture):
 
     assert powers_fmodel.shape == powers_pfmodel.shape
     assert np.allclose(powers_fmodel, powers_pfmodel)
+
+def test_sample_flow_at_points(sample_inputs_fixture):
+
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+
+    wind_speeds = np.array([8.0, 8.0, 9.0])
+    wind_directions = np.array([270.0, 270.0, 270.0])
+    fmodel.set(
+        wind_directions=wind_directions.flatten(),
+        wind_speeds=wind_speeds.flatten(),
+        turbulence_intensities=0.06 * np.ones_like(wind_speeds.flatten()),
+    )
+
+    x_test = np.array([500.0, 750.0, 1000.0, 1250.0, 1500.0])
+    y_test = np.array([0.0, 0.0, 0.0, 0.0, 0.0])
+    z_test = np.array([90.0, 90.0, 90.0, 90.0, 90.0])
+
+    ws_base = fmodel.sample_flow_at_points(x_test, y_test, z_test)
+
+    for interface in ["multiprocessing", "pathos", "concurrent"]:
+        pfmodel = ParFlorisModel(fmodel, max_workers=2, interface=interface)
+        ws_test = pfmodel.sample_flow_at_points(x_test, y_test, z_test)
+        assert np.allclose(ws_base, ws_test)
