diff --git a/scripts/2_disruption_analysis.py b/scripts/2_disruption_analysis.py index ac10e0b..5ff9041 100644 --- a/scripts/2_disruption_analysis.py +++ b/scripts/2_disruption_analysis.py @@ -1,4 +1,3 @@ -# %% import sys from pathlib import Path @@ -16,7 +15,6 @@ raster_path = Path(load_config()["paths"]["JBA_data"]) -# %% def intersect_features_with_raster(raster_path, raster_key, features): print(f"Intersecting features with raster {raster_key}...") # read the raster data: depth (meter) @@ -62,17 +60,17 @@ def identify_disrupted_links(road_links, flood_key, flood_path, clip_path, out_p flood_path, flood_key, clipped_features ) # export intersected featrues - intersections.to_csv(out_path, index=False) + intersections.to_parquet(out_path, index=False) print("Complete identifying the disrupted links!") return intersections -def compute_maximum_speed_on_flooded_roads(depth, free_flow_speed): - depth = depth * 1000 # m to mm - if depth < 300: # mm: !!! uncertainty analysis (150, 300, 600) +def compute_maximum_speed_on_flooded_roads(depth, free_flow_speed, threshold=30): + depth = depth * 100 # m to cm + if depth < threshold: # cm # value = 0.0009 * depth**2 - 0.5529 * depth + 86.9448 # kmph - value = free_flow_speed * (depth / 300 - 1) ** 2 # mph + value = free_flow_speed * (depth / threshold - 1) ** 2 # mph return value # mph else: return 0.0 # mph @@ -92,80 +90,80 @@ def compute_damage_level_on_flooded_roads( or (road_classification == "A Road" and trunk_road == "true") ): if depth < 150: - return "no" + return 0 # "no" if 150 <= depth < 200: - return "minor" + return 1 # "minor" elif 200 <= depth < 300: - return "moderate" + return 2 # "moderate" elif 300 <= depth < 700: - return "extensive" + return 3 # "extensive" else: - return "severe" + return 4 # "severe" elif hasTunnel == 0 and ( road_classification == "Motorway" or (road_classification == "A Road" and trunk_road == "true") ): if depth < 150: - return "no" + return 0 # "no" if 150 <= depth < 200: - return "no" + return 0 # "no" elif 200 <= depth < 300: - return "no" + return 0 # "no" elif 300 <= depth < 700: - return "minor" + return 1 # "minor" else: - return "moderate" + return 2 # "moderate" else: if depth < 50: - return "no" + return 0 # "no" if 50 <= depth < 100: - return "no" + return 0 # "no" elif 100 <= depth < 200: - return "minor" + return 1 # "minor" elif 200 <= depth < 600: - return "minor" + return 1 # "minor" else: - return "moderate" + return 2 # "moderate" elif fldType == "river": if hasTunnel == 1 and ( road_classification == "Motorway" or (road_classification == "A Road" and trunk_road == "true") ): if depth < 250: - return "no" + return 0 # "no" if 250 <= depth < 300: - return "minor" + return 1 # "minor" elif 300 <= depth < 400: - return "minor" + return 1 # "minor" elif 400 <= depth < 800: - return "moderate" + return 2 # "moderate" else: - return "extensive" + return 3 # "extensive" elif hasTunnel == 0 and ( road_classification == "Motorway" or (road_classification == "A Road" and trunk_road == "true") ): if depth < 250: - return "no" + return 0 # "no" if 250 <= depth < 300: - return "minor" + return 1 # "minor" elif 300 <= depth < 400: - return "moderate" + return 2 # "moderate" elif 400 <= depth < 800: - return "extensive" + return 3 # "extensive" else: - return "severe" + return 4 # "severe" else: if depth < 50: - return "minor" + return 1 # "minor" if 50 <= depth < 100: - return "moderate" + return 2 # "moderate" elif 100 <= depth < 200: - return "moderate" + return 2 # "moderate" elif 200 <= depth < 600: - return "extensive" + return 3 # "extensive" else: - return "severe" + return 4 # "severe" else: print("Please enter the type of flood!") @@ -185,6 +183,16 @@ def main(flood_type=None): }, inplace=True, ) + + # damage level dict + damage_level_dict = { + 0: "no", + 1: "minor", + 2: "moderate", + 3: "extensive", + 4: "severe", + } + # paths flood_path = ( raster_path @@ -208,8 +216,8 @@ def main(flood_type=None): out_path = ( base_path.parent / "outputs" - / "disruption_analysis" - / f"{flood_key}_fld_depth.csv" + / "disruption_analysis_1129" + / f"{flood_key}_fld_depth.geopq" ) # intersection analysis @@ -218,26 +226,52 @@ def main(flood_type=None): flood_key, flood_path, clip_path, - out_path, + out_path, # export to file ) # for debug - # flood_key = "UK_2007_May_FLSW" - # intersections = pd.read_csv( - # base_path.parent - # / "outputs" - # / "disruption_analysis" - # / "UK_2007_May_FLSW_fld_depth.csv" - # ) + # intersections = gpd.read_parquet(out_path) - intersections = intersections.groupby("id", as_index=False)["flood_depth"].max() + # DAMAGE LEVEL ESTIMATION + if flood_type == "surface": + intersections["damage_level"] = intersections.apply( + lambda row: compute_damage_level_on_flooded_roads( + "surface", + row["road_classification"], + row["trunk_road"], + row["hasTunnel"], + row["flood_depth"], + ), + axis=1, + ) + elif flood_type == "river": + intersections["damage_level"] = intersections.apply( + lambda row: compute_damage_level_on_flooded_roads( + "river", + row.road_classification, + row.trunk_road, + row.hasTunnel, + row["flood_depth"], + ), + axis=1, + ) + else: + print("Damage level error: Please enter flood type!") + breakpoint() + intersections.to_parquet(out_path) + intersections = intersections.groupby("id", as_index=False).agg( + {"flood_depth": "max", "damage_level": "max"} + ) + intersections["damage_level"] = intersections.damage_level.map(damage_level_dict) road_links = road_links.merge( - intersections[["id", "flood_depth"]], how="left", on="id" + intersections[["id", "flood_depth", "damage_level"]], how="left", on="id" ) road_links["flood_depth"] = road_links["flood_depth"].fillna(0.0) + road_links["damage_level"] = road_links["damage_level"].fillna("no") - # attach capacity and speed info on D-zero (Edge_flows) + # MAX SPEED ESTIMATION + # attach capacity and speed info on D-0 road_links = road_links.merge( base_scenario_links[ [ @@ -253,39 +287,67 @@ def main(flood_type=None): on="id", ) - # calculate the maximum speeds on flooded road links - road_links["max_speed"] = road_links.apply( - lambda row: compute_maximum_speed_on_flooded_roads( - row["flood_depth"], row["free_flow_speeds"] - ), - axis=1, - ) - - # estimate damage levels + # uncertainties: flood depth threshold for road closure (15, 30, 60 cm) + """ + Embankment height (cm): + Surface River + Motorway and A-trunk 100 200 + Others 0 0 + """ + # motorway and a-trunk if flood_type == "surface": - road_links["damage_level"] = road_links.apply( - lambda row: compute_damage_level_on_flooded_roads( - "surface", - row["road_classification"], - row["trunk_road"], - row["hasTunnel"], - row["flood_depth"], + road_links.loc[ + (road_links["road_classification"] == "Motorway") + | ( + (road_links["road_classification"] == "A Road") + & (road_links["trunk_road"]) + ), + "max_speed", + ] = road_links.apply( + lambda row: compute_maximum_speed_on_flooded_roads( + max(row["flood_depth"] - 1.0, 0.0), # 100 cm embankment height + row["free_flow_speeds"], + threshold=30, ), axis=1, ) elif flood_type == "river": - road_links["damage_level"] = road_links.apply( - lambda row: compute_damage_level_on_flooded_roads( - "river", - row.road_classification, - row.trunk_road, - row.hasTunnel, - row["flood_depth"], + road_links.loc[ + (road_links["road_classification"] == "Motorway") + | ( + (road_links["road_classification"] == "A Road") + & (road_links["trunk_road"]) + ), + "max_speed", + ] = road_links.apply( + lambda row: compute_maximum_speed_on_flooded_roads( + max(row["flood_depth"] - 2.0, 0.0), # 200 cm embankment height + row["free_flow_speeds"], + threshold=30, ), axis=1, ) else: - print("Please enter flood type!") + print("Max speed error: please enter flood type!") + + # others + road_links.loc[ + ~( + (road_links["road_classification"] == "Motorway") + | ( + (road_links["road_classification"] == "A Road") + & (road_links["trunk_road"]) + ) + ), + "max_speed", + ] = road_links.apply( + lambda row: compute_maximum_speed_on_flooded_roads( + row["flood_depth"], # no embankment + row["free_flow_speeds"], + threshold=30, + ), + axis=1, + ) road_links.to_parquet(base_path / "disruption_analysis_1129" / "road_links.pq") road_links.drop(columns="geometry", inplace=True) diff --git a/scripts/3_damage_analysis.py b/scripts/3_damage_analysis.py index 9ebfb7b..27a1103 100644 --- a/scripts/3_damage_analysis.py +++ b/scripts/3_damage_analysis.py @@ -1,4 +1,3 @@ -# %% from pathlib import Path import pandas as pd @@ -17,7 +16,6 @@ raster_path = Path(load_config()["paths"]["JBA_data"]) -# %% def create_damage_curves(damage_ratio_df): list_of_damage_curves = [] cols = damage_ratio_df.columns[1:] @@ -49,12 +47,12 @@ def create_damage_curves(damage_ratio_df): def compute_damage_fraction( road_classification, trunk_road, - hasTunnel, + road_label, # [tunnel, bridge, road] flood_depth, # meter damage_curves, ): # print("Computing damage fractions...") - if hasTunnel == 1 and ( + if road_label == "tunnel" and ( road_classification == "Motorway" or (road_classification == "A Road" and trunk_road) ): @@ -62,7 +60,7 @@ def compute_damage_fraction( C2_damage_fraction = damage_curves["C2"].damage_fraction(flood_depth) return ("C1", C1_damage_fraction, "C2", C2_damage_fraction) - elif hasTunnel == 0 and ( + elif road_label != "tunnel" and ( road_classification == "Motorway" or (road_classification == "A Road" and trunk_road) ): @@ -82,343 +80,430 @@ def compute_damage_values( damage_fraction, road_classification, form_of_way, + urban, lanes, - hasTunnel, - aveBridgeWidth, + road_label, # ["tunnal", "bridge", "road"] damage_level, damage_values, # million £/unit + bridge_width=None, ): - # print("Compute damage values...") - # initialise values mins = [] maxs = [] # bridges - if aveBridgeWidth > 0: + if road_label == "bridge": + assert bridge_width is not None, "Bridge Width is None!" min_bridge = ( - aveBridgeWidth + bridge_width * length * damage_values[f"bridge_{flood_type}"][damage_level]["min"] ) max_bridge = ( - aveBridgeWidth + bridge_width * length * damage_values[f"bridge_{flood_type}"][damage_level]["max"] ) mins.append(min_bridge) maxs.append(max_bridge) - # mean_bridge = ( - # aveBridgeWidth - # * length - # * damage_values[f"bridge_{flood_type}"][damage_level]["mean"] - # ) # tunnels - if hasTunnel == 1: + if road_label == "tunnel": if road_classification == "Motorway": if lanes >= 8: - min_tunnel = ( - length - * 1e-3 - * lanes - * damage_values["tunnel"]["m_lanes_ge8"]["min"] - * damage_fraction - ) - max_tunnel = ( - length - * 1e-3 - * lanes - * damage_values["tunnel"]["m_lanes_ge8"]["max"] - * damage_fraction - ) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["m_lanes_ge8"]["mean"] - # * damage_fraction - # ) - mins.append(min_tunnel) - maxs.append(max_tunnel) - - else: - min_tunnel = ( - length - * 1e-3 - * lanes - * damage_values["tunnel"]["m_lanes_lt8"]["min"] - * damage_fraction - ) - max_tunnel = ( - length - * 1e-3 - * lanes - * damage_values["tunnel"]["m_lanes_lt8"]["max"] - * damage_fraction - ) - mins.append(min_tunnel) - maxs.append(max_tunnel) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["m_lanes_lt8"]["mean"] - # * damage_fraction - # ) - - else: - if form_of_way == "Single Carriageway": - if road_classification == "A Road": + if urban == 1: min_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["a_single"]["min"] + * damage_values["tunnel"]["m_ge8_urb"]["min"] * damage_fraction ) max_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["a_single"]["max"] + * damage_values["tunnel"]["m_ge8_urb"]["max"] * damage_fraction ) mins.append(min_tunnel) maxs.append(max_tunnel) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["a_single"]["mean"] - # * damage_fraction - # ) - else: # road_classification == "B Road" + else: min_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["b_single"]["min"] + * damage_values["tunnel"]["m_ge8_sub"]["min"] * damage_fraction ) max_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["b_single"]["max"] + * damage_values["tunnel"]["m_ge8_sub"]["max"] * damage_fraction ) mins.append(min_tunnel) maxs.append(max_tunnel) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["b_single"]["mean"] - # * damage_fraction - # ) - + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["m_lt8"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["m_lt8"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + if form_of_way == "Single Carriageway": + if road_classification == "A Road": + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["asingle_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["asingle_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["asingle_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["asingle_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: # road_classification == "B Road" + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["bsingle_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["bsingle_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["bsingle_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["bsingle_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) else: if lanes >= 6: + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_ge6_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_ge6_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_ge6_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_ge6_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: # lanes < 6 + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_lt6_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_lt6_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_lt6_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["tunnel"]["abdual_lt6_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + + # ordinary roads: + if road_label == "road": + if road_classification == "Motorway": + if lanes >= 8: + if urban == 1: min_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["ab_dual_lanes_ge6"]["min"] + * damage_values["road"]["m_ge8_urb"]["min"] * damage_fraction ) max_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["ab_dual_lanes_ge6"]["max"] + * damage_values["road"]["m_ge8_urb"]["max"] * damage_fraction ) mins.append(min_tunnel) maxs.append(max_tunnel) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["ab_dual_lanes_ge6"]["mean"] - # * damage_fraction - # ) - else: # lanes < 6 + else: min_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["ab_dual_lanes_lt6"]["min"] + * damage_values["road"]["m_ge8_sub"]["min"] * damage_fraction ) max_tunnel = ( length * 1e-3 * lanes - * damage_values["tunnel"]["ab_dual_lanes_lt6"]["max"] + * damage_values["road"]["m_ge8_sub"]["max"] * damage_fraction ) mins.append(min_tunnel) maxs.append(max_tunnel) - # mean_tunnel = ( - # length - # * 1e-3 - # * lanes - # * damage_values["tunnel"]["ab_dual_lanes_lt6"]["mean"] - # * damage_fraction - # ) - - # ordinary roads: - if hasTunnel == 0 and aveBridgeWidth == 0: - if road_classification == "Motorway": - if lanes >= 8: - min_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["m_lanes_ge8"]["min"] - * damage_fraction - ) - max_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["m_lanes_ge8"]["max"] - * damage_fraction - ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["m_lanes_ge8"]["mean"] - # * damage_fraction - # ) else: - min_road = ( + min_tunnel = ( length * 1e-3 * lanes - * damage_values["road"]["m_lanes_lt8"]["min"] + * damage_values["road"]["m_lt8"]["min"] * damage_fraction ) - max_road = ( + max_tunnel = ( length * 1e-3 * lanes - * damage_values["road"]["m_lanes_lt8"]["max"] + * damage_values["road"]["m_lt8"]["max"] * damage_fraction ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["m_lanes_lt8"]["mean"] - # * damage_fraction - # ) - + mins.append(min_tunnel) + maxs.append(max_tunnel) else: if form_of_way == "Single Carriageway": if road_classification == "A Road": - min_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["a_single"]["min"] - * damage_fraction - ) - max_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["a_single"]["max"] - * damage_fraction - ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["a_single"]["mean"] - # * damage_fraction - # ) + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["asingle_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["asingle_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["asingle_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["asingle_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) else: # road_classification == "B Road" - min_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["b_single"]["min"] - * damage_fraction - ) - max_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["b_single"]["max"] - * damage_fraction - ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["b_single"]["mean"] - # * damage_fraction - # ) - + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["bsingle_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["bsingle_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["bsingle_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["bsingle_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) else: if lanes >= 6: - min_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["ab_dual_lanes_ge6"]["min"] - * damage_fraction - ) - max_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["ab_dual_lanes_ge6"]["max"] - * damage_fraction - ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["ab_dual_lanes_ge6"]["mean"] - # * damage_fraction - # ) + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_ge6_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_ge6_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_ge6_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_ge6_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) else: # lanes < 6 - min_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["ab_dual_lanes_lt6"]["min"] - * damage_fraction - ) - max_road = ( - length - * 1e-3 - * lanes - * damage_values["road"]["ab_dual_lanes_lt6"]["max"] - * damage_fraction - ) - mins.append(min_road) - maxs.append(max_road) - # mean_road = ( - # length - # * 1e-3 - # * lanes - # * damage_values["road"]["ab_dual_lanes_lt6"]["mean"] - # * damage_fraction - # ) + if urban == 1: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_lt6_urb"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_lt6_urb"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + else: + min_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_lt6_sub"]["min"] + * damage_fraction + ) + max_tunnel = ( + length + * 1e-3 + * lanes + * damage_values["road"]["abdual_lt6_sub"]["max"] + * damage_fraction + ) + mins.append(min_tunnel) + maxs.append(max_tunnel) + min_cost = min(mins) max_cost = max(maxs) mean_cost = np.mean([min_cost, max_cost]) @@ -426,69 +511,111 @@ def compute_damage_values( return min_cost, max_cost, mean_cost -def calculate_damage( - disrupted_links, - damage_curves, - damage_values, - flood_type, -): - # add default columns +def calculate_damage(disrupted_links, damage_curves, damage_values): + """ + Calculate damage fractions and costs for disrupted road links based on flood depth. + + Parameters: + disrupted_links: DataFrame of disrupted road links with necessary attributes. + damage_curves: Dictionary containing damage curves. + damage_values: Dictionary containing damage cost values. + + Returns: + pd.DataFrame: Updated DataFrame with calculated damage fractions and costs. + """ + + # Validate required columns + required_columns = {"flood_depth_surface", "flood_depth_river"} + missing_columns = required_columns - set(disrupted_links.columns) + assert not missing_columns, f"Missing required columns: {missing_columns}" + + # Define damage categories and flood types curves = ["C1", "C2", "C3", "C4", "C5", "C6"] - for col in curves: - disrupted_links[f"{col}_damage_fraction"] = np.nan - disrupted_links[f"{col}_damage_value_min"] = np.nan - disrupted_links[f"{col}_damage_value_max"] = np.nan - disrupted_links[f"{col}_damage_value_mean"] = np.nan - - for idx, row in disrupted_links.iterrows(): - # compute damage fractions (2 damage curves for each scenario) + flood_types = ["surface", "river"] + + # Add default columns using MultiIndex for efficiency + col_tuples = [ + (f"{curve}_{flood_type}_damage_fraction", np.nan) + for curve in curves + for flood_type in flood_types + ] + [ + (f"{curve}_{flood_type}_damage_value_{stat}", np.nan) + for curve in curves + for flood_type in flood_types + for stat in ["min", "max", "mean"] + ] + for col, default in col_tuples: + disrupted_links[col] = default + + def calculate_for_row(row, flood_type): + """ + Helper function to calculate damage fractions and costs for a single row. + """ + # Compute damage fractions curve1, damage_fraction1, curve2, damage_fraction2 = compute_damage_fraction( row.road_classification, row.trunk_road, - row.hasTunnel, - row.flood_depth, + row.road_label, + row[f"flood_depth_{flood_type}"], damage_curves, ) - disrupted_links.loc[idx, f"{curve1}_damage_fraction"] = damage_fraction1 - disrupted_links.loc[idx, f"{curve2}_damage_fraction"] = damage_fraction2 - # compute damage costs (3 damage values for each scenario) - min_cost1, max_cost1, mean_cost1 = compute_damage_values( + # Compute damage values for both curves + damage_values_1 = compute_damage_values( row.geometry.length, flood_type, damage_fraction1, row.road_classification, row.form_of_way, + row.urban, row.lanes, - row.hasTunnel, - row.aveBridgeWidth, - row.damage_level, + row.road_label, + row[f"damage_level_{flood_type}"], damage_values, + row.aveBridgeWidth, ) - min_cost2, max_cost2, mean_cost2 = compute_damage_values( + damage_values_2 = compute_damage_values( row.geometry.length, flood_type, damage_fraction2, row.road_classification, row.form_of_way, + row.urban, row.lanes, - row.hasTunnel, - row.aveBridgeWidth, - row.damage_level, + row.road_label, + row[f"damage_level_{flood_type}"], damage_values, + row.aveBridgeWidth, ) - disrupted_links.loc[idx, f"{curve1}_damage_value_min"] = min_cost1 - disrupted_links.loc[idx, f"{curve1}_damage_value_max"] = max_cost1 - disrupted_links.loc[idx, f"{curve1}_damage_value_mean"] = mean_cost1 - disrupted_links.loc[idx, f"{curve2}_damage_value_min"] = min_cost2 - disrupted_links.loc[idx, f"{curve2}_damage_value_max"] = max_cost2 - disrupted_links.loc[idx, f"{curve2}_damage_value_mean"] = mean_cost2 + # Return a dictionary of results for easier assignment + return { + f"{curve1}_{flood_type}_damage_fraction": damage_fraction1, + f"{curve2}_{flood_type}_damage_fraction": damage_fraction2, + f"{curve1}_{flood_type}_damage_value_min": damage_values_1[0], + f"{curve1}_{flood_type}_damage_value_max": damage_values_1[1], + f"{curve1}_{flood_type}_damage_value_mean": damage_values_1[2], + f"{curve2}_{flood_type}_damage_value_min": damage_values_2[0], + f"{curve2}_{flood_type}_damage_value_max": damage_values_2[1], + f"{curve2}_{flood_type}_damage_value_mean": damage_values_2[2], + } + + # Apply calculation for each flood type + for flood_type in flood_types: + flood_results = disrupted_links.apply( + lambda row: calculate_for_row(row, flood_type), axis=1 + ) + flood_results_df = pd.DataFrame( + list(flood_results) + ) # Convert results to DataFrame + disrupted_links.update( + flood_results_df + ) # Update disrupted_links with the calculated values return disrupted_links -def main(flood_type=None): +def main(): # damage curves """ 4 damage curvse for M and A roads @@ -548,19 +675,53 @@ def main(flood_type=None): } # features - road_links = gpd.read_parquet( - base_path / "disruption_analysis_1129" / "road_links.pq" + # TEST: intersections: split, index_i, index_j, flood_depth + """Required attributes: + bridge_width: numeric or None + flood_depth_surface: numeric or None + flood_depth_river: numeric or None + damage_level_surface: no, minor, moderate, extensive, severe + damage_level_river: no, minor, moderate, extensive, severe + road_label: road, tunnel, bridge + """ + + intersections = gpd.read_parquet( + base_path.parent + / "outputs" + / "disruption_analysis_1129" + / "UK_2007_May_FLSW_RD_5m_4326_fld_depth.geopq" ) - # intersections - disrupted_links = road_links[road_links["flood_depth"] > 0].reset_index(drop=True) - # damage calculation - disrupted_links_with_damage = calculate_damage( - disrupted_links, damage_curves, damage_values, flood_type + + intersections.rename( + columns={ + "flood_depth": "flood_depth_surface", + "damage_level": "damage_level_surface", + }, + inplace=True, + ) + intersections["flood_depth_river"] = 0 + intersections["damage_level_river"] = "no" + intersections["road_label"] = "road" + intersections.loc[intersections.hasTunnel == 1, "road_label"] = "tunnel" + intersections.loc[intersections.aveBridgeWidth > 0, "road_label"] == "bridge" + + intersections_with_damage = calculate_damage( + intersections, damage_curves, damage_values + ) + intersections_with_damage = ( + pd.concat( + [intersections_with_damage["id"], intersections_with_damage.iloc[:, -48:]], + axis=1, + ) + .fillna(0) + .groupby(by=["id"], as_index=False) + .sum() # sum up damage values of all segments -> disrupted links ) # export results - disrupted_links_with_damage.to_csv( - base_path + intersections_with_damage.to_csv( + base_path.parent + / "outputs" / "disruption_analysis_1129" / "disrupted_links_with_damage_values.csv", index=False, @@ -568,4 +729,4 @@ def main(flood_type=None): if __name__ == "__main__": - main(flood_type="surface") + main()