Skip to content

Commit

Permalink
Combine observations
Browse files Browse the repository at this point in the history
  • Loading branch information
webb-ben committed Jan 21, 2025
1 parent 2e8abf3 commit b7e6ac8
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions userCode/awqms/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def post_awqms_datastreams(awqms_datastreams: list[Datastream]):
group_name="awqms")
async def awqms_observations(awqms_metadata: StationData,
awqms_datastreams: list[Datastream]
) -> list[Observation]:
) -> bool:

associatedThing = awqms_metadata.MonitoringLocationId

Expand All @@ -181,8 +181,8 @@ async def fetch_and_process(datastream: Datastream):
id = "".join([datastream.iotid, result["StartDateTime"]])
iotid = deterministic_hash(id, 18)

_test = (iotid in observations or
iotid in observations_ids) and result["Status"] != "Final"
_test = (iotid in observations or iotid in observations_ids
) and result["Status"] != "Final"
if _test:
continue

Expand All @@ -192,21 +192,18 @@ async def fetch_and_process(datastream: Datastream):
)

# Run fetch_and_process for all datastreams concurrently
await asyncio.gather(*(fetch_and_process(datastream)
for datastream in awqms_datastreams))

return list(observations.values())


@asset(
partitions_def=station_partition,
deps=[post_awqms_datastreams],
group_name="awqms")
def batch_post_awqms_observations(awqms_observations: list[Observation]):
"""Post a group of observations for multiple datastreams
to the Sensorthings API"""
try:
await asyncio.gather(*(fetch_and_process(datastream)
for datastream in awqms_datastreams))
except Exception as err:
get_dagster_logger().error(err)
return False

awqms_observations = list(observations.values())
BatchHelper().send_observations(awqms_observations)

return True


awqms_job = define_asset_job(
"harvest_awqms",
Expand Down

0 comments on commit b7e6ac8

Please sign in to comment.