From 19b85a3a5a2708fcec8f9e57ab8e689676aabb9d Mon Sep 17 00:00:00 2001
From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com>
Date: Tue, 6 Feb 2024 19:04:53 -0800
Subject: [PATCH] UPDATE Nucleus pds-nucleus-datasync-completion lambda code to
 read the verified files DataSync report instead of the transferred files
 report

Refer to task https://github.com/NASA-PDS/nucleus/issues/54
---
 .../pds-nucleus-datasync-completion.py        | 341 +++++++++++++++---
 1 file changed, 290 insertions(+), 51 deletions(-)

diff --git a/src/pds/ingress/pds-nucleus-datasync-completion.py b/src/pds/ingress/pds-nucleus-datasync-completion.py
index 1d0a922..e0a8e87 100644
--- a/src/pds/ingress/pds-nucleus-datasync-completion.py
+++ b/src/pds/ingress/pds-nucleus-datasync-completion.py
@@ -31,9 +31,253 @@
 rds_data = boto3.client('rds-data')
 
 
+def lambda_handler(event,context):
+    import boto3
+    import urllib.parse
+    import logging
+    import json
+    import os
+    import time
+    from xml.dom import minidom
+
+    datasync_reports_s3_bucket_name = "pds-nucleus-datassync-reports"
+
+    logger = logging.getLogger("pds-nucleus-datasync-completion")
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(logging.StreamHandler())
+
+    s3 = boto3.resource('s3')
+    s3_client = boto3.client('s3')
+
+    s3_bucket_name = "pds-nucleus-staging"
+    db_clust_arn = os.environ.get('DB_CLUSTER_ARN')
+    db_secret_arn = os.environ.get('DB_SECRET_ARN')
+    efs_mount_path = os.environ.get('EFS_MOUNT_PATH')
+
+
+    rds_data = boto3.client('rds-data')
+
+    def lambda_handler(event,context):
+        logger.info(f"Lambda Request ID: {context.aws_request_id}")
+        logger.info(f"Event: {event}")
+        resource = event['resources']
+
+        resource_list = resource[0].split("/")
+        task_id = resource_list[1]
+        exec_id = resource_list[3]
+
+        prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-"
+
+        datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name)
+
+        list_of_files = []
+
+        for transfer_report in datasync_reports_s3_bucket.objects.filter(Prefix=prefix):
+
+            transfer_report_file_content = transfer_report.get()['Body'].read().decode('utf-8')
+            transfer_report_json_content = json.loads(transfer_report_file_content)
+            trasfered_file_obj_list = transfer_report_json_content['Verified']
+
+            logger.debug(f"veri_file_obj_list: {trasfered_file_obj_list}")
+
+            for file_obj in trasfered_file_obj_list:
+
+                obj_name = file_obj['RelativePath']
+                obj_type = file_obj['SrcMetadata']['Type']
+
+                if obj_type == 'Regular': # Not a directory
+
+                    if obj_name.endswith('.fz'):
+                        file_to_extract = f"/mnt/data/{s3_bucket_name}" + obj_name
+                        extract_file(file_to_extract)
+                        obj_name = obj_name.rstrip(",.fz")
+
+                    s3_url_of_file = "s3://" + s3_bucket_name + obj_name
+
+                    s3_key = obj_name[1:]
+
+                    handle_file_types(s3_url_of_file, s3_bucket_name, s3_key)
+
+                    list_of_files.append(s3_url_of_file)
+
+        logger.debug(f"List_of_files received: {list_of_files}")
+
+
+    # Handle file types
+    def handle_file_types(s3_url_of_file, s3_bucket, s3_key):
+
+        try:
+            # TODO: Product label received (THIS CAN BE LBLX )
+            if s3_url_of_file.lower().endswith(".xml") and not s3_url_of_file.lower().endswith(".aux.xml") and not s3_url_of_file.lower().endswith("_aux.xml") :
+                logger.debug(f"Received product file: {s3_url_of_file}" )
+                save_files_for_product_label(s3_url_of_file, s3_bucket_name, s3_key)
+                save_product_processing_status_in_database(s3_url_of_file, "INCOMPLETE")
+
+
+            # Data file received
+            elif not s3_url_of_file.lower().endswith("/"): # Not a directory
+                logger.info(f"Received data file: {s3_url_of_file}" )
+                save_data_file_in_database(s3_url_of_file)
+
+        except Exception as e:
+            logger.error(f"Error processing . Exception: {str(e)}")
+            raise e
+
+
+    # Extracts .fz to files to .fits
+    def extract_file(file_to_extract):
+
+        logger.debug(f"Extraction file {file_to_extract}...")
+
+        try:
+            os.system(f'funpack {os.path.normpath(file_to_extract)}')
+            logger.info(f'Unpacked file: {os.path.normpath(file_to_extract)}')
+        except Exception as e:
+            logger.error(f"Error extracting file: {file_to_extract}. Exception: {str(e)}")
+            raise e
+
+
+
+    # Creates a mapping record in the database for product and relevant files
+    def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file):
+
+        logger.info(f"Saving product data file mapping {s3_url_of_product_label} --> {s3_url_of_data_file} in database")
+        sql = """
+            REPLACE INTO product_data_file_mapping
+            (
+            s3_url_of_product_label,
+            s3_url_of_data_file,
+            last_updated_epoch_time)
+            VALUES(
+            :s3_url_of_product_label_param,
+            :s3_url_of_data_file_param,
+            :last_updated_epoch_time_param
+            )
+            """
+
+        s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}}
+        s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}}
+        last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}}
+
+        param_set = [s3_url_of_product_label_param, s3_url_of_data_file_param, last_updated_epoch_time_param]
+
+        try:
+            response = rds_data.execute_statement(
+                resourceArn = db_clust_arn,
+                secretArn = db_secret_arn,
+                database = 'pds_nucleus',
+                sql = sql,
+                parameters = param_set)
+
+            logger.debug(str(response))
+        except Exception as e:
+            logger.error(f"Error updating product_data_file_mapping table. Exception: {str(e)}")
+            raise e
+
+    # Creates a record for product
+    def save_product_processing_status_in_database(s3_url_of_product_label, processing_status):
+
+        logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database")
+
+        sql = """
+            REPLACE INTO product
+            (
+            s3_url_of_product_label,
+            processing_status,
+            last_updated_epoch_time)
+            VALUES(
+            :s3_url_of_product_label_param,
+            :processing_status_param,
+            :last_updated_epoch_time_param
+            )
+            """
+
+        s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}}
+        processing_status_param = {'name':'processing_status_param', 'value':{'stringValue': processing_status}}
+        last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}}
+
+        param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param]
+
+        try:
+            response = rds_data.execute_statement(
+                resourceArn = db_clust_arn,
+                secretArn = db_secret_arn,
+                database = 'pds_nucleus',
+                sql = sql,
+                parameters = param_set)
+
+            print(str(response))
+        except Exception as e:
+            logger.error(f"Error writing to product table. Exception: {str(e)}")
Exception: {str(e)}") + raise e + + # Creates a record for data file + def save_data_file_in_database(s3_url_of_data_file): + + logger.debug(f"Saving data file name in database: {s3_url_of_data_file} in database") + + sql = """ + REPLACE INTO data_file + ( + s3_url_of_data_file, + last_updated_epoch_time) + VALUES( + :s3_url_of_data_file_param, + :last_updated_epoch_time_param + ) + """ + + + s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + + param_set = [s3_url_of_data_file_param, last_updated_epoch_time_param] + + try: + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql, + parameters = param_set) + + logger.info(str(response)) + + except Exception as e: + logger.error(f"Error updating data_file table. Exception: {str(e)}") + raise e + + + # Creates a record for product label + def save_files_for_product_label(s3_url_of_product_label, bucket, key): + s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] + + try: + s3_response = s3_client.get_object(Bucket=bucket, Key=key) + + except Exception as e: + logger.error(f"Error getting S3 object: for bucker: {bucket} and key: {key}. Exception: {str(e)}") + raise e + + try: + # Get the Body object in the S3 get_object() response + s3_object_body = s3_response.get('Body') + + # Read the data in bytes format and convert it to string + content_str = s3_object_body.read().decode() + + # parse xml + xmldoc = minidom.parseString(content_str) + expected_files_from_product_label = xmldoc.getElementsByTagName('file_name') + + for x in expected_files_from_product_label: + s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue + save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file) + + except Exception as e: + logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. 
Exception: {str(e)}") + raise e -def lambda_handler(event, context): - """ Lambda Handler """ logger.info(f"Lambda Request ID: {context.aws_request_id}") logger.info(f"Event: {event}") @@ -43,7 +287,7 @@ def lambda_handler(event, context): task_id = resource_list[1] exec_id = resource_list[3] - prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-transferred-" + prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-" datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name) @@ -53,16 +297,16 @@ def lambda_handler(event, context): transfer_report_file_content = transfer_report.get()['Body'].read().decode('utf-8') transfer_report_json_content = json.loads(transfer_report_file_content) - trasfered_file_obj_list = transfer_report_json_content['Transferred'] + verified_file_obj_list = transfer_report_json_content['Verified'] - logger.debug(f"trasfered_file_obj_list: {trasfered_file_obj_list}") + logger.debug(f"verified_file_obj_list: {verified_file_obj_list}") - for file_obj in trasfered_file_obj_list: + for file_obj in verified_file_obj_list: obj_name = file_obj['RelativePath'] obj_type = file_obj['SrcMetadata']['Type'] - if obj_type == 'Regular': # Not a directory + if obj_type == 'Regular': # Not a directory if obj_name.endswith('.fz'): file_to_extract = f"/mnt/data/{s3_bucket_name}" + obj_name @@ -80,20 +324,20 @@ def lambda_handler(event, context): logger.debug(f"List_of_files received: {list_of_files}") - +# Handle file types def handle_file_types(s3_url_of_file, s3_bucket, s3_key): - """ Invokes functions based on the file type """ try: # TODO: Product label received (THIS CAN BE LBLX ) - if s3_url_of_file.lower().endswith(".xml") and not s3_url_of_file.lower().endswith(".aux.xml"): - logger.debug(f"Received product file: {s3_url_of_file}") + if s3_url_of_file.lower().endswith(".xml") and not s3_url_of_file.lower().endswith(".aux.xml") and not s3_url_of_file.lower().endswith("_aux.xml") : + logger.debug(f"Received product file: {s3_url_of_file}" ) + save_files_for_product_label(s3_url_of_file, s3_bucket_name, s3_key) save_product_processing_status_in_database(s3_url_of_file, "INCOMPLETE") - save_files_for_product_label(s3_url_of_file, s3_bucket, s3_key) + # Data file received - elif not s3_url_of_file.lower().endswith("/"): # Not a directory - logger.info(f"Received data file: {s3_url_of_file}") + elif not s3_url_of_file.lower().endswith("/"): # Not a directory + logger.info(f"Received data file: {s3_url_of_file}" ) save_data_file_in_database(s3_url_of_file) except Exception as e: @@ -101,8 +345,8 @@ def handle_file_types(s3_url_of_file, s3_bucket, s3_key): raise e +# Extracts .fz to files to .fits def extract_file(file_to_extract): - """ Extracts .fz to files to .fits """ logger.debug(f"Extraction file {file_to_extract}...") @@ -114,8 +358,9 @@ def extract_file(file_to_extract): raise e + +# Creates a mapping record in the database for product and relevant files def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file): - """ Creates a mapping record in the database for product and relevant files """ logger.info(f"Saving product data file mapping {s3_url_of_product_label} --> {s3_url_of_data_file} in database") sql = """ @@ -131,30 +376,27 @@ def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_o ) """ - s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', - 'value': {'stringValue': s3_url_of_product_label}} - s3_url_of_data_file_param = {'name': 
-    last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
-                                     'value': {'longValue': round(time.time() * 1000)}}
+    s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}}
+    s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}}
+    last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}}
 
     param_set = [s3_url_of_product_label_param, s3_url_of_data_file_param, last_updated_epoch_time_param]
 
     try:
         response = rds_data.execute_statement(
-            resourceArn=db_clust_arn,
-            secretArn=db_secret_arn,
-            database='pds_nucleus',
-            sql=sql,
-            parameters=param_set)
+            resourceArn = db_clust_arn,
+            secretArn = db_secret_arn,
+            database = 'pds_nucleus',
+            sql = sql,
+            parameters = param_set)
 
         logger.debug(str(response))
     except Exception as e:
         logger.error(f"Error updating product_data_file_mapping table. Exception: {str(e)}")
         raise e
 
-
+# Creates a record for product
 def save_product_processing_status_in_database(s3_url_of_product_label, processing_status):
-    """ Creates a record for product """
 
     logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database")
 
@@ -171,30 +413,27 @@ def save_product_processing_status_in_database(s3_url_of_product_label, processi
     )
     """
 
