Commit
cleaner shelve cache handling
C-Loftus committed Jan 22, 2025
1 parent 3ac9eef commit fefc3ef
Showing 5 changed files with 32 additions and 23 deletions.
makefile (15 changes: 11 additions & 4 deletions)
@@ -1,7 +1,6 @@
# start the reverse proxy which gives our server https and points to the proper domain
caddy:
cp ./Caddyfile /etc/caddy/Caddyfile
sudo systemctl restart caddy
###### Docker Compose Commands
## We use these because there is a production profile; without specifying the profile,
## docker will not start those services. This can be a footgun

prodUp:
docker compose --profile production up -d
@@ -12,7 +11,15 @@ prodBuild:
prodDown:
docker compose --profile production down

####### Helper Commands

# start the reverse proxy which gives our server https and points to the proper domain
caddy:
cp ./Caddyfile /etc/caddy/Caddyfile
sudo systemctl restart caddy

# get rid of the sensorthings db, mainly for testing purposes
# or if you need to recrawl. NOTE that you may need to reapply the indices after
wipedb:
docker volume rm oregonwaterdataportal-etl_postgis_volume

userCode/cache.py (6 changes: 6 additions & 0 deletions)
@@ -16,6 +16,7 @@
import shelve
from typing import ClassVar, Optional, Tuple

from userCode.env import RUNNING_IN_TEST_ENVIRONMENT
from userCode.util import deterministic_hash


@@ -38,6 +39,11 @@ def set(self, url: str, content: bytes, _ttl: Optional[timedelta] = None):
get_dagster_logger().warning(f"Unable to cache: {url}")

def get_or_fetch(self, url: str, force_fetch: bool = False) -> Tuple[bytes, int]:
# If we are in prod, we want to skip the cache entirely and not store anything
if not RUNNING_IN_TEST_ENVIRONMENT:
response = requests.get(url, headers=HEADERS, timeout=300)
return response.content, response.status_code

if self.contains(url) and not force_fetch:
try:
return self.get(url), 200
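For context, a minimal, self-contained sketch of the behavior this hunk gives get_or_fetch: outside of pytest the cache is bypassed entirely and nothing is persisted, while under pytest responses are served from and written to the shelve file. The CACHE_PATH and HEADERS values below are placeholders, and the real ShelveCache also hashes keys with deterministic_hash and logs cache failures via get_dagster_logger.

import os
import shelve
from typing import Tuple

import requests

RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ
HEADERS = {"User-Agent": "etl-sketch"}  # placeholder; the real module defines its own headers
CACHE_PATH = "http_cache_sketch"  # placeholder shelve file name


class ShelveCacheSketch:
    def get_or_fetch(self, url: str, force_fetch: bool = False) -> Tuple[bytes, int]:
        # In production, always hit the network and never persist anything.
        if not RUNNING_IN_TEST_ENVIRONMENT:
            response = requests.get(url, headers=HEADERS, timeout=300)
            return response.content, response.status_code

        # Under pytest, reuse cached bytes when present to avoid repeated requests.
        with shelve.open(CACHE_PATH) as db:
            if url in db and not force_fetch:
                return db[url], 200
            response = requests.get(url, headers=HEADERS, timeout=300)
            db[url] = response.content
            return response.content, response.status_code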
userCode/env.py (5 changes: 4 additions & 1 deletion)
@@ -15,4 +15,7 @@
API_BACKEND_URL = get_env("API_BACKEND_URL")
AWQMS_URL = "https://ordeq.gselements.com/api"

RUNNING_AS_A_TEST_NOT_IN_PROD = "PYTEST_CURRENT_TEST" in os.environ
# If we are running inside of pytest, pytest will set this environment variable
# We can use this to cache data, check more strictly, or do other optimizations
# we wouldn't necessarily want to do in production
RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ
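As a quick illustration of why this flag works: pytest exports PYTEST_CURRENT_TEST for the duration of each test, so checking for the key is a cheap way to detect a test run without any extra configuration. The helper below is hypothetical and only shows one way the flag might be consumed.

import os

RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ


def request_timeout_seconds() -> int:
    # Hypothetical use: tighter timeouts under pytest, generous ones in production.
    return 30 if RUNNING_IN_TEST_ENVIRONMENT else 300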
userCode/odwr/dag.py (10 changes: 5 additions & 5 deletions)
@@ -27,7 +27,7 @@
import requests
from typing import List, Optional, Tuple

from userCode.env import API_BACKEND_URL, RUNNING_AS_A_TEST_NOT_IN_PROD
from userCode.env import API_BACKEND_URL, RUNNING_IN_TEST_ENVIRONMENT
from userCode.helper_classes import BatchHelper, get_datastream_time_range, MockValues
from userCode.odwr.lib import (
fetch_station_metadata,
@@ -194,11 +194,11 @@ async def fetch_obs(datastream: Datastream) -> List[Observation]:
# If we are running this as a test, we want to keep track of which observations we have seen so we can detect duplicates
# We don't want to cache every single observation unless we are running as a test since the db will catch duplicates as well
# This is a further check to be thorough
if RUNNING_AS_A_TEST_NOT_IN_PROD:
if RUNNING_IN_TEST_ENVIRONMENT:
key = (datastream.iotid, date)
assert (
key not in seen_obs
), f"Found duplicate observation {key} after {i} iterations for station {attr.station_nbr} and datastream '{datastream.description}' after fetching url: {tsv_url} for date range {range.start} to {new_end}"
assert key not in seen_obs, (
f"Found duplicate observation {key} after {i} iterations for station {attr.station_nbr} and datastream '{datastream.description}' after fetching url: {tsv_url} for date range {range.start} to {new_end}"
)
seen_obs.add(key)

sta_representation = to_sensorthings_observation(
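A condensed sketch of the duplicate check this hunk keeps (aside from the renamed flag): while running under pytest, every (datastream id, date) pair is remembered and an assertion fails if one repeats; in production the database's uniqueness handling is relied on instead. The function and attribute names below are simplified stand-ins for the real Dagster asset code.

import os
from typing import Set, Tuple

RUNNING_IN_TEST_ENVIRONMENT = "PYTEST_CURRENT_TEST" in os.environ

seen_obs: Set[Tuple[int, str]] = set()


def check_for_duplicate(datastream_id: int, date: str) -> None:
    # The db catches duplicates in production, so this extra bookkeeping is
    # only worth the memory while running as a test.
    if RUNNING_IN_TEST_ENVIRONMENT:
        key = (datastream_id, date)
        assert key not in seen_obs, f"Found duplicate observation {key}"
        seen_obs.add(key)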
userCode/odwr/lib.py (19 changes: 6 additions & 13 deletions)
@@ -19,7 +19,7 @@


from userCode.cache import ShelveCache
from userCode.env import API_BACKEND_URL, RUNNING_AS_A_TEST_NOT_IN_PROD
from userCode.env import API_BACKEND_URL
from userCode.odwr.types import (
BASE_OREGON_URL,
POTENTIAL_DATASTREAMS,
@@ -92,9 +92,9 @@ def parse_oregon_tsv(
data.append(float(row[2]))

parsed_date = parse_date(str(DATE_COLUMN))
assert (
parsed_date not in unique_dates
), f"Date '{parsed_date}' appeared twice in the data"
assert parsed_date not in unique_dates, (
f"Date '{parsed_date}' appeared twice in the data"
)
unique_dates[parsed_date] = None

return ParsedTSVData(data, units, list(unique_dates))
@@ -159,15 +159,8 @@ def download_oregon_tsv(
"""Get the tsv data for a specific dataset for a specific station in a given date range"""
tsv_url = generate_oregon_tsv_url(dataset, station_nbr, start_date, end_date)

if RUNNING_AS_A_TEST_NOT_IN_PROD:
# If we are in a test, we want to use the cache to avoid making too many requests while testing
# But in production, we always want to fetch and not cache anything to avoid extra data
cache = ShelveCache()
response, status_code = cache.get_or_fetch(tsv_url, force_fetch=False)
else:
fetch_result = requests.get(tsv_url)
status_code = fetch_result.status_code
response = fetch_result.content
cache = ShelveCache()
response, status_code = cache.get_or_fetch(tsv_url, force_fetch=False)

if status_code != 200 or "An Error Has Occured" in response.decode("utf-8"):
raise RuntimeError(
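The net effect of this hunk is that download_oregon_tsv no longer branches on the environment itself; it always goes through ShelveCache.get_or_fetch, which (per the cache.py change above) decides whether to touch the shelve file. A trimmed sketch of the resulting call path, with a hypothetical URL builder standing in for generate_oregon_tsv_url:

from userCode.cache import ShelveCache  # same import the real lib.py uses


def build_tsv_url(dataset: str, station_nbr: str, start_date: str, end_date: str) -> str:
    # Hypothetical stand-in for generate_oregon_tsv_url; the real builder
    # composes the request from BASE_OREGON_URL and the dataset parameters.
    return (
        "https://example.invalid/tsv"
        f"?dataset={dataset}&station={station_nbr}&start={start_date}&end={end_date}"
    )


def download_tsv(dataset: str, station_nbr: str, start_date: str, end_date: str) -> bytes:
    tsv_url = build_tsv_url(dataset, station_nbr, start_date, end_date)

    cache = ShelveCache()
    response, status_code = cache.get_or_fetch(tsv_url, force_fetch=False)

    # The upstream service reports some failures in the response body rather than
    # the status code; the misspelling below matches the upstream text.
    if status_code != 200 or "An Error Has Occured" in response.decode("utf-8"):
        raise RuntimeError(f"Request to {tsv_url} failed with status {status_code}")

    return response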
