Issue/mcc lcc #52

Draft: wants to merge 21 commits into base: main
27 changes: 26 additions & 1 deletion gradboost_pv/preprocessing/region_filtered.py
@@ -1,6 +1,8 @@
"""Preprocess NWP data using geospatial mask"""
import itertools
import logging
import multiprocessing as mp
import os
from pathlib import Path
from typing import Iterable, Tuple, Union

@@ -14,13 +16,18 @@

from gradboost_pv.models.utils import DEFAULT_DIRECTORY_TO_PROCESSED_NWP

logger = logging.getLogger(__name__)

ESO_GEO_JSON_URL = (
    "https://data.nationalgrideso.com/backend/dataset/2810092e-d4b2-472f-b955-d8bea01f9ec0/"
    "resource/08534dae-5408-4e31-8639-b579c8f1c50b/download/gsp_regions_20220314.geojson"
)
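
Presumably `query_eso_geojson` (called in `load_mask` further down) fetches this file. A minimal sketch of an equivalent read, assuming geopandas is allowed to read the URL directly; this is illustrative usage, not the repo's helper:

import geopandas as gpd

# hypothetical direct read of the GSP regions file; query_eso_geojson may do this differently
uk_shape = gpd.read_file(ESO_GEO_JSON_URL)
print(uk_shape["geometry"].head())  # the geometry column is what the masking code consumes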

# processing takes quite a long time, so take a subset for now.
DEFAULT_VARIABLES_FOR_PROCESSING = [
    "mcc",
    "lcc",
    "hcc",
    "dswrf",
    # "hcct",
@@ -78,6 +85,8 @@ def process_eso_uk_multipolygon(uk_shape: gpd.GeoDataFrame) -> MultiPolygon:
    Returns:
        MultiPolygon: Object representing the UK-region.
    """
    logger.info("Processing UK region shapefile")

    concat_poly = unary_union(uk_shape["geometry"].values)

    return MultiPolygon(Polygon(p.exterior) for p in concat_poly.geoms)
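
A self-contained toy run of the union-then-exteriors pattern above, with illustrative shapes; the isinstance check covers the case where unary_union collapses everything into a single Polygon rather than a MultiPolygon:

from shapely.geometry import MultiPolygon, Polygon
from shapely.ops import unary_union

# two overlapping squares stand in for the GSP region geometries
a = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])
b = Polygon([(1, 1), (3, 1), (3, 3), (1, 3)])

merged = unary_union([a, b])  # overlapping shapes merge into one geometry
shapes = merged.geoms if isinstance(merged, MultiPolygon) else [merged]
outline = MultiPolygon([Polygon(p.exterior) for p in shapes])  # keep exteriors, drop interior holes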
@@ -97,6 +106,8 @@ def generate_polygon_mask(
        np.ndarray: 2-D array where each (x_i, y_i) value signifies whether the
            point (x_i, y_i) belongs to the polygon.
    """
    logger.info("Generating polygon mask")

    coords = list(map(lambda x: Point(x[0], x[1]), itertools.product(coordinates_x, coordinates_y)))

    # create a mask for belonging to UK region or not
@@ -136,12 +147,22 @@ def check_points_in_multipolygon_multiprocessed(
    Returns:
        np.ndarray: Boolean mask, one entry per point, indicating polygon membership.
    """
    filename = "./data/uk_region_mask_train.npy"
    if os.path.exists(filename):
        logger.debug("Loading UK region mask from file")
        return np.load(filename)

    items = [(point, polygon) for point in points]
    results = list()
    with mp.Pool(num_processes) as pool:
        for result in pool.starmap(check_point_in_multipolygon, items):
            results.append(result)

    a = np.asarray(results)

    logger.debug(f"Saving UK region mask to file {filename}")
    np.save(filename, a)

    return a
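
The caching added above hardwires the save path and could fail if ./data does not exist when np.save runs. A minimal sketch of a more reusable variant, reusing the module's check_point_in_multipolygon; the cache_path parameter is hypothetical, not part of this PR:

import multiprocessing as mp
import os
from pathlib import Path
from typing import Iterable, Optional

import numpy as np


def check_points_multiprocessed_cached(
    points: Iterable,
    polygon,
    num_processes: int,
    cache_path: Optional[Path] = None,  # hypothetical: caller chooses the cache file
) -> np.ndarray:
    """Point-in-polygon checks with an optional on-disk cache."""
    if cache_path is not None and os.path.exists(cache_path):
        return np.load(cache_path)

    with mp.Pool(num_processes) as pool:
        results = pool.starmap(check_point_in_multipolygon, [(p, polygon) for p in points])

    mask = np.asarray(results)
    if cache_path is not None:
        Path(cache_path).parent.mkdir(parents=True, exist_ok=True)  # create the cache directory if needed
        np.save(cache_path, mask)
    return mask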


def _process_nwp(
@@ -184,11 +205,15 @@ def load_mask(self) -> xr.DataArray:
        Returns:
            xr.DataArray: UK-region mask, on NWP (x, y) coords
        """
        logger.info("Loading UK region mask from National Grid ESO")

        uk_polygon = query_eso_geojson()
        uk_polygon = process_eso_uk_multipolygon(uk_polygon)
        mask = generate_polygon_mask(self.nwp.coords["x"], self.nwp.coords["y"], uk_polygon)

        # convert numpy array to an xarray mask for a one-variable, one-step time series of (x, y) coords
        logger.debug("Making mask")
        mask = xr.DataArray(
            np.tile(mask.T, (len(self.nwp.coords["init_time"]), 1, 1)),
            dims=["init_time", "x", "y"],
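
For reference, the np.tile in load_mask repeats a 2-D mask once per init_time. A toy shape check, under the assumption (suggested by the .T above) that the generated mask comes out (y, x); sizes here are illustrative only:

import numpy as np
import xarray as xr

n_time, n_x, n_y = 3, 4, 5                  # toy sizes, illustrative only
mask_yx = np.zeros((n_y, n_x), dtype=bool)  # stand-in for the generated polygon mask

tiled = xr.DataArray(
    np.tile(mask_yx.T, (n_time, 1, 1)),  # transpose to (x, y), then repeat along init_time
    dims=["init_time", "x", "y"],
)
assert tiled.shape == (n_time, n_x, n_y)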
28 changes: 25 additions & 3 deletions scripts/preprocessing/uk_region_downsample.py
@@ -1,6 +1,7 @@
"""Script for processing raw NWP data"""
import datetime as dt
import itertools
import logging
from argparse import ArgumentParser
from pathlib import Path

@@ -22,6 +23,14 @@

logger = getLogger("uk-region-filter-nwp-data")

formatString = (
    "[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s"  # specify a format string
)
logLevel = logging.DEBUG  # specify standard log level
logging.basicConfig(format=formatString, level=logLevel, datefmt="%Y-%m-%d %I:%M:%S")

logging.getLogger("gcsfs").setLevel(logging.INFO)
logging.getLogger("geopandas").setLevel(logging.INFO)

FORECAST_HORIZONS = range(NWP_STEP_HORIZON)

@@ -50,22 +59,30 @@ def main(base_save_directory: Path):
    Script to preprocess NWP data, overnight
    """

    logger.info("Loading GSP data")
    gsp = xr.open_zarr(GSP_FPATH)
    logger.info("Loading NWP data")
    nwp = xr.open_zarr(NWP_FPATH)
    # logger.info("chunking NWP data")
    # nwp = nwp.chunk({"step": 1, "variable": 1, "init_time": 50})

    # if we consider all nwp together, the polygon mask is too big to fit in memory;
    # instead, iterate over each year in the dataset and save locally

    years = pd.DatetimeIndex(nwp.init_time.values).year.unique().values
    date_years = [dt.datetime(year=year, month=1, day=1) for year in years]

    # add one extra year boundary so the final slice covers the last year in the data
    date_years.append(dt.datetime(year=years[-1] + 1, month=1, day=1))

    for i in range(len(date_years) - 1):
        year = date_years[i].year
        logger.info("Loading NWP data for year %s", year)
        start_datetime, end_datetime = date_years[i], date_years[i + 1]
        _nwp = nwp.sel(init_time=slice(start_datetime, end_datetime))

        # time points to interpolate our nwp data onto
        logger.info("Loading GSP datetimes for year %s", year)
        evaluation_timeseries = (
            gsp.coords["datetime_gmt"]
            .where(
@@ -76,19 +93,24 @@
            .values
        )

        logger.debug(f"{evaluation_timeseries=}")
        logger.debug(f"{evaluation_timeseries.shape=}")

        dataset_builder = NWPUKRegionMaskedDatasetBuilder(
            _nwp,
            evaluation_timeseries,
        )

        iter_params = list(itertools.product(DEFAULT_VARIABLES_FOR_PROCESSING, FORECAST_HORIZONS))
        for var, step in iter_params:
            logger.info(f"Processing var: {var}, step: {step}, year: {year}")
            uk_region, outer_region = dataset_builder.build_region_masked_covariates(var, step)

            inner_fpath, outer_fpath = build_local_save_path(
                step, var, year, directory=base_save_directory
            )

            logger.debug("Saving files")
            uk_region.to_pickle(inner_fpath)
            outer_region.to_pickle(outer_fpath)

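
The loop above pairs consecutive year boundaries so that every year, including the last, gets a slice. A toy check of that boundary logic, with illustrative years:

import datetime as dt

years = [2019, 2020, 2021]  # illustrative
date_years = [dt.datetime(year=y, month=1, day=1) for y in years]
date_years.append(dt.datetime(year=years[-1] + 1, month=1, day=1))  # one extra boundary

slices = [(date_years[i], date_years[i + 1]) for i in range(len(date_years) - 1)]
assert len(slices) == len(years)   # one slice per year
assert slices[-1][1].year == 2022  # the last slice runs through the end of 2021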