-    s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
-                                     'value': {'stringValue': s3_url_of_product_label}}
-    processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
-    last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
-                                     'value': {'longValue': round(time.time() * 1000)}}
+    s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}}
+    processing_status_param = {'name':'processing_status_param', 'value':{'stringValue': processing_status}}
+    last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}}
 
     param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param]
 
     try:
         response = rds_data.execute_statement(
-            resourceArn=db_clust_arn,
-            secretArn=db_secret_arn,
-            database='pds_nucleus',
-            sql=sql,
-            parameters=param_set)
+            resourceArn = db_clust_arn,
+            secretArn = db_secret_arn,
+            database = 'pds_nucleus',
+            sql = sql,
+            parameters = param_set)
 
         print(str(response))
     except Exception as e:
         logger.error(f"Error writing to product table. Exception: {str(e)}")
Exception: {str(e)}") raise e - +# Creates a record for data file def save_data_file_in_database(s3_url_of_data_file): - """ Creates a record for data file """ logger.debug(f"Saving data file name in database: {s3_url_of_data_file} in database") @@ -209,19 +448,19 @@ def save_data_file_in_database(s3_url_of_data_file): ) """ - s3_url_of_data_file_param = {'name': 's3_url_of_data_file_param', 'value': {'stringValue': s3_url_of_data_file}} - last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', - 'value': {'longValue': round(time.time() * 1000)}} + + s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} param_set = [s3_url_of_data_file_param, last_updated_epoch_time_param] try: response = rds_data.execute_statement( - resourceArn=db_clust_arn, - secretArn=db_secret_arn, - database='pds_nucleus', - sql=sql, - parameters=param_set) + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql, + parameters = param_set) logger.info(str(response)) @@ -230,10 +469,9 @@ def save_data_file_in_database(s3_url_of_data_file): raise e +# Creates a record for product label def save_files_for_product_label(s3_url_of_product_label, bucket, key): - """ Creates a record for product label """ - - s3_base_dir = s3_url_of_product_label.rsplit('/', 1)[0] + s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] try: s3_response = s3_client.get_object(Bucket=bucket, Key=key) @@ -260,3 +498,4 @@ def save_files_for_product_label(s3_url_of_product_label, bucket, key): except Exception as e: logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. Exception: {str(e)}") raise e +