From 1d42ec2999f7b3aee722f6eac6053fe2aa576203 Mon Sep 17 00:00:00 2001
From: adithya_dinesh
Date: Tue, 26 Dec 2023 14:32:19 +0530
Subject: [PATCH] Memory issue addressed

---
 urgent_data_metrics/imp_project_metrics.py | 171 +++++++++------
 1 file changed, 69 insertions(+), 102 deletions(-)

diff --git a/urgent_data_metrics/imp_project_metrics.py b/urgent_data_metrics/imp_project_metrics.py
index f07ac07..4ba4438 100644
--- a/urgent_data_metrics/imp_project_metrics.py
+++ b/urgent_data_metrics/imp_project_metrics.py
@@ -169,10 +169,10 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str],
          ]), True),
 ])
-
-
 def searchEntities(url,ids_list):
     try:
+        returnData = {}
+        apiSuccessFlag = False
         headers = {
             'Authorization': config.get('API_HEADERS', 'authorization_access_token'),
             'content-Type': 'application/json'
         }
@@ -188,7 +188,7 @@ def searchEntities(url,ids_list):
         response = requests.request("POST", url, headers=headers, data=payload)
         delta_ids = []
         entity_name_mapping = {}
-        
+
         if response.status_code == 200:
             # convert the response to dictionary
             response = response.json()
@@ -205,65 +205,19 @@ def searchEntities(url,ids_list):
             # check with the input data to make sure there are no missing data from loc search
             delta_ids = list(set(ids_list) - set(ids_from_api))
+            apiSuccessFlag = True
         else :
             delta_ids = ids_list
-        # if there are missing data , fetch the data from mongo
-
-        if len(delta_ids) > 0 :
-            # aggregate mongo query to fetch data from mongo
-            delta_loc = projectsCollec.aggregate([
-                {
-                    "$match": {
-                        "userProfile.userLocations": {
-                            "$elemMatch": {
-                                "id": {
-                                    "$in": delta_ids
-                                }
-                            }
-                        }
-                    }
-                },
-                {
-                    "$unwind": "$userProfile.userLocations"
-                },
-                {
-                    "$match": {
-                        "userProfile.userLocations.id": {
-                            "$in": delta_ids
-                        }
-                    }
-                },
-                {
-                    "$sort": {
-                        "createdAt": -1
-                    }
-                },
-                {
-                    "$group": {
-                        "_id": "$userProfile.userLocations.id",
-                        "mostRecentDocument": { "$first": "$$ROOT" }
-                    }
-                },
-                {
-                    "$replaceRoot": { "newRoot": "$mostRecentDocument" }
-                },
-                {
-                    "$project": {
-                        "_id": 1,
-                        "userProfile.userLocations": 1
-                    }
-                }
-            ])
-            # add delta entities to master variable
-            for index in delta_loc:
-                entity_name_mapping[index['userProfile']["userLocations"]['id']] = index['userProfile']["userLocations"]['name']
-
-        return entity_name_mapping
+        returnData['mapping'] = entity_name_mapping
+        returnData['apiSuccessFlag'] = apiSuccessFlag
+        returnData['delta'] = delta_ids
+        return returnData
     except Exception as e:
         errorLogger.error(e,exc_info=True)
 
 projects_df = spark.createDataFrame(projects_cursorMongo,projects_schema)
+
 projects_df = projects_df.withColumn(
     "project_evidence_status",
     F.when(
@@ -309,6 +263,20 @@ def searchEntities(url,ids_list):
     projects_df["district"],
     projects_df["state"],
 )
+# DataFrame for user locations values of State and Districts only
+userLocations_df = melt(projects_df,
+            id_vars=["_id","exploded_userLocations.name","exploded_userLocations.type","exploded_userLocations.id"],
+            value_vars=["exploded_userLocations.code"]
+            ).select("_id","id","name","value","type").filter((col("type") == "state") | (col("type") == "district")).dropDuplicates()
+
+# Fetch only Latest Data of Locations from the DF
+userLocations_df = userLocations_df.groupBy("id").agg(
+    first("_id", ignorenulls=True).alias("projectId"),
+    first("name", ignorenulls=True).alias("name"),
+    first("value", ignorenulls=True).alias("value"),
+    first("type", ignorenulls=True).alias("type")
+)
+
 projects_df_final = projects_df_final.dropDuplicates()
 district_final_df = projects_df_final.groupBy("state","district")\
@@ -335,50 +303,49 @@ def searchEntities(url,ids_list):
 
 # call function to get the entity from location master
 response = searchEntities(config.get("API_ENDPOINTS", "base_url") + config.get("API_ENDPOINTS", "location_search"),ids_list)
-if response :
-    # Convert dictionary to list of tuples
-    data_tuples = list(response.items())
-
-    # Define the schema
-    state_schema = StructType([StructField("id", StringType(), True), StructField("state_name", StringType(), True)])
-    district_schema = StructType([StructField("id", StringType(), True), StructField("district_name", StringType(), True)])
-
-    # Create a DataFrame
-    state_id_mapping = spark.createDataFrame(data_tuples, schema=state_schema)
-
-    # Create a DataFrame
-    district_id_mapping = spark.createDataFrame(data_tuples, schema=district_schema)
-
-    district_final_df = district_final_df.join(state_id_mapping, district_final_df["state"] == state_id_mapping["id"], "left")
-
-    district_final_df = district_final_df.join(district_id_mapping, district_final_df["district"] == district_id_mapping["id"], "left")
-
-    final_data_to_csv = district_final_df.select("state_name","district_name","Total_Micro_Improvement_Projects","Total_Micro_Improvement_Started","Total_Micro_Improvement_InProgress","Total_Micro_Improvement_Submitted","Total_Micro_Improvement_Submitted_With_Evidence").sort("state_name","district_name")
-
-    # DF To file
-    local_path = config.get("COMMON", "nvsk_imp_projects_data_local_path")
-    blob_path = config.get("COMMON", "nvsk_imp_projects_data_blob_path")
-    final_data_to_csv.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path)
-    final_data_to_csv.unpersist()
-
-    # Renaming a file
-    path = local_path
-    extension = 'csv'
-    os.chdir(path)
-    result = glob.glob(f'*.{extension}')
-    os.rename(f'{path}' + f'{result[0]}', f'{path}' + 'data.csv')
-
-    # Uploading file to Cloud
-    cloud_init.upload_to_cloud(blob_Path = blob_path, local_Path = local_path, file_Name = 'data.csv')
-
-    print("file got uploaded to Cloud.")
-    print("DONE")
-
-else:
-    try:
-        raise ValueError("Entity Search API failed.")
-    except Exception as e:
-        # Log the custom error message along with exception information
-        error_message = "API error: {}".format(e)
-        errorLogger.error(error_message, exc_info=True)
-    
\ No newline at end of file
+data_tuples = [] # empty list used to build the mapping DataFrames
+
+# if the location search API call succeeded, take the mapping details from the API response
+if response['apiSuccessFlag']:
+    # Convert dictionary to list of tuples
+    data_tuples = list(response['mapping'].items())
+
+# if any delta ids are found, fetch their details from the userLocations DF
+if response['delta']:
+    delta_ids_from_response = userLocations_df.filter(col("id").isin(response['delta']))
+    for row in delta_ids_from_response.collect() :
+        data_tuples.append((row['id'],row['name']))
+
+# Define the schema for State details
+state_schema = StructType([StructField("id", StringType(), True), StructField("state_name", StringType(), True)])
+
+# Define the schema for District details
+district_schema = StructType([StructField("id", StringType(), True), StructField("district_name", StringType(), True)])
+
+# Create a DataFrame for State
+state_id_mapping = spark.createDataFrame(data_tuples, schema=state_schema)
+
+# Create a DataFrame for District
+district_id_mapping = spark.createDataFrame(data_tuples, schema=district_schema)
+
+# Join to get the State names from State ids
+district_final_df = district_final_df.join(state_id_mapping, district_final_df["state"] == state_id_mapping["id"], "left")
+# Join to get the District names from District ids
+district_final_df = district_final_df.join(district_id_mapping, district_final_df["district"] == district_id_mapping["id"], "left")
+# Select only the relevant fields for the final DF and sort by state and district name
+final_data_to_csv = district_final_df.select("state_name","district_name","Total_Micro_Improvement_Projects","Total_Micro_Improvement_Started","Total_Micro_Improvement_InProgress","Total_Micro_Improvement_Submitted","Total_Micro_Improvement_Submitted_With_Evidence").sort("state_name","district_name")
+# DF To file
+local_path = config.get("COMMON", "nvsk_imp_projects_data_local_path")
+blob_path = config.get("COMMON", "nvsk_imp_projects_data_blob_path")
+final_data_to_csv.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path)
+final_data_to_csv.unpersist()
+# Renaming a file
+path = local_path
+extension = 'csv'
+os.chdir(path)
+result = glob.glob(f'*.{extension}')
+os.rename(f'{path}' + f'{result[0]}', f'{path}' + 'data.csv')
+# Uploading file to Cloud
+cloud_init.upload_to_cloud(blob_Path = blob_path, local_Path = local_path, file_Name = 'data.csv')
+print("file got uploaded to Cloud.")
+print("DONE")
\ No newline at end of file
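
Reviewer note (outside the diff): the sketch below is a minimal, self-contained illustration of the return contract of the patched searchEntities() ({'mapping', 'apiSuccessFlag', 'delta'}) and of how the caller now merges the API mapping with the userLocations_df fallback before building data_tuples. The merge_location_names helper and the sample ids are hypothetical and only mirror the logic added above; they are not part of the patch.

# Minimal sketch, assuming the dict shape returned by the patched searchEntities():
# {'mapping': {id: name}, 'apiSuccessFlag': bool, 'delta': [unresolved ids]}
def merge_location_names(response, fallback_rows):
    # fallback_rows: (id, name) pairs, e.g. collected from userLocations_df
    data_tuples = []
    if response['apiSuccessFlag']:
        # names resolved by the location search API
        data_tuples = list(response['mapping'].items())
    if response['delta']:
        # ids the API could not resolve are filled in from the fallback source
        fallback = dict(fallback_rows)
        data_tuples += [(i, fallback[i]) for i in response['delta'] if i in fallback]
    return data_tuples

# Hypothetical example: one id resolved by the API, one taken from the fallback DF.
resp = {'mapping': {'loc-1': 'State A'}, 'apiSuccessFlag': True, 'delta': ['loc-9']}
print(merge_location_names(resp, [('loc-9', 'District Z')]))
# [('loc-1', 'State A'), ('loc-9', 'District Z')]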