Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up on, saving to cloud #214

Merged
merged 30 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c13ba38
add NUMBER_CONCURRENT_JOBS
peterdudfield Jan 2, 2025
9154578
mypy fix
peterdudfield Jan 2, 2025
8cef147
mypy
peterdudfield Jan 2, 2025
c63a554
roll back
peterdudfield Jan 2, 2025
a070be3
turn on verbose
peterdudfield Jan 2, 2025
2b33bc1
try with processes
peterdudfield Jan 2, 2025
6821301
add logging
peterdudfield Jan 2, 2025
68651b3
add log
peterdudfield Jan 2, 2025
c5b439a
make chunk size of lat and lon 1
peterdudfield Jan 2, 2025
bca7ce2
roll back
peterdudfield Jan 2, 2025
1131ec5
change to 17 and 18 chunks for lat lon
peterdudfield Jan 2, 2025
7f4df0d
have option to change large chunk size divider
peterdudfield Jan 2, 2025
df748a0
lint
peterdudfield Jan 2, 2025
30b9510
fix
peterdudfield Jan 2, 2025
f39af76
lint
peterdudfield Jan 2, 2025
e7ceff9
lint
peterdudfield Jan 2, 2025
69de6dc
try using safe_chunks=False
peterdudfield Jan 2, 2025
6150c46
remove truncate
peterdudfield Jan 2, 2025
feba5d1
tidy
peterdudfield Jan 2, 2025
c1d8702
lint
peterdudfield Jan 2, 2025
0a18e80
remove chunking of 1
peterdudfield Jan 3, 2025
e150bd6
change to 2 chunks in lat lon
peterdudfield Jan 3, 2025
1fd874d
add logging
peterdudfield Jan 3, 2025
a7d304e
remove safe chunks
peterdudfield Jan 3, 2025
64cfa43
Move maximum chunk size back to 8, change to 2 in ecmwf realtime
peterdudfield Jan 3, 2025
0d24ef4
lint
peterdudfield Jan 3, 2025
75bc57e
tidy up, for parallel threads and processes
peterdudfield Jan 3, 2025
daa2a9a
tidy
peterdudfield Jan 3, 2025
45d0c0c
Merge branch 'main' into concurrent-jobs
peterdudfield Jan 6, 2025
6c3e9ed
fix(coordinate): Log warning on unsafe regional writes (#216)
devsjc Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/nwp_consumer/internal/entities/coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class NWPDimensionCoordinateMap:
Will be truncated to 4 decimal places, and ordered as 90 -> -90.
"""
longitude: list[float] | None = None
"""The longitude coordinates of the forecast grid in degrees.
"""The longitude coordinates of the forecast grid in degrees. """
maximum_number_of_chunks_in_one_dim: int = 8
""" The maximum number of chunks in one dimension.
When saving to S3 we might want this to be small, to reduce the number of files saved.

Will be truncated to 4 decimal places, and ordered as -180 -> 180.
"""
Expand All @@ -116,7 +119,9 @@ def dims(self) -> list[str]:
Ignores any dimensions that do not have a corresponding coordinate
index value list.
"""
return [f.name for f in dataclasses.fields(self) if getattr(self, f.name) is not None]
return [f.name for f in dataclasses.fields(self) if
getattr(self, f.name) is not None
and f.name != "maximum_number_of_chunks_in_one_dim"]

@property
def shapemap(self) -> dict[str, int]:
Expand Down Expand Up @@ -409,7 +414,8 @@ def default_chunking(self) -> dict[str, int]:
"init_time": 1,
"step": 1,
} | {
dim: len(getattr(self, dim)) // 8 if len(getattr(self, dim)) > 8 else 1
dim: len(getattr(self, dim)) // self.maximum_number_of_chunks_in_one_dim
if len(getattr(self, dim)) > self.maximum_number_of_chunks_in_one_dim else 1
for dim in self.dims
if dim not in ["init_time", "step"]
}
Expand Down
7 changes: 7 additions & 0 deletions src/nwp_consumer/internal/entities/modelmetadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def with_region(self, region: str) -> "ModelMetadata":
log.warning(f"Unknown region '{region}', not cropping expected coordinates.")
return self

def set_maximum_number_of_chunks_in_one_dim(
    self,
    maximum_number_of_chunks_in_one_dim: int,
) -> "ModelMetadata":
    """Override the per-dimension chunk-count cap on the expected coordinates.

    Returns self so the call can be chained fluently, e.g. after
    ``with_region(...)``.

    Args:
        maximum_number_of_chunks_in_one_dim: New cap for the number of
            chunks along any single dimension.

    Returns:
        This metadata instance, with the cap applied.
    """
    # NOTE(review): this mutates the existing coordinate map in place —
    # confirm the metadata object is not shared with other model entries
    # before chaining this call.
    coords = self.expected_coordinates
    coords.maximum_number_of_chunks_in_one_dim = maximum_number_of_chunks_in_one_dim
    return self


class Models:
"""Namespace containing known models."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ def repository() -> entities.RawRepositoryMetadata:
},
postprocess_options=entities.PostProcessOptions(),
available_models={
"default": entities.Models.ECMWF_HRES_IFS_0P1DEGREE.with_region("uk"),
"hres-ifs-uk": entities.Models.ECMWF_HRES_IFS_0P1DEGREE.with_region("uk"),
"default": entities.Models.ECMWF_HRES_IFS_0P1DEGREE.with_region("uk")
.set_maximum_number_of_chunks_in_one_dim(2),
"hres-ifs-uk": entities.Models.ECMWF_HRES_IFS_0P1DEGREE.with_region("uk")
.set_maximum_number_of_chunks_in_one_dim(2),
"hres-ifs-india": entities.Models.ECMWF_HRES_IFS_0P1DEGREE.with_region("india"),
},
)
Expand Down Expand Up @@ -194,6 +196,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
).with_suffix(".grib").expanduser()

# Only download the file if not already present
log.info("Checking for local file: '%s'", local_path)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this and the below should be debug logs

if not local_path.exists() or local_path.stat().st_size == 0:
local_path.parent.mkdir(parents=True, exist_ok=True)
log.debug("Requesting file from S3 at: '%s'", url)
Expand All @@ -203,6 +206,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
raise FileNotFoundError(f"File not found at '{url}'")

with local_path.open("wb") as lf, self._fs.open(url, "rb") as rf:
log.info(f"Writing file from {url} to {local_path}")
for chunk in iter(lambda: rf.read(12 * 1024), b""):
lf.write(chunk)
lf.flush()
Expand Down Expand Up @@ -280,6 +284,7 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
.sortby(variables=["step", "variable", "longitude"])
.sortby(variables="latitude", ascending=False)
)

except Exception as e:
return Failure(ValueError(
f"Error processing dataset {i} from '{path}' to DataArray: {e}",
Expand Down
11 changes: 8 additions & 3 deletions src/nwp_consumer/internal/services/consumer_service.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it as threads because it's IO that's intensive, as opposed to compute, in each iteration. How come you changed it to processes? Also, if concurrency is set to True, why would n_jobs want to then be set to 1? Would that not make it not concurrent again?

Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,18 @@ def _parallelize_generator[T](
"""
# TODO: Change this based on threads instead of CPU count
n_jobs: int = max(cpu_count() - 1, max_connections)
if os.getenv("CONCURRENCY", "True").capitalize() == "False":
prefer = "threads"

concurrency = os.getenv("CONCURRENCY", "True").capitalize() == "False"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, there is some funny logic here that is not clear, I will tidy up

if concurrency:
n_jobs = 1
log.debug(f"Using {n_jobs} concurrent thread(s)")
prefer = "processes"

log.debug(f"Using {n_jobs} concurrent {prefer}")

return Parallel( # type: ignore
n_jobs=n_jobs,
prefer="threads",
prefer=prefer,
verbose=0,
return_as="generator_unordered",
)(delayed_generator)
Expand Down
Loading