|
| 1 | +import copy |
| 2 | +from typing import Dict, List, NamedTuple, Tuple |
| 3 | +import numpy as np |
| 4 | + |
| 5 | +""" |
| 6 | +A concise implementation of the proposed smart control algorithm by Philippe de Bekker. |
| 7 | +""" |
| 8 | + |
| 9 | +__author__ = "Philippe de Bekker" |
| 10 | +__copyright__ = "Copyright (C) 2022 Philippe de Bekker" |
| 11 | +__version__ = "1.0" |
| 12 | +__maintainer__ = "Philippe de Bekker" |
| 13 | + |
| 14 | +__status__ = "Prototype" |
| 15 | + |
| 16 | +class Battery(NamedTuple): |
| 17 | + """ |
| 18 | + Battery properties for simulating a natural operational environment. |
| 19 | + """ |
| 20 | + |
| 21 | + # Complete battery capacity [kWh] |
| 22 | + capacity: float |
| 23 | + |
| 24 | + # Minimum battery capacity based on Depth of Discharge [kWh] |
| 25 | + min_capacity: float |
| 26 | + |
| 27 | + # Battery capacity at the start of the lookahead window [kWh] |
| 28 | + initial_capacity: float |
| 29 | + |
| 30 | + # Ratio of efficiency per simulated timestamp when charging the battery [%] |
| 31 | + efficiency_charging: float |
| 32 | + |
| 33 | + # Ratio of efficiency per simulated timestamp when discharging the battery [%] |
| 34 | + efficiency_discharging: float |
| 35 | + |
| 36 | + # Maximum amount of energy that can be charged per timestamp in the simulation [kW * h = kWh] |
| 37 | + max_chargable_energy: float |
| 38 | + |
| 39 | + # Maximum amount of energy that can be discharged per timestamp in the simulation [kW * h = kWh] |
| 40 | + max_dischargable_energy: float |
| 41 | + |
| 42 | +class LookaheadWindow(): |
| 43 | + |
| 44 | + def __init__(self, window_size: int, |
| 45 | + residual_power: List[float], |
| 46 | + export_tariffs: List[float], |
| 47 | + import_tarrifs: List[float], |
| 48 | + battery: Battery) -> None: |
| 49 | + """ |
| 50 | + Lookahead window that can be used for some timestamp in a simulation to compute an optimal decision based on forecasts of power, demand and tariffs. |
| 51 | +
|
| 52 | + Args: |
| 53 | + window_size (int): amount of timestamps |
| 54 | + residual_power (List[float]): (estimated) residual power at each timestamp (i.e. power minus demand) [kW * h = kWh] |
| 55 | + export_tariffs (List[float]): (estimated) export tariff at each timestamp [£/kWh] |
| 56 | + import_tarrifs (List[float]): (estimated) import tariff at each timestamp [£/kWh] |
| 57 | + battery (Battery): properties of simulated battery |
| 58 | + """ |
| 59 | + |
| 60 | + self.window_size: int = window_size |
| 61 | + self.residual_power: np.ndarray = np.array(residual_power) |
| 62 | + |
| 63 | + self.initial_capacity = battery.initial_capacity |
| 64 | + self.battery = battery |
| 65 | + self.room_to_charge: np.ndarray = np.full((window_size,), max(0, battery.capacity - battery.initial_capacity)) |
| 66 | + |
| 67 | + self.charging_boundary: float = battery.max_chargable_energy |
| 68 | + self.charging_boundaries: np.ndarray = np.full((window_size,), self.charging_boundary) |
| 69 | + |
| 70 | + self.discharging_boundary: float = battery.max_dischargable_energy |
| 71 | + self.discharging_boundaries: np.ndarray = np.full((window_size,), self.discharging_boundary) |
| 72 | + |
| 73 | + self.start_index = 0 |
| 74 | + self.throughput_values: np.ndarray = np.full((window_size,), max(0, battery.capacity - battery.initial_capacity)) |
| 75 | + |
| 76 | + self.export_tariffs: np.ndarray = np.array(export_tariffs) |
| 77 | + self.import_tariffs: np.ndarray = np.array(import_tarrifs) |
| 78 | + |
| 79 | + def compute(self) -> Dict[str, float]: |
| 80 | + """ |
| 81 | + Implements the proposed smart control algorithm by Philippe de Bekker. |
| 82 | + Step 5 is omitted as the simulated environment does not allow charging the battery using imported energy, however, a comment that explains the implementation is provided. |
| 83 | + Calculates the optimal decision to take for some timestamp based on the provided lookahead window. |
| 84 | +
|
| 85 | + Returns: |
| 86 | + Dict[str, float]: { |
| 87 | + "opt_net_value": optimal amount of energy to export/import [kWh], |
| 88 | + "opt_charge": optimal amount of energy to charge the battery [kWh], |
| 89 | + "opt_discharge": optimal amount of energy to discharge the battery [kWh] |
| 90 | + } |
| 91 | + """ |
| 92 | + |
| 93 | + for t in range(0, self.window_size): |
| 94 | + |
| 95 | + # We skip excess power (past excess power can be used in the future) |
| 96 | + if self.residual_power[t] >= 0: |
| 97 | + continue |
| 98 | + |
| 99 | + ##################################################################################################### |
| 100 | + # Use battery first (other future timestamps can later on swap with this discharged energy as well) # |
| 101 | + ##################################################################################################### |
| 102 | + dischargable_excess_demand = min(-self.residual_power[t], self.discharging_boundaries[t] * self.battery.efficiency_discharging) |
| 103 | + dischargable_battery_soc = max(0, self.initial_capacity - self.battery.min_capacity) |
| 104 | + battery_energy = min(dischargable_excess_demand / self.battery.efficiency_discharging, dischargable_battery_soc) |
| 105 | + |
| 106 | + if battery_energy > 0: |
| 107 | + self.discharging_boundaries[t] -= battery_energy |
| 108 | + self.initial_capacity -= battery_energy |
| 109 | + self.room_to_charge[t:] = [e + battery_energy for e in self.room_to_charge[t:]] |
| 110 | + self.residual_power[t] += battery_energy * self.battery.efficiency_discharging |
| 111 | + |
| 112 | + |
| 113 | + ######################################################################################## |
| 114 | + # Charge using past excess power (sorted by ascending selling prices) to discharge now # |
| 115 | + ######################################################################################## |
| 116 | + temp_throughput_values, start_index = self.__get_rtc_throughput_values(t) |
| 117 | + self.throughput_values = temp_throughput_values |
| 118 | + self.start_index = start_index |
| 119 | + ascending_selling_prices = np.argsort(self.export_tariffs[:t]) |
| 120 | + ascending_buying_prices = np.argsort(self.import_tariffs[:min(t + 1, max(0, self.window_size - 1))]) |
| 121 | + |
| 122 | + |
| 123 | + def charge(input_amount: float, at_t: int, for_t: int) -> None: |
| 124 | + |
| 125 | + # Charge at_t |
| 126 | + battery_amount = input_amount * self.battery.efficiency_charging |
| 127 | + self.residual_power[at_t] -= input_amount |
| 128 | + self.charging_boundaries[at_t] -= battery_amount |
| 129 | + |
| 130 | + |
| 131 | + # Update throughput & room to charge from now on |
| 132 | + self.room_to_charge[at_t:for_t] = [e - battery_amount for e in self.room_to_charge[at_t:for_t]] |
| 133 | + temp_throughput_values, start_index = self.__get_rtc_throughput_values(t) |
| 134 | + self.start_index = start_index |
| 135 | + self.throughput_values = temp_throughput_values |
| 136 | + |
| 137 | + # Discharge for_t |
| 138 | + output_amount = battery_amount * self.battery.efficiency_discharging |
| 139 | + self.residual_power[for_t] += output_amount |
| 140 | + self.discharging_boundaries[for_t] -= battery_amount |
| 141 | + |
| 142 | + |
| 143 | + for i in ascending_selling_prices: |
| 144 | + |
| 145 | + # There is no excess demand to cover anymore or can be discharged |
| 146 | + if self.residual_power[t] >= 0 or self.discharging_boundaries[t] == 0: |
| 147 | + break |
| 148 | + |
| 149 | + # The price of selling in the past vs. covering demand with bought energy now is more profitable |
| 150 | + if self.export_tariffs[i] > self.import_tariffs[t]: |
| 151 | + break |
| 152 | + |
| 153 | + # There is no throughput from this index anymore, look further ahead |
| 154 | + if i < self.start_index: |
| 155 | + continue |
| 156 | + |
| 157 | + # There is excess power at this timestamp, attempt to charge as much as is needed and possible |
| 158 | + if self.residual_power[i] > 0: |
| 159 | + |
| 160 | + # Charge as much as needed (and as possible) from current excess power |
| 161 | + curr_chargable_excess_power = min(self.residual_power[i], min(self.throughput_values[i], self.charging_boundaries[i]) / self.battery.efficiency_charging) # How much we can charge at i |
| 162 | + dischargable_excess_demand = min(-self.residual_power[t], self.discharging_boundaries[t] * self.battery.efficiency_discharging) # How much we want at t |
| 163 | + input_amount = min(dischargable_excess_demand / self.battery.efficiency_discharging, curr_chargable_excess_power) |
| 164 | + charge(input_amount=input_amount, at_t=i, for_t=t) |
| 165 | + |
| 166 | + ######################################################################################### |
| 167 | + # Then, we charge using past discharged energy (swap: at other timestamp we buy energy) # |
| 168 | + ######################################################################################### |
| 169 | + for i in ascending_buying_prices: |
| 170 | + |
| 171 | + # There is no excess demand to cover anymore or can be discharged |
| 172 | + if self.residual_power[t] >= 0 or self.discharging_boundaries[t] == 0: |
| 173 | + break |
| 174 | + |
| 175 | + # We are better off buying energy at our current timestamp, cheaper! |
| 176 | + if self.import_tariffs[t] <= self.import_tariffs[i]: |
| 177 | + break |
| 178 | + |
| 179 | + # There is no throughput from this index anymore, look further ahead |
| 180 | + if i < self.start_index: |
| 181 | + continue |
| 182 | + |
| 183 | + discharged = self.discharging_boundary - self.discharging_boundaries[i] |
| 184 | + if discharged > 0: |
| 185 | + |
| 186 | + # Charge as much as needed (and as possible) from past discharged energy |
| 187 | + curr_dischargable_past_energy = min(discharged, self.throughput_values[i]) |
| 188 | + dischargable_excess_demand = min(-self.residual_power[t], self.discharging_boundaries[t] * self.battery.efficiency_discharging) |
| 189 | + |
| 190 | + output_energy = min(curr_dischargable_past_energy * self.battery.efficiency_discharging, dischargable_excess_demand) |
| 191 | + battery_energy = output_energy / self.battery.efficiency_discharging |
| 192 | + |
| 193 | + # Buy energy at that timestamp (equivalent to adding excess demand which needs to be bought later) |
| 194 | + self.residual_power[i] -= output_energy |
| 195 | + self.discharging_boundaries[i] += battery_energy |
| 196 | + |
| 197 | + # Discharge at current timestamp |
| 198 | + self.residual_power[t] += output_energy |
| 199 | + self.discharging_boundaries[t] -= battery_energy |
| 200 | + |
| 201 | + |
| 202 | + # Update throughput & room to charge from past moment until now |
| 203 | + self.room_to_charge[i:t] = [e - battery_energy for e in self.room_to_charge[i:t]] |
| 204 | + temp_throughput_values, start_index = self.__get_rtc_throughput_values(t) |
| 205 | + self.throughput_values = temp_throughput_values |
| 206 | + self.start_index = start_index |
| 207 | + |
| 208 | + ################################################################################################################################## |
| 209 | + # If profitable, we also look at whether buying energy in the past and charging the battery with it to discharge now is possible # |
| 210 | + ################################################################################################################################## |
| 211 | + """ |
| 212 | + TODO (this scenario was not applicable in the simulated environment of the research): |
| 213 | + Sort on import energy prices until temp throughput is zero and buy as much energy as is needed while cheaper and is possible to charge at that moment |
| 214 | + """ |
| 215 | + |
| 216 | + #################################################### |
| 217 | + # Otherwise, we buy energy (keep as excess demand) # |
| 218 | + #################################################### |
| 219 | + |
| 220 | + # Left over residual power at original timestamp (0): sell all excess power & buy all excess demand |
| 221 | + return { |
| 222 | + "opt_net_value": self.residual_power[0], |
| 223 | + "opt_charge": self.charging_boundary - self.charging_boundaries[0], |
| 224 | + "opt_discharge": self.discharging_boundary - self.discharging_boundaries[0] |
| 225 | + } |
| 226 | + |
| 227 | + |
| 228 | + def __get_rtc_throughput_values(self, t: int) -> Tuple[List[int], int]: |
| 229 | + """ |
| 230 | + Gets the maximal throughput per timestamp before timestamp t - needed for the the room_to_charge list. |
| 231 | + In addition, the starting index of the first positive throughput after some bottleneck is provided. |
| 232 | + RtC values of 0 are a bottleneck for throughput, these simulated timestamps cannot carry more energy that is needed at t. |
| 233 | +
|
| 234 | + Args: |
| 235 | + t (int): index of current timestamp |
| 236 | +
|
| 237 | + Returns: |
| 238 | + Tuple[List[int], int]: |
| 239 | + - list of max throughput per timestamp |
| 240 | + - starting index of positive throughput |
| 241 | + """ |
| 242 | + |
| 243 | + throughput_values = [] |
| 244 | + max_throughput: float = float("inf") |
| 245 | + |
| 246 | + start_index = 0 |
| 247 | + start_index_changed = False |
| 248 | + |
| 249 | + for i in range(max(0, self.window_size - 1), -1, -1): |
| 250 | + temp_rtc = self.room_to_charge[i] |
| 251 | + max_throughput = min(max_throughput, temp_rtc) |
| 252 | + if not start_index_changed and max_throughput == 0: |
| 253 | + start_index = min(i + 1, t) |
| 254 | + start_index_changed = True |
| 255 | + throughput_values.append(max_throughput) |
| 256 | + |
| 257 | + throughput_values.reverse() |
| 258 | + return throughput_values, start_index |
| 259 | + |
| 260 | +class OutputModel(NamedTuple): |
| 261 | + # Amount of energy imported during a simulation [kWh] |
| 262 | + energy_bought: np.ndarray |
| 263 | + |
| 264 | + # Amount of energy exported during a simulation [kWh] |
| 265 | + energy_sold: np.ndarray |
| 266 | + |
| 267 | + # Total profit made by exporting energy during a simulation [£] |
| 268 | + profit: float |
| 269 | + |
| 270 | + # Total loss made by importing energy during a simulation [£] |
| 271 | + loss: float |
| 272 | + |
| 273 | +class OptimizedModel(): |
| 274 | + |
| 275 | + def __init__(self, battery: Battery, |
| 276 | + residual_power: List[float], |
| 277 | + export_tariffs: List[float], |
| 278 | + import_tariffs: List[float], |
| 279 | + lookahead: int, |
| 280 | + time: int) -> None: |
| 281 | + """ |
| 282 | + Creates a model that can be used for simulating an environment to run the smart control algorithm proposed by Philippe de Bekker. |
| 283 | +
|
| 284 | + Args: |
| 285 | + battery (Battery): properties of simulated battery in provided environment |
| 286 | + residual_power (List[float]): generated power subtracted by demand per timestamp |
| 287 | + export_tariffs (List[float]): export tariff per timestamp |
| 288 | + import_tariffs (List[float]): import tariff per timestamp |
| 289 | + lookahead (int): amount of timestamps that are provided in the lookahead window |
| 290 | + time (int): duration of the simulation (amount of timestamps) |
| 291 | + """ |
| 292 | + |
| 293 | + self.battery = battery |
| 294 | + self.capacity = 0 |
| 295 | + |
| 296 | + self.residual_power = residual_power |
| 297 | + self.export_tariffs = export_tariffs |
| 298 | + self.import_tariffs = import_tariffs |
| 299 | + |
| 300 | + self.lookahead = lookahead |
| 301 | + self.time = time |
| 302 | + |
| 303 | + |
| 304 | + def run(self, show_progress: bool = False) -> OutputModel: |
| 305 | + """ |
| 306 | + Runs a smart control algorithm in a simulated environment provided by the user to ultimately assess the performance by returning various statistics. |
| 307 | +
|
| 308 | + Args: |
| 309 | + show_progress (bool, optional): Show progress of algorithm in the console (useful for long runs). Defaults to False. |
| 310 | +
|
| 311 | + Returns: |
| 312 | + OutputModel: NamedTuple consisting of energy_bought, energy_sold, profit, loss |
| 313 | + """ |
| 314 | + |
| 315 | + length_res, length_exp, length_imp = len(self.residual_power), len(self.export_tariffs), len(self.import_tariffs) |
| 316 | + |
| 317 | + time = min([self.time, length_res, length_exp, length_imp]) |
| 318 | + lookahead = min(time, self.lookahead) |
| 319 | + capacity = self.battery.initial_battery_capacity |
| 320 | + |
| 321 | + profit = 0 |
| 322 | + loss = 0 |
| 323 | + |
| 324 | + energy_bought = np.zeros(time) |
| 325 | + energy_sold = np.zeros(time) |
| 326 | + |
| 327 | + if show_progress: |
| 328 | + print('[MODEL STARTED RUNNING]') |
| 329 | + |
| 330 | + for t in np.arange(time): |
| 331 | + |
| 332 | + if show_progress and t % 1000 == 0: |
| 333 | + print(f"Progress: {(float(t) / time * 100):,.1f}%") |
| 334 | + |
| 335 | + start = t |
| 336 | + end = min(time, t + lookahead) |
| 337 | + window_size = end - start |
| 338 | + |
| 339 | + # Note: can be replaced with estimations (forecasts) or altered values by functions |
| 340 | + lookahead_window_residual_power = copy.deepcopy(self.residual_power[start:end]) |
| 341 | + lookahead_window_export_tariffs = copy.deepcopy(self.export_tariffs[start:end]) |
| 342 | + lookahead_window_import_tariffs = copy.deepcopy(self.import_tariffs[start:end]) |
| 343 | + |
| 344 | + lookahead_window = LookaheadWindow( |
| 345 | + window_size = window_size, |
| 346 | + residual_power = lookahead_window_residual_power, |
| 347 | + export_tariffs = lookahead_window_export_tariffs, |
| 348 | + import_tarrifs = lookahead_window_import_tariffs, |
| 349 | + battery = self.battery |
| 350 | + ) |
| 351 | + |
| 352 | + opt_estimation = lookahead_window.compute() |
| 353 | + opt_charge = min([opt_estimation["opt_charge"], self.battery.max_chargable_energy, self.battery.capacity - capacity]) |
| 354 | + opt_discharge = min([opt_estimation["opt_discharge"], self.battery.max_dischargable_energy, max(0, capacity - self.battery.min_capacity)]) |
| 355 | + opt_res_power = self.residual_power[t] - opt_charge / self.battery.efficiency_charging + opt_discharge * self.battery.efficiency_discharging |
| 356 | + |
| 357 | + capacity += opt_charge - opt_discharge |
| 358 | + if opt_res_power >= 0: |
| 359 | + profit += opt_res_power * lookahead_window.export_tariffs[0] |
| 360 | + energy_sold[t] = opt_res_power |
| 361 | + else: |
| 362 | + loss -= opt_res_power * lookahead_window.import_tariffs[0] |
| 363 | + energy_bought[t] = -opt_res_power |
| 364 | + |
| 365 | + return OutputModel(energy_bought, energy_sold, profit, loss) |
0 commit comments