From fefc3effb12042b14627e7a49b050fa040383a8c Mon Sep 17 00:00:00 2001
From: Colton Loftus <70598503+C-Loftus@users.noreply.github.com>
Date: Wed, 22 Jan 2025 10:54:46 -0500
Subject: [PATCH] cleaner shelve cache handling

---
 makefile             | 15 +++++++++++----
 userCode/cache.py    |  6 ++++++
 userCode/env.py      |  5 ++++-
 userCode/odwr/dag.py | 10 +++++-----
 userCode/odwr/lib.py | 19 ++++++-------------
 5 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/makefile b/makefile
index 6bac3c1..2c5c605 100644
--- a/makefile
+++ b/makefile
@@ -1,7 +1,6 @@
-# start the reverse proxy which gives our server https and points to the proper domain
-caddy:
-	cp ./Caddyfile /etc/caddy/Caddyfile
-	sudo systemctl restart caddy
+###### Docker Compose Commands
+## We use these since there is a production profile and without specifying the profile
+## docker will not start those services. This can be a footgun
 
 prodUp:
 	docker compose --profile production up -d
@@ -12,7 +11,15 @@ prodBuild:
 prodDown:
 	docker compose --profile production down
 
+####### Helper Commands
+
+# start the reverse proxy which gives our server https and points to the proper domain
+caddy:
+	cp ./Caddyfile /etc/caddy/Caddyfile
+	sudo systemctl restart caddy
+
 # get rid of the sensorthings db, mainly for testing purposes
+# or if you need to recrawl. NOTE that you may need to reapply the indices after
 wipedb:
 	docker volume rm oregonwaterdataportal-etl_postgis_volume
 
diff --git a/userCode/cache.py b/userCode/cache.py
index 3040414..3a8bc0e 100644
--- a/userCode/cache.py
+++ b/userCode/cache.py
@@ -16,6 +16,7 @@
 import shelve
 from typing import ClassVar, Optional, Tuple
 
+from userCode.env import RUNNING_IN_TEST_ENVIRONMENT
 from userCode.util import deterministic_hash
 
 
@@ -38,6 +39,11 @@ def set(self, url: str, content: bytes, _ttl: Optional[timedelta] = None):
             get_dagster_logger().warning(f"Unable to cache: {url}")
 
     def get_or_fetch(self, url: str, force_fetch: bool = False) -> Tuple[bytes, int]:
+        # If we are in prod we want to ignore using the cache and not store anything
+        if not RUNNING_IN_TEST_ENVIRONMENT:
+            response = requests.get(url, headers=HEADERS, timeout=300)
+            return response.content, response.status_code
+
         if self.contains(url) and not force_fetch:
             try:
                 return self.get(url), 200
diff --git a/userCode/env.py b/userCode/env.py
index 7873048..45a0ad9 100644
--- a/userCode/env.py
+++ b/userCode/env.py
@@ -15,4 +15,7 @@
 API_BACKEND_URL = get_env("API_BACKEND_URL")
 AWQMS_URL = "https://ordeq.gselements.com/api"
 
-RUNNING_AS_A_TEST_NOT_IN_PROD = "PYTEST_CURRENT_TEST" in os.environ
+# If we are running inside of pytest, pytest will set this environment variable
+# We can use this to cache data, check more strictly, or do other optimizations
+# we wouldn't necessarily want to do in production
+RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ
diff --git a/userCode/odwr/dag.py b/userCode/odwr/dag.py
index fa4b9dc..45ff56b 100644
--- a/userCode/odwr/dag.py
+++ b/userCode/odwr/dag.py
@@ -27,7 +27,7 @@
 import requests
 from typing import List, Optional, Tuple
 
-from userCode.env import API_BACKEND_URL, RUNNING_AS_A_TEST_NOT_IN_PROD
+from userCode.env import API_BACKEND_URL, RUNNING_IN_TEST_ENVIRONMENT
 from userCode.helper_classes import BatchHelper, get_datastream_time_range, MockValues
 from userCode.odwr.lib import (
     fetch_station_metadata,
@@ -194,11 +194,11 @@ async def fetch_obs(datastream: Datastream) -> List[Observation]:
             # If we are running this as a test, we want to keep track of which observations we have seen so we can detect duplicates
             # We don't want to cache every single observation unless we are running as a test since the db will catch duplicates as well
             # This is a further check to be thorough
-            if RUNNING_AS_A_TEST_NOT_IN_PROD:
+            if RUNNING_IN_TEST_ENVIRONMENT:
                 key = (datastream.iotid, date)
-                assert (
-                    key not in seen_obs
-                ), f"Found duplicate observation {key} after {i} iterations for station {attr.station_nbr} and datastream '{datastream.description}' after fetching url: {tsv_url} for date range {range.start} to {new_end}"
+                assert key not in seen_obs, (
+                    f"Found duplicate observation {key} after {i} iterations for station {attr.station_nbr} and datastream '{datastream.description}' after fetching url: {tsv_url} for date range {range.start} to {new_end}"
+                )
                 seen_obs.add(key)
 
             sta_representation = to_sensorthings_observation(
diff --git a/userCode/odwr/lib.py b/userCode/odwr/lib.py
index 5e3d153..515e574 100644
--- a/userCode/odwr/lib.py
+++ b/userCode/odwr/lib.py
@@ -19,7 +19,7 @@
 
 from userCode.cache import ShelveCache
-from userCode.env import API_BACKEND_URL, RUNNING_AS_A_TEST_NOT_IN_PROD
+from userCode.env import API_BACKEND_URL
 from userCode.odwr.types import (
     BASE_OREGON_URL,
     POTENTIAL_DATASTREAMS,
@@ -92,9 +92,9 @@ def parse_oregon_tsv(
             data.append(float(row[2]))
 
             parsed_date = parse_date(str(DATE_COLUMN))
-            assert (
-                parsed_date not in unique_dates
-            ), f"Date '{parsed_date}' appeared twice in the data"
+            assert parsed_date not in unique_dates, (
+                f"Date '{parsed_date}' appeared twice in the data"
+            )
             unique_dates[parsed_date] = None
 
     return ParsedTSVData(data, units, list(unique_dates))
@@ -159,15 +159,8 @@ def download_oregon_tsv(
     """Get the tsv data for a specific dataset for a specific station in a given date range"""
     tsv_url = generate_oregon_tsv_url(dataset, station_nbr, start_date, end_date)
 
-    if RUNNING_AS_A_TEST_NOT_IN_PROD:
-        # If we are in a test, we want to use the cache to avoid making too many requests while testing
-        # But in production, we always want to fetch and not cache anything to avoid extra data
-        cache = ShelveCache()
-        response, status_code = cache.get_or_fetch(tsv_url, force_fetch=False)
-    else:
-        fetch_result = requests.get(tsv_url)
-        status_code = fetch_result.status_code
-        response = fetch_result.content
+    cache = ShelveCache()
+    response, status_code = cache.get_or_fetch(tsv_url, force_fetch=False)
 
     if status_code != 200 or "An Error Has Occured" in response.decode("utf-8"):
         raise RuntimeError(
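
---

Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the control flow this change converges on. Call sites now always go through ShelveCache.get_or_fetch, and the cache itself decides to bypass shelve entirely outside of pytest. The HEADERS value and the trimmed-down class are illustrative stand-ins, not the repository's implementation; the real class in userCode/cache.py also hashes keys via deterministic_hash and logs failed cache writes.

    # Sketch only: mirrors the prod/test split introduced by this patch,
    # not the actual ShelveCache in userCode/cache.py.
    import os
    import shelve
    from typing import Tuple

    import requests

    # Stand-in value for the HEADERS constant referenced in userCode/cache.py
    HEADERS = {"User-Agent": "etl-sketch"}

    # Mirrors userCode/env.py: pytest sets this variable for the running test
    RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ


    class ShelveCacheSketch:
        db = "http_cache_sketch"

        def get_or_fetch(self, url: str, force_fetch: bool = False) -> Tuple[bytes, int]:
            # In prod, skip the cache entirely: always fetch, never store
            if not RUNNING_IN_TEST_ENVIRONMENT:
                response = requests.get(url, headers=HEADERS, timeout=300)
                return response.content, response.status_code

            # Under pytest, serve from the shelve file when possible to
            # avoid hammering the upstream API while tests run
            with shelve.open(self.db) as db:
                if url in db and not force_fetch:
                    return db[url], 200
                response = requests.get(url, headers=HEADERS, timeout=300)
                if response.ok:
                    db[url] = response.content
                return response.content, response.status_code


    # Call sites such as download_oregon_tsv no longer branch on the
    # environment themselves; they just ask the cache:
    content, status = ShelveCacheSketch().get_or_fetch("https://example.com/data.tsv")

Centralizing the environment check inside get_or_fetch is what lets the patch delete the duplicated if/else from download_oregon_tsv.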