Skip to content

Commit

Permalink
Merge pull request #84 from NASA-PDS/lambda-updates
Browse files Browse the repository at this point in the history
Lambda improvements
  • Loading branch information
ramesh-maddegoda authored Feb 7, 2024
2 parents 7d93a2f + 04eb56e commit 6f4d851
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
9 changes: 4 additions & 5 deletions src/pds/ingress/pds-nucleus-datasync-completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

rds_data = boto3.client('rds-data')


def lambda_handler(event, context):
""" Lambda Handler """

Expand All @@ -43,7 +42,7 @@ def lambda_handler(event, context):
task_id = resource_list[1]
exec_id = resource_list[3]

prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-transferred-"
prefix = f"Detailed-Reports/{task_id}/{exec_id}/{exec_id}.files-verified-"

datasync_reports_s3_bucket = s3.Bucket(datasync_reports_s3_bucket_name)

Expand All @@ -53,11 +52,11 @@ def lambda_handler(event, context):

transfer_report_file_content = transfer_report.get()['Body'].read().decode('utf-8')
transfer_report_json_content = json.loads(transfer_report_file_content)
trasfered_file_obj_list = transfer_report_json_content['Transferred']
verified_file_obj_list = transfer_report_json_content['Verified']

logger.debug(f"trasfered_file_obj_list: {trasfered_file_obj_list}")
logger.debug(f"verified_file_obj_list: {verified_file_obj_list}")

for file_obj in trasfered_file_obj_list:
for file_obj in verified_file_obj_list:

obj_name = file_obj['RelativePath']
obj_type = file_obj['SrcMetadata']['Type']
Expand Down
37 changes: 22 additions & 15 deletions src/pds/ingress/pds-nucleus-product-completion-checker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
==============================================
pds-nucleus-product-completion-checker-batch.py
==============================================
============================================================
pds-nucleus-product-completion-checker.py (batch processing)
============================================================
Lambda function to check if the staging S3 bucket has received a complete product
with all required files. This lambda function is triggered periodically.
Expand All @@ -18,6 +18,7 @@
import http.client
import base64
import ast
import uuid

from xml.dom import minidom

Expand All @@ -29,10 +30,10 @@
rds_data = boto3.client('rds-data')

mwaa_env_name = 'PDS-Nucleus-Airflow-Env'
dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs'
mwaa_cli_command = 'dags trigger'

# Read environment variables from lambda configurations
dag_name = os.environ.get('AIRFLOW_DAG_NAME')
node_name = os.environ.get('NODE_NAME')
es_url = os.environ.get('ES_URL')
replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH')
Expand All @@ -47,7 +48,7 @@
def lambda_handler(event, context):
""" Main lambda handler """

logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

logger.info(f"Lambda Request ID: {context.aws_request_id}")
Expand All @@ -65,11 +66,13 @@ def process_completed_products():

logger.debug("Checking completed products...")

sql = """
select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label
NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (select s3_url_of_data_file from data_file));
sql = """
SELECT DISTINCT s3_url_of_product_label from product
WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
IN (SELECT s3_url_of_product_label from product_data_file_mapping);
"""

response = rds_data.execute_statement(
Expand Down Expand Up @@ -136,15 +139,15 @@ def submit_data_to_nucleus(list_of_product_labels_to_process):


def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
""" Creates harvest manifest file and harvest config file and trigger Nucleus workflow """
""" Creates harvest manifest file and harvest config file and trigger Nucleus workflow """

logger.debug('List of product labels to process:' + str(list_of_product_labels_to_process))

efs_mount_path = os.environ.get('EFS_MOUNT_PATH')

harvest_config_dir = efs_mount_path + '/harvest-configs'

file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1))
file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) )

harvest_manifest_content = ""
list_of_product_labels_to_process_with_file_paths = []
Expand All @@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n'
list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location)

# Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
random_suffix = uuid.uuid4().hex

try:
os.makedirs(harvest_config_dir, exist_ok=True)
harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg'
harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt'
harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'

logger.debug(f"Manifest content: {str(harvest_manifest_content)}")

Expand Down Expand Up @@ -189,6 +195,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
logger.info(f"Created harvest config XML file: {harvest_config_file_path}")
except Exception as e:
logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}")
return

trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path,
list_of_product_labels_to_process_with_file_paths)
Expand All @@ -197,7 +204,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc


def trigger_nucleus_workflow(harvest_manifest_file_path, pds_harvest_config_file, list_of_product_labels_to_process):
""" Triggers Nucleus workflow with parameters """
""" Triggers Nucleus workflow with parameters """

# Convert list to comma separated list
delim = ","
Expand Down

0 comments on commit 6f4d851

Please sign in to comment.