Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sentry function #9

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions src/satellite_consumer/run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
"""Pipeline for downloading, processing, and saving archival satellite data.

Consolidates the old cli_downloader, backfill_hrv and backfill_nonhrv scripts.
"""

import datetime as dt
import os
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path

import eumdac.product
import sentry_sdk
from joblib import Parallel, delayed
from loguru import logger as log
Expand All @@ -28,59 +23,86 @@
except PackageNotFoundError:
__version__ = "v?"

# Initialize Sentry error reporting.
#
# The DSN is a secret and deployment-specific, so it is read from the
# environment rather than hardcoded in source control; when SENTRY_DSN is
# unset, Sentry is simply not initialized (no-op for local development).
_sentry_dsn = os.getenv("SENTRY_DSN", "")
if _sentry_dsn:
    sentry_sdk.init(
        dsn=_sentry_dsn,
        # 1.0 traces every transaction; overridable because full tracing is
        # expensive in high-volume production deployments.
        traces_sample_rate=float(os.getenv("SENTRY_TRACES_SAMPLE_RATE", "1.0")),
        environment=os.getenv("SENTRY_ENVIRONMENT", "production"),
    )


def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None:
    """Run the download and processing pipeline.

    Searches for satellite products in the configured time window, downloads
    and processes each one in parallel, and writes the results into a zarr
    store. Failures on individual products — and in the optional validation,
    latest-zip, and raw-cleanup post-steps — are captured in Sentry and
    logged, but do not abort the remainder of the run.

    Args:
        command_opts: Options describing the satellite, time window, output
            store, and post-processing behavior for this run.
    """
    fs = get_fs(path=command_opts.zarr_path)

    window = command_opts.time_window

    # Tag the Sentry scope so any error captured below is attributable to
    # this specific satellite and time window.
    sentry_sdk.set_tag("satellite", command_opts.satellite_metadata.name)
    sentry_sdk.set_tag("time_window", f"{window[0]} to {window[1]}")

    product_iter = get_products_iterator(
        sat_metadata=command_opts.satellite_metadata,
        start=window[0],
        end=window[1],
    )

    # Reuse an existing zarr store when present, otherwise create a fresh one.
    # NOTE(review): fs.exists is given the path with the "s3://" scheme
    # stripped — presumably get_fs returns a bucket-relative filesystem;
    # confirm against get_fs.
    if fs.exists(command_opts.zarr_path.replace("s3://", "")):
        log.info("Using existing zarr store", dst=command_opts.zarr_path)
    else:
        log.info("Creating new zarr store", dst=command_opts.zarr_path)
        _ = create_empty_zarr(dst=command_opts.zarr_path, coords=command_opts.as_coordinates())

    def _etl(product: eumdac.product.Product) -> str:
        """Download, process, and save a single NAT file.

        Returns:
            The downloaded filepath on success, or "" if this product failed
            (the failure is reported to Sentry and logged, not raised).
        """
        try:
            sentry_sdk.add_breadcrumb(message="Starting ETL for product", category="processing")
            nat_filepath = download_nat(product, folder=f"{command_opts.workdir}/raw")
            da = process_nat(path=nat_filepath, hrv=command_opts.hrv)
            write_to_zarr(da=da, dst=command_opts.zarr_path)
        except Exception as e:
            # Best-effort semantics: skip the failed product so one bad file
            # does not abort the whole backfill.
            sentry_sdk.capture_exception(e)
            log.error(f"Error processing product: {e}")
            return ""
        return nat_filepath

    # Iterate through all products in the search, keeping only the successes
    # ("" sentinels from failed products are filtered out here).
    nat_filepaths: list[str] = []
    for nat_filepath in Parallel(
        n_jobs=command_opts.num_workers, return_as="generator",
    )(delayed(_etl)(product) for product in product_iter):
        if nat_filepath:
            nat_filepaths.append(nat_filepath)

    log.info("Finished population of zarr store", dst=command_opts.zarr_path)

    if command_opts.validate:
        try:
            validate(dataset_path=command_opts.zarr_path)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            log.error(f"Validation failed: {e}")

    if isinstance(command_opts, ConsumeCommandOptions) and command_opts.latest_zip:
        try:
            zippath: str = create_latest_zip(dst=command_opts.zarr_path)
            log.info(f"Created latest.zip at {zippath}", dst=zippath)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            log.error(f"Failed to create latest.zip: {e}")

    if command_opts.delete_raw:
        try:
            if command_opts.workdir.startswith("s3://"):
                log.warning("delete-raw was specified, but deleting S3 files is not yet implemented")
            else:
                log.info(
                    f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}",
                    num_files=len(nat_filepaths), dst=command_opts.raw_folder,
                )
                # _etl returns plain str paths, which have no .unlink();
                # wrap each in Path instead of calling str.unlink() behind a
                # type:ignore as the previous code did.
                for filepath in nat_filepaths:
                    Path(filepath).unlink()
        except Exception as e:
            sentry_sdk.capture_exception(e)
            log.error(f"Failed to delete raw files: {e}")

def run(config: SatelliteConsumerConfig) -> None:
"""Run the download and processing pipeline."""
Expand All @@ -91,8 +113,12 @@ def run(config: SatelliteConsumerConfig) -> None:
version=__version__, start_time=str(prog_start), opts=config.command_options.__str__(),
)

if config.command == "archive" or config.command == "consume":
_consume_command(command_opts=config.command_options)
try:
if config.command == "archive" or config.command == "consume":
_consume_command(command_opts=config.command_options)
except Exception as e:
sentry_sdk.capture_exception(e)
log.error(f"Pipeline execution failed: {e}")

runtime = dt.datetime.now(tz=dt.UTC) - prog_start
log.info(f"Completed satellite consumer run in {runtime!s}.")
Expand Down