Commit

Add e2e testing

webb-ben committed Jan 22, 2025
1 parent ba96f8f commit b16bf30
Showing 3 changed files with 109 additions and 24 deletions.
103 changes: 82 additions & 21 deletions tests/awqms/test_e2e.py
@@ -8,17 +8,35 @@
#
# =================================================================

from dagster import DagsterInstance
import pytest
import requests
from unittest.mock import patch

from userCode import definitions
from userCode.awqms.dag import (
    awqms_preflight_checks,
    post_awqms_station,
    post_awqms_datastreams,
    awqms_datastreams,
    awqms_observations,
    awqms_schedule,
)
from userCode.awqms.types import ALL_RELEVANT_STATIONS
from userCode.env import API_BACKEND_URL
from userCode.helper_classes import get_datastream_time_range

from ..lib import (
    wipe_datastreams,
    wipe_locations,
    wipe_observed_properties,
    wipe_things,
    assert_observations_and_datastreams_empty,
    assert_no_duplicate_at_given_time,
)


def test_awqms_preflight_checks():
@@ -57,26 +75,6 @@ def test_post_awqms_datastreams(sample_datastream):
        mock_post.assert_called_once()


@pytest.mark.asyncio
async def test_awqms_observations(sample_station_data, sample_datastream):
    datastreams = [
        sample_datastream,
    ]

    with patch(
        "userCode.awqms.lib.fetch_observation_ids"
    ) as mock_fetch_observation_ids:
        mock_fetch_observation_ids.return_value = set()

        with patch("userCode.awqms.lib.fetch_observations") as mock_fetch_observations:
            mock_fetch_observations.return_value = [
                {"ResultValue": 10, "StartDateTime": "2025-01-01", "Status": "Final"}
            ]

            observations = await awqms_observations(sample_station_data, datastreams)  # type: ignore
            assert len(observations) > 0


def test_awqms_schedule_triggering():
    pch = "userCode.awqms.dag.station_partition.get_partition_keys"
    with patch(pch) as mock_get_partition_keys:
@@ -87,3 +85,66 @@ def test_awqms_schedule_triggering():
        assert len(runs) == 2
        assert runs[0].partition_key == "1234"
        assert runs[1].partition_key == "5678"


def test_full_pipeline():
    """Test the full pipeline execution and data integrity"""
    # Clean environment
    wipe_locations()
    wipe_observed_properties()
    wipe_things()
    wipe_datastreams()
    assert_observations_and_datastreams_empty()

    harvest_job = definitions.get_job_def("harvest_awqms")

    instance = DagsterInstance.ephemeral()
    first_station = str(ALL_RELEVANT_STATIONS[0])

    initial_run = harvest_job.execute_in_process(
        instance=instance,
        partition_key=first_station,
    )
    assert initial_run.success

    # Verify data was created
    for endpoint in ["Locations", "Datastreams", "Observations"]:
        response = requests.get(f"{API_BACKEND_URL}/{endpoint}?$count=true")
        assert response.ok, f"Failed to get {endpoint}: {response.text}"
        count = response.json()["@iot.count"]
        assert count > 0, f"No {endpoint} found after harvesting"

    # Check first datastream
    datastreams = requests.get(f"{API_BACKEND_URL}/Datastreams")
    assert datastreams.ok, "Failed to get datastreams"
    first_datastream_iotid = datastreams.json()["value"][0]["@iot.id"]

    # Verify time range (renamed to avoid shadowing the builtin `range`)
    time_range = get_datastream_time_range(first_datastream_iotid)

    # Check for duplicates
    assert_no_duplicate_at_given_time(first_datastream_iotid, time_range.start)
    assert_no_duplicate_at_given_time(first_datastream_iotid, time_range.end)

    # Test update run
    update_run = harvest_job.execute_in_process(
        instance=instance,
        partition_key=first_station,
    )
    assert update_run.success, "Update run failed"

    # Verify updated time range
    update_range = get_datastream_time_range(first_datastream_iotid)
    assert update_range.start < update_range.end, "Updated range start must be before end"
    assert update_range.start == time_range.start, "Start date should not change on update"
    assert update_range.end >= time_range.end, "End date should advance or stay the same"

    # Final duplicate checks
    assert_no_duplicate_at_given_time(first_datastream_iotid, update_range.start)
    assert_no_duplicate_at_given_time(first_datastream_iotid, update_range.end)

    # Cleanup
    wipe_locations()
    wipe_things()
    wipe_observed_properties()
    wipe_datastreams()
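Note: test_full_pipeline relies on get_datastream_time_range from userCode.helper_classes to read back the observed time bounds. Below is a minimal sketch of what such a helper might look like against a FROST/SensorThings backend; the $orderby/$top queries, the TimeRange shape, and the backend URL are assumptions for illustration, not the actual implementation.

# A hedged sketch, not the actual userCode.helper_classes implementation:
# fetch the earliest and latest phenomenonTime for a datastream using the
# standard SensorThings $orderby/$top query options.
from dataclasses import dataclass
from datetime import datetime

import requests

API_BACKEND_URL = "http://localhost:8080/FROST-Server/v1.1"  # assumed for illustration


@dataclass
class TimeRange:
    start: datetime
    end: datetime


def get_datastream_time_range(datastream_id) -> TimeRange:
    def boundary(order: str) -> datetime:
        # Ask the server for the single observation at either end of the range
        url = (
            f"{API_BACKEND_URL}/Datastreams('{datastream_id}')/Observations"
            f"?$orderby=phenomenonTime {order}&$top=1"
        )
        resp = requests.get(url)
        assert resp.ok, resp.text
        value = resp.json()["value"][0]["phenomenonTime"]
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    return TimeRange(start=boundary("asc"), end=boundary("desc"))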
28 changes: 26 additions & 2 deletions tests/awqms/test_lib.py
Expand Up @@ -13,7 +13,7 @@
import tempfile
from unittest.mock import patch, Mock

-from userCode.awqms.lib import read_csv, fetch_station, fetch_observations
+from userCode.awqms.lib import read_csv, fetch_station, fetch_observations, fetch_observation_ids


@pytest.fixture
@@ -88,4 +88,28 @@ def test_fetch_observations_invalid_json(mock_shelve_cache_cls):

    # Test that a RuntimeError is raised for invalid JSON data
    with pytest.raises(RuntimeError, match="Request to.*failed with status 404"):
-        fetch_observations("Temperature", "TEST123")
+        fetch_observations("Temperature", "12005-ORDEQ")


def test_fetch_observation_ids():
    datastream_id = "12005-ORDEQ-2704"
    api_url = f"https://localhost:8080/FROST-Server/v1.1/Datastreams('{datastream_id}')/Observations"

    mock_response_1 = {
        "value": [{"@iot.id": 1}, {"@iot.id": 2}],
        "@iot.nextLink": f"{api_url}?$skip=2"
    }

    mock_response_2 = {"value": [{"@iot.id": 3}, {"@iot.id": 4}]}

    with patch("userCode.awqms.lib.requests.get") as mock_get:
        mock_get.side_effect = [
            Mock(status_code=200, json=Mock(return_value=mock_response_1)),
            Mock(status_code=200, json=Mock(return_value=mock_response_2))
        ]

        result = fetch_observation_ids(datastream_id)

        assert result == {1, 2, 3, 4}
        assert mock_get.call_count == 2
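The two mocked pages above encode the SensorThings paging contract that fetch_observation_ids evidently follows: a response carries @iot.nextLink while more results remain, and the final page omits it. A minimal sketch of that loop, assuming only this contract (the real function in userCode.awqms.lib may add caching or error handling):

# A hedged sketch of @iot.nextLink pagination; illustrative, not the actual
# userCode.awqms.lib implementation.
import requests

BASE_URL = "https://localhost:8080/FROST-Server/v1.1"  # assumed, matching the test


def fetch_observation_ids(datastream_id: str) -> set:
    ids: set = set()
    url = f"{BASE_URL}/Datastreams('{datastream_id}')/Observations"
    while url:
        resp = requests.get(url)
        assert resp.status_code == 200, resp.text
        body = resp.json()
        ids.update(obs["@iot.id"] for obs in body["value"])
        # FROST includes @iot.nextLink only when another page exists
        url = body.get("@iot.nextLink")
    return ids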
2 changes: 1 addition & 1 deletion tests/lib.py
Expand Up @@ -120,7 +120,7 @@ def assert_no_duplicate_at_given_time(
):
    """Checks if there are multiple observations at the same time for a given datastream"""
    # This is in a format like https://owdp-pilot.internetofwater.app/FROST-Server/v1.1/Datastreams(140805000)/Observations?$filter=resultTime%20eq%201941-10-01T00:00:00Z
-    url = f"{API_BACKEND_URL}/Datastreams({datastream_int})/Observations?$filter=resultTime%20eq%20{date_to_check.strftime('%Y-%m-%dT%H:%M:%SZ')}"
+    url = f"{API_BACKEND_URL}/Datastreams('{datastream_int}')/Observations?$filter=resultTime%20eq%20{date_to_check.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    resp = requests.get(url)
    assert resp.ok, resp.text
    assert (
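For context on the one-line fix above: SensorThings entity ids are written bare when they are integers and single-quoted when they are strings, and the AWQMS datastream ids used elsewhere in this commit (e.g. 12005-ORDEQ-2704) are strings, so the quoted form is required. Illustrative only, with ids taken from this commit's own examples:

# Bare vs. quoted entity ids in SensorThings URLs
numeric_url = f"{API_BACKEND_URL}/Datastreams(140805000)/Observations"          # integer id
string_url = f"{API_BACKEND_URL}/Datastreams('12005-ORDEQ-2704')/Observations"  # string id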
