Skip to content

Commit

Permalink
Refactor windowed writes
Browse files Browse the repository at this point in the history
  • Loading branch information
GregoryPetrochenkov-NOAA committed Feb 27, 2025
1 parent 9a5e830 commit 6db4cff
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 110 deletions.
3 changes: 3 additions & 0 deletions tools/inundate_gms_optimized.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import argparse
import os
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
Expand Down Expand Up @@ -112,6 +114,7 @@ def Inundate_gms(
print(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}")

except Exception as exc:
traceback.print_exc(file=sys.stdout)
if log_file is not None:
print(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}", file=open(log_file, "a"))
else:
Expand Down
120 changes: 70 additions & 50 deletions tools/inundation_optimized.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ def inundate(
if src_table is not None:
create_src_subset_csv(hydro_table, catchmentStagesDict, src_table)

depths_profile = rem.profile
inundation_profile = catchments.profile

depths_profile.update(
driver='GTiff',
blockxsize=256,
blockysize=256,
tiled=True,
# compress='lzw'
)
inundation_profile.update(
driver='GTiff',
blockxsize=256,
blockysize=256,
tiled=True,
# compress='lzw'
dtype='int8',
nodata=0,
)

depth_rst = rasterio.open(depths, "w+", **depths_profile) if depths is not None else None
inundation_rst = (
rasterio.open(inundation_raster, "w+", **inundation_profile)
if (inundation_profile is not None)
else None
)

# make windows generator
window_gen = __make_windows_generator(
rem,
Expand All @@ -198,6 +225,10 @@ def inundate(
hucs=hucs,
hucSet=hucSet,
windowed=windowed,
depth_rst=depth_rst,
inundation_rst=inundation_rst,
depth_nodata=depths_profile['nodata'],
inundation_nodata=inundation_profile['nodata'],
)

# executor = ThreadPoolExecutor(max_workers=num_workers)
Expand Down Expand Up @@ -235,20 +266,24 @@ def inundate(

# power down pool
# executor.shutdown(wait=True)
depth_rst.close()
inundation_rst.close()
return inundation_rasters, depth_rasters, inundation_polys


def __inundate_in_huc(
rem_array,
catchments_array,
rem_profile,
catchments_profile,
depth_rst,
inundation_rst,
hucCode,
catchmentStagesDict,
depths,
inundation_raster,
quiet=False,
window=None,
depth_nodata=None,
inundation_nodata=None,
):
"""
Inundate within the chosen scope
Expand All @@ -275,54 +310,30 @@ def __inundate_in_huc(
if hucCode is not None:
__vprint("Inundating {} ...".format(hucCode), not quiet)

depths_profile = rem_profile
inundation_profile = catchments_profile
depths_profile.update(
driver='GTiff',
blockxsize=256,
blockysize=256,
tiled=True,
# compress='lzw'
)
inundation_profile.update(
driver='GTiff',
blockxsize=256,
blockysize=256,
tiled=True,
# compress='lzw'
dtype='int8',
nodata=0,
)

# print("Nodata", rem_profile['nodata'], type(rem_profile['nodata']),
# catchments_profile['nodata'], type(catchments_profile['nodata']))
# make output arrays

out_array = np.tile(np.int8(0), rem_array.shape)
rem, out_array = __go_fast_mapping(
# out_array = np.tile(np.int8(0), rem_array.shape)
rem, catchments = __go_fast_mapping(
rem_array,
catchments_array,
catchmentStagesDict,
rem_array.shape[1],
rem_array.shape[0],
np.int16(rem_profile['nodata']),
np.int16(catchments_profile['nodata']),
out_array,
np.int16(depth_nodata),
np.int16(inundation_nodata),
)

# print(inundation_raster, type(final_array[0][0]), np.max(final_array))

if depths is not None:
with rasterio.open(depths, "w", **depths_profile) as file:
file.write(rem, window=window, indexes=1)
depth_rst.write(rem, window=window, indexes=1)

if inundation_raster is not None:
with rasterio.open(inundation_raster, "w", **inundation_profile) as file:
file.write(out_array, window=window, indexes=1)
inundation_rst.write(catchments, window=window, indexes=1)

return inundation_raster, depths, None


@njit(nogil=True, fastmath=True, parallel=False, cache=False)
def __go_fast_mapping(rem, catchments, catchmentStagesDict, x, y, nodata_r, nodata_c, out_array):
@njit(nogil=True, fastmath=True, parallel=True, cache=True)
def __go_fast_mapping(rem, catchments, catchmentStagesDict, x, y, nodata_r, nodata_c):
"""
Numba optimization for determining flood depth and flood
Expand All @@ -331,7 +342,7 @@ def __go_fast_mapping(rem, catchments, catchmentStagesDict, x, y, nodata_r, noda
rem : numpy array
Relative elevation model values which will be replaced by inundation depth values
catchments : numpy array
Rasterized catchments represented by HydroIDs to be replaced with inundation values
Rasterized catchments represented by HydoIDs to be replaced with inundation values
catchmentStagesDict : numba dictionary
Numba compatible dictionary with HydroID as a key and flood stage as a value
x : int
Expand All @@ -357,23 +368,21 @@ def __go_fast_mapping(rem, catchments, catchmentStagesDict, x, y, nodata_r, noda

# If the depth is greater than approximately 1/10th of a foot
if depth < 30:
catchments[i, j] *= -1 # set HydroIDs to negative
rem[i, j] = 0
out_array[i, j] = np.int8(0)

else:
rem[i, j] = depth
out_array[i, j] = np.int8(1)
else:
rem[i, j] = 0
out_array[i, j] = np.int8(0)
catchments[i, j] *= -1 # set HydroIDs to negative
else:
rem[i, j] = 0
out_array[i, j] = np.int8(0)
catchments[i, j] = nodata_c
else:
rem[i, j] = 0
out_array[i, j] = np.int8(0)
catchments[i, j] = nodata_c

return rem, out_array
return rem, catchments


def __make_windows_generator(
Expand All @@ -388,6 +397,10 @@ def __make_windows_generator(
hucs=None,
hucSet=None,
windowed=False,
depth_rst=None,
inundation_rst=None,
depth_nodata=None,
inundation_nodata=None,
):
"""
Generator to split processing in to windows or different masked datasets
Expand Down Expand Up @@ -489,13 +502,16 @@ def __return_huc_in_hucSet(hucCode, hucSet):
yield (
rem_array,
catchments_array,
rem.profile,
catchments.profile,
depth_rst,
inundation_rst,
hucCode,
catchmentStagesDict,
depths,
inundation_raster,
quiet,
None,
depth_nodata,
inundation_nodata,
)

else:
Expand All @@ -507,28 +523,32 @@ def __return_huc_in_hucSet(hucCode, hucSet):
yield (
rem.read(1, window=window),
catchments.read(1, window=window),
rem.profile,
catchments.profile,
depth_rst,
inundation_rst,
hucCode,
catchmentStagesDict,
depths,
inundation_raster,
quiet,
window,
depth_nodata,
inundation_nodata,
)
else:

yield (
rem.read(1),
catchments.read(1),
rem.profile,
catchments.profile,
depth_rst,
inundation_rst,
hucCode,
catchmentStagesDict,
depths,
inundation_raster,
quiet,
None,
depth_nodata,
inundation_nodata,
)


Expand Down
36 changes: 18 additions & 18 deletions tools/mosaic_inundation.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,28 @@ def mosaic_by_unit(

directory, file = os.path.split(mosaic_output)
file, ext = os.path.splitext(file)
merge = os.path.join(directory, file + "_mosaic" + ext)
overlap.merge_rasters(merge, threaded=threaded, workers=workers, nodata=nodata)
# merge = os.path.join(directory, file + "_mosaic" + ext)
overlap.merge_rasters(mosaic_output, threaded=threaded, workers=workers, nodata=nodata)

if mask:
fh.vprint("Masking ...", verbose)
print("Masking Begin", time.localtime())
overlap.mask_mosaic(merge, mask, outfile=mosaic_output, workers=workers)

os.remove(merge)

del overlap
gc.collect()

if remove_inputs:
fh.vprint("Removing inputs ...", verbose)

remove_list = []
for inun_map in inundation_maps_list:
if inun_map is not None and os.path.isfile(inun_map):
remove_list.append(inun_map)

return remove_list
overlap.mask_mosaic(mosaic_output, mask, outfile=mosaic_output, workers=workers)

# os.remove(merge)

# del overlap
# gc.collect()

# if remove_inputs:
# fh.vprint("Removing inputs ...", verbose)
#
# remove_list = []
# for inun_map in inundation_maps_list:
# if inun_map is not None and os.path.isfile(inun_map):
# remove_list.append(inun_map)
#
# return remove_list


def mosaic_final_inundation_extent_to_poly(inundation_raster, inundation_polygon, driver="GPKG"):
Expand Down
8 changes: 4 additions & 4 deletions tools/overlapping_inundation3.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def mask_mosaic(mosaic, polys, polys_layer=None, outfile=None, workers=4, quiet=
mosaic_read = mosaic_read.sel({'band': 1})
geom = polys['geometry'].values[0]

def write_window(mosaic, geom, window, wrst, lock):
def write_window(geom, window, wrst, lock):
mosaic_slice = mosaic.isel(
y=slice(window.row_off, window.row_off + window.height),
x=slice(window.col_off, window.col_off + window.width),
Expand All @@ -390,8 +390,8 @@ def write_window(mosaic, geom, window, wrst, lock):
gdf_temp['arb'] = np.int8(1)
temp_rast = make_geocube(vector_data=gdf_temp, measurements=['arb'], like=mosaic_slice)
mosaic_slice.data = xr.where(np.isnan(temp_rast['arb']), 0, mosaic_slice.data)
with lock:
wrst.write_band(1, mosaic_slice.data.squeeze(), window=window)
# with lock:
wrst.write_band(1, mosaic_slice.data.squeeze(), window=window)

executor = ThreadPoolExecutor(max_workers=workers)

Expand All @@ -401,7 +401,7 @@ def __data_generator(windows, mosaic, geom, wrst, lock):

lock = Lock()

with rasterio.open(outfile, "w", **profile) as wrst:
with rasterio.open(outfile, "r+", **profile) as wrst:
dgen = __data_generator(windows, mosaic_read, geom, wrst, lock)
results = {executor.submit(write_window, *wg): 1 for wg in dgen}

Expand Down
Loading

0 comments on commit 6db4cff

Please sign in to comment.