From b56ac14110ac8b7c842c33b1e14403f3710058ad Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Fri, 28 Feb 2025 19:13:18 +0530 Subject: [PATCH 1/6] add sentry function Signed-off-by: aryan lamba --- src/satellite_consumer/run.py | 84 +++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index 396827a..6f4c110 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -1,11 +1,6 @@ -"""Pipeline for downloading, processing, and saving archival satellite data. - -Consolidates the old cli_downloader, backfill_hrv and backfill_nonhrv scripts. -""" - import datetime as dt +import sentry_sdk from importlib.metadata import PackageNotFoundError, version - import eumdac.product from joblib import Parallel, delayed from loguru import logger as log @@ -28,11 +23,23 @@ except PackageNotFoundError: __version__ = "v?" +# Initialize Sentry + + +sentry_sdk.init( + dsn="https://12927b5f211046b575ee51fd8b1ac34f@o1.ingest.sentry.io/1", + traces_sample_rate=1.0, + environment="production" +) + + def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None: """Run the download and processing pipeline.""" fs = get_fs(path=command_opts.zarr_path) - window = command_opts.time_window + + sentry_sdk.set_tag("satellite", command_opts.satellite_metadata.name) + sentry_sdk.set_tag("time_window", f"{window[0]} to {window[1]}") product_iter = get_products_iterator( sat_metadata=command_opts.satellite_metadata, @@ -40,47 +47,62 @@ def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions end=window[1], ) - # Use existing zarr store if it exists if fs.exists(command_opts.zarr_path.replace("s3://", "")): log.info("Using existing zarr store", dst=command_opts.zarr_path) else: - # Create new store log.info("Creating new zarr store", dst=command_opts.zarr_path) _ = create_empty_zarr(dst=command_opts.zarr_path, coords=command_opts.as_coordinates()) def _etl(product: eumdac.product.Product) -> str: """Download, process, and save a single NAT file.""" - nat_filepath = download_nat(product, folder=f"{command_opts.workdir}/raw") - da = process_nat(path=nat_filepath, hrv=command_opts.hrv) - write_to_zarr(da=da, dst=command_opts.zarr_path) - return nat_filepath + try: + sentry_sdk.add_breadcrumb(message="Starting ETL for product", category="processing") + nat_filepath = download_nat(product, folder=f"{command_opts.workdir}/raw") + da = process_nat(path=nat_filepath, hrv=command_opts.hrv) + write_to_zarr(da=da, dst=command_opts.zarr_path) + return nat_filepath + except Exception as e: + sentry_sdk.capture_exception(e) + log.error(f"Error processing product: {e}") + return "" nat_filepaths: list[str] = [] - # Iterate through all products in search for nat_filepath in Parallel( n_jobs=command_opts.num_workers, return_as="generator", )(delayed(_etl)(product) for product in product_iter): - nat_filepaths.append(nat_filepath) + if nat_filepath: + nat_filepaths.append(nat_filepath) log.info("Finished population of zarr store", dst=command_opts.zarr_path) if command_opts.validate: - validate(dataset_path=command_opts.zarr_path) + try: + validate(dataset_path=command_opts.zarr_path) + except Exception as e: + sentry_sdk.capture_exception(e) + log.error(f"Validation failed: {e}") if isinstance(command_opts, ConsumeCommandOptions) and command_opts.latest_zip: - zippath: str = create_latest_zip(dst=command_opts.zarr_path) - log.info(f"Created latest.zip at {zippath}", dst=zippath) + try: + zippath: str = create_latest_zip(dst=command_opts.zarr_path) + log.info(f"Created latest.zip at {zippath}", dst=zippath) + except Exception as e: + sentry_sdk.capture_exception(e) + log.error(f"Failed to create latest.zip: {e}") if command_opts.delete_raw: - if command_opts.workdir.startswith("s3://"): - log.warning("delete-raw was specified, but deleting S3 files is not yet implemented") - else: - log.info( - f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}", - num_files=len(nat_filepaths), dst=command_opts.raw_folder, - ) - _ = [f.unlink() for f in nat_filepaths] # type:ignore - + try: + if command_opts.workdir.startswith("s3://"): + log.warning("delete-raw was specified, but deleting S3 files is not yet implemented") + else: + log.info( + f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}", + num_files=len(nat_filepaths), dst=command_opts.raw_folder, + ) + _ = [f.unlink() for f in nat_filepaths] # type:ignore + except Exception as e: + sentry_sdk.capture_exception(e) + log.error(f"Failed to delete raw files: {e}") def run(config: SatelliteConsumerConfig) -> None: """Run the download and processing pipeline.""" @@ -91,8 +113,12 @@ def run(config: SatelliteConsumerConfig) -> None: version=__version__, start_time=str(prog_start), opts=config.command_options.__str__(), ) - if config.command == "archive" or config.command == "consume": - _consume_command(command_opts=config.command_options) + try: + if config.command == "archive" or config.command == "consume": + _consume_command(command_opts=config.command_options) + except Exception as e: + sentry_sdk.capture_exception(e) + log.error(f"Pipeline execution failed: {e}") runtime = dt.datetime.now(tz=dt.UTC) - prog_start log.info(f"Completed satellite consumer run in {runtime!s}.") From ebda748cfa548d5d16ebfbb1695e51f5e5b7a264 Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Fri, 28 Feb 2025 20:22:41 +0530 Subject: [PATCH 2/6] update all change --- src/satellite_consumer/run.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index 6f4c110..7f77bec 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -1,3 +1,5 @@ +"""Pipeline for downloading, processing, and saving archival satellite data.""" + import datetime as dt import sentry_sdk from importlib.metadata import PackageNotFoundError, version @@ -32,14 +34,15 @@ environment="production" ) +sentry_sdk.set_tag("app_name", "satellite-consumer") +sentry_sdk.set_tag("version", __version__) + def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None: """Run the download and processing pipeline.""" fs = get_fs(path=command_opts.zarr_path) window = command_opts.time_window - sentry_sdk.set_tag("satellite", command_opts.satellite_metadata.name) - sentry_sdk.set_tag("time_window", f"{window[0]} to {window[1]}") product_iter = get_products_iterator( sat_metadata=command_opts.satellite_metadata, From e305b5bca39f514ab7f6c0ccdca94f551cd7f80e Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Sat, 1 Mar 2025 13:15:10 +0530 Subject: [PATCH 3/6] update env file --- src/satellite_consumer/run.py | 104 ++++++++++++++++------------------ 1 file changed, 48 insertions(+), 56 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index 7f77bec..b08bfb7 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -1,8 +1,14 @@ -"""Pipeline for downloading, processing, and saving archival satellite data.""" +"""Pipeline for downloading, processing, and saving archival satellite data. + +Consolidates the old cli_downloader, backfill_hrv and backfill_nonhrv scripts. +""" import datetime as dt -import sentry_sdk +import os from importlib.metadata import PackageNotFoundError, version + +from dotenv import load_dotenv +import sentry_sdk import eumdac.product from joblib import Parallel, delayed from loguru import logger as log @@ -20,29 +26,34 @@ from satellite_consumer.storage import create_empty_zarr, create_latest_zip, get_fs, write_to_zarr from satellite_consumer.validate import validate +# Load environment variables from .env file +load_dotenv() + +# Get Sentry DSN from environment variables +SENTRY_DSN = os.getenv("SENTRY_DSN") + +# Initialize Sentry if DSN is available +if SENTRY_DSN: + sentry_sdk.init( + dsn=SENTRY_DSN, # Using .env variable + traces_sample_rate=1.0, + environment="production" + ) + log.info(" Sentry initialized successfully!") +else: + log.warning("SENTRY_DSN is not set. Sentry will not be initialized.") + try: __version__ = version("satellite-consumer") except PackageNotFoundError: __version__ = "v?" -# Initialize Sentry - - -sentry_sdk.init( - dsn="https://12927b5f211046b575ee51fd8b1ac34f@o1.ingest.sentry.io/1", - traces_sample_rate=1.0, - environment="production" -) - -sentry_sdk.set_tag("app_name", "satellite-consumer") -sentry_sdk.set_tag("version", __version__) - def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions) -> None: """Run the download and processing pipeline.""" fs = get_fs(path=command_opts.zarr_path) + window = command_opts.time_window - product_iter = get_products_iterator( sat_metadata=command_opts.satellite_metadata, @@ -50,62 +61,47 @@ def _consume_command(command_opts: ArchiveCommandOptions | ConsumeCommandOptions end=window[1], ) + # Use existing zarr store if it exists if fs.exists(command_opts.zarr_path.replace("s3://", "")): log.info("Using existing zarr store", dst=command_opts.zarr_path) else: + # Create new store log.info("Creating new zarr store", dst=command_opts.zarr_path) _ = create_empty_zarr(dst=command_opts.zarr_path, coords=command_opts.as_coordinates()) def _etl(product: eumdac.product.Product) -> str: """Download, process, and save a single NAT file.""" - try: - sentry_sdk.add_breadcrumb(message="Starting ETL for product", category="processing") - nat_filepath = download_nat(product, folder=f"{command_opts.workdir}/raw") - da = process_nat(path=nat_filepath, hrv=command_opts.hrv) - write_to_zarr(da=da, dst=command_opts.zarr_path) - return nat_filepath - except Exception as e: - sentry_sdk.capture_exception(e) - log.error(f"Error processing product: {e}") - return "" + nat_filepath = download_nat(product, folder=f"{command_opts.workdir}/raw") + da = process_nat(path=nat_filepath, hrv=command_opts.hrv) + write_to_zarr(da=da, dst=command_opts.zarr_path) + return nat_filepath nat_filepaths: list[str] = [] + # Iterate through all products in search for nat_filepath in Parallel( n_jobs=command_opts.num_workers, return_as="generator", )(delayed(_etl)(product) for product in product_iter): - if nat_filepath: - nat_filepaths.append(nat_filepath) + nat_filepaths.append(nat_filepath) log.info("Finished population of zarr store", dst=command_opts.zarr_path) if command_opts.validate: - try: - validate(dataset_path=command_opts.zarr_path) - except Exception as e: - sentry_sdk.capture_exception(e) - log.error(f"Validation failed: {e}") + validate(dataset_path=command_opts.zarr_path) if isinstance(command_opts, ConsumeCommandOptions) and command_opts.latest_zip: - try: - zippath: str = create_latest_zip(dst=command_opts.zarr_path) - log.info(f"Created latest.zip at {zippath}", dst=zippath) - except Exception as e: - sentry_sdk.capture_exception(e) - log.error(f"Failed to create latest.zip: {e}") + zippath: str = create_latest_zip(dst=command_opts.zarr_path) + log.info(f"Created latest.zip at {zippath}", dst=zippath) if command_opts.delete_raw: - try: - if command_opts.workdir.startswith("s3://"): - log.warning("delete-raw was specified, but deleting S3 files is not yet implemented") - else: - log.info( - f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}", - num_files=len(nat_filepaths), dst=command_opts.raw_folder, - ) - _ = [f.unlink() for f in nat_filepaths] # type:ignore - except Exception as e: - sentry_sdk.capture_exception(e) - log.error(f"Failed to delete raw files: {e}") + if command_opts.workdir.startswith("s3://"): + log.warning("delete-raw was specified, but deleting S3 files is not yet implemented") + else: + log.info( + f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}", + num_files=len(nat_filepaths), dst=command_opts.raw_folder, + ) + _ = [f.unlink() for f in nat_filepaths] + def run(config: SatelliteConsumerConfig) -> None: """Run the download and processing pipeline.""" @@ -116,12 +112,8 @@ def run(config: SatelliteConsumerConfig) -> None: version=__version__, start_time=str(prog_start), opts=config.command_options.__str__(), ) - try: - if config.command == "archive" or config.command == "consume": - _consume_command(command_opts=config.command_options) - except Exception as e: - sentry_sdk.capture_exception(e) - log.error(f"Pipeline execution failed: {e}") + if config.command == "archive" or config.command == "consume": + _consume_command(command_opts=config.command_options) runtime = dt.datetime.now(tz=dt.UTC) - prog_start log.info(f"Completed satellite consumer run in {runtime!s}.") From 0b26b0c629f8e11867542fca7c18b63717899261 Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Sat, 1 Mar 2025 14:15:53 +0530 Subject: [PATCH 4/6] update code Signed-off-by: aryan lamba --- src/satellite_consumer/run.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index b08bfb7..07a64e8 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -39,9 +39,9 @@ traces_sample_rate=1.0, environment="production" ) - log.info(" Sentry initialized successfully!") + log.info("✅ Sentry initialized successfully!") else: - log.warning("SENTRY_DSN is not set. Sentry will not be initialized.") + log.warning("⚠️ SENTRY_DSN is not set. Sentry will not be initialized.") try: __version__ = version("satellite-consumer") @@ -100,7 +100,8 @@ def _etl(product: eumdac.product.Product) -> str: f"Deleting {len(nat_filepaths)} raw files in {command_opts.raw_folder}", num_files=len(nat_filepaths), dst=command_opts.raw_folder, ) - _ = [f.unlink() for f in nat_filepaths] + for f in nat_filepaths: + f.unlink() # type: ignore # Removed unnecessary list comprehension def run(config: SatelliteConsumerConfig) -> None: From eb66781249a7233d592a60e0c87a73f78ecaafa8 Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Wed, 5 Mar 2025 18:04:39 +0530 Subject: [PATCH 5/6] update environment variable called ENVIRONMENT instead of hard coded --- src/satellite_consumer/run.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index 07a64e8..3b972da 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -35,10 +35,10 @@ # Initialize Sentry if DSN is available if SENTRY_DSN: sentry_sdk.init( - dsn=SENTRY_DSN, # Using .env variable - traces_sample_rate=1.0, - environment="production" - ) + dsn=SENTRY_DSN, # Using .env variable + traces_sample_rate=1.0, + environment=os.getenv("ENVIRONMENT", "production") # Get from ENVIRONMENT variable +) log.info("✅ Sentry initialized successfully!") else: log.warning("⚠️ SENTRY_DSN is not set. Sentry will not be initialized.") From ae745dadb49aff5d19cdef5f6671a0e8b14acf94 Mon Sep 17 00:00:00 2001 From: aryan lamba Date: Fri, 7 Mar 2025 23:50:11 +0530 Subject: [PATCH 6/6] update --- src/satellite_consumer/run.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/satellite_consumer/run.py b/src/satellite_consumer/run.py index 3b972da..0c75b2a 100644 --- a/src/satellite_consumer/run.py +++ b/src/satellite_consumer/run.py @@ -37,11 +37,12 @@ sentry_sdk.init( dsn=SENTRY_DSN, # Using .env variable traces_sample_rate=1.0, - environment=os.getenv("ENVIRONMENT", "production") # Get from ENVIRONMENT variable +environment=os.getenv("ENVIRONMENT", "development") + # Get from ENVIRONMENT variable ) - log.info("✅ Sentry initialized successfully!") + log.info(" Sentry initialized successfully!") else: - log.warning("⚠️ SENTRY_DSN is not set. Sentry will not be initialized.") + log.debug(" SENTRY_DSN is not set. Sentry will not be initialized.") try: __version__ = version("satellite-consumer")