From 6d3a178bee9a9c51e5b85ba1530dda84b803f14a Mon Sep 17 00:00:00 2001
From: javedhassans
Date: Tue, 19 Nov 2024 13:12:56 +0100
Subject: [PATCH] fixed week1

---
 notebooks/week1/00.dataexploration.ipynb |  8 +--
 notebooks/week1/01.dataExploraton.py     | 73 ++++++------------------
 src/childHealth/config.py                | 12 ++--
 3 files changed, 26 insertions(+), 67 deletions(-)

diff --git a/notebooks/week1/00.dataexploration.ipynb b/notebooks/week1/00.dataexploration.ipynb
index 2ccea35..d95f764 100644
--- a/notebooks/week1/00.dataexploration.ipynb
+++ b/notebooks/week1/00.dataexploration.ipynb
@@ -390,11 +390,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Merge the aggregated actigraphy features with the train data\n",
-    "combined_df = pd.merge(train_df, aggregated_actigraphy_df, on=\"id\", how=\"left\")\n",
+    "# # Merge the aggregated actigraphy features with the train data\n",
+    "# combined_df = pd.merge(train_df, aggregated_actigraphy_df, on=\"id\", how=\"left\")\n",
     "\n",
-    "# Inspect the combined DataFrame\n",
-    "print(combined_df.head())"
+    "# # Inspect the combined DataFrame\n",
+    "# print(combined_df.head())"
    ]
   },
   {
diff --git a/notebooks/week1/01.dataExploraton.py b/notebooks/week1/01.dataExploraton.py
index 84a0b5a..ccf16de 100644
--- a/notebooks/week1/01.dataExploraton.py
+++ b/notebooks/week1/01.dataExploraton.py
@@ -14,6 +14,7 @@
 from concurrent.futures import ThreadPoolExecutor
 
 import pandas as pd
+from pyspark.dbutils import DBUtils
 from pyspark.sql import SparkSession
 
 from childHealth.config import ProjectConfig
@@ -22,34 +23,28 @@
 # Initialize Spark session
 spark = SparkSession.builder.getOrCreate()
+dbutils = DBUtils(spark)
 
 # Define original paths
 dirname_train_ts = "/Volumes/mlops_students/javedhassi/data/series_train.parquet"
 dirname_test_ts = "/Volumes/mlops_students/javedhassi/data/series_test.parquet"
 
-# COMMAND ----------
-
 # Load project configuration from YAML file
 config = ProjectConfig.from_yaml(config_path="../../project_config.yml")
 num_features = config.num_features
 cat_features = config.cat_features
 
-# COMMAND ----------
 
 def process_file(filename, dirname):
     filepath = os.path.join(dirname, filename, "part-0.parquet")
-    df = spark.read.parquet(filepath)
-    df = df.drop("step")
-    # Ensure 'id' column is included
+    df = spark.read.parquet(filepath).drop("step")
     if "id" not in df.columns:
-        df = df.withColumn("id", df["relative_date_PCIAT"])  # Use an existing column or create a new one
+        df = df.withColumn("id", df["relative_date_PCIAT"])
     return df.toPandas(), filename.split("=")[1]
 
 
 def load_time_series(dirname) -> pd.DataFrame:
-    # List all subdirectories in the specified path
     directories = [file.path for file in dbutils.fs.ls(dirname) if file.path.endswith("/")]
-
     results = []
     with ThreadPoolExecutor() as executor:
         futures = {executor.submit(process_file, path.split("/")[-2], dirname): path for path in directories}
@@ -57,25 +52,22 @@ def load_time_series(dirname) -> pd.DataFrame:
             result = future.result()
             results.append(result)
             print(f"Processed {i + 1}/{len(directories)} files")
-
-    # Separate stats and identifiers
     stats, indexes = zip(*results, strict=False) if results else ([], [])
-
-    # Create DataFrame with statistics and identifiers
     combined_df = pd.concat([df for df in stats], ignore_index=True)
     combined_df["id"] = indexes
-
     return combined_df
 
 
-# COMMAND ----------
+def update(df):
+    for c in cat_features:
+        df[c] = df[c].fillna("Missing").astype("category")
+    return df
+
 
 # Load time series data
 train_ts = load_time_series(dirname_train_ts)
 test_ts = load_time_series(dirname_test_ts)
 
-# COMMAND ----------
-
 # Load train and test CSV files with Spark
 train = spark.read.csv("/Volumes/mlops_students/javedhassi/data/childHealth.csv", header=True, inferSchema=True)
 test = spark.read.csv("/Volumes/mlops_students/javedhassi/data/test.csv", header=True, inferSchema=True)
@@ -85,69 +77,36 @@ def load_time_series(dirname) -> pd.DataFrame:
 test_pd = test.toPandas()
 
 # Ensure 'id' column exists in both DataFrames
-if "id" not in train_pd.columns:
-    train_pd["id"] = train_pd.index
-if "id" not in test_pd.columns:
-    test_pd["id"] = test_pd.index
-
-# COMMAND ----------
+train_pd["id"] = train_pd.get("id", train_pd.index)
+test_pd["id"] = test_pd.get("id", test_pd.index)
 
 # Merge the data
 train_merged = pd.merge(train_pd, train_ts, how="left", on="id")
 test_merged = pd.merge(test_pd, test_ts, how="left", on="id")
 
-# Check the result
-print(train_merged.head())
-print(test_merged.head())
-
-# COMMAND ----------
-
 # Update the list of numerical features to include time series columns
 time_series_cols = train_ts.columns.tolist()
-time_series_cols.remove("id")  # Temporarily remove 'id' column from the list of time series columns
+time_series_cols.remove("id")
 num_features += time_series_cols
 
-# COMMAND ----------
-
-
-def update(df):
-    for c in cat_features:
-        df[c] = df[c].fillna("Missing")
-        df[c] = df[c].astype("category")
-    return df
-
-
-# COMMAND ----------
-
 # Update the train and test DataFrames
 train_merged = update(train_merged)
 test_merged = update(test_merged)
 
-# COMMAND ----------
-
-# # Include 'id' column back in the numerical features if needed
-# num_features.append('id')
-
 # Check the updated DataFrames
 print(train_merged.head())
 print(test_merged.head())
 
-
-# COMMAND ----------
-# Read the Parquet file
+# Read and show the Parquet file
 df = spark.read.parquet(
     "/Volumes/mlops_students/javedhassi/data/series_train.parquet/id=00115b9f/part-0.parquet",
     header=True,
     inferSchema=True,
 )
-
-# Show the DataFrame
 df.show()
 
-# COMMAND ----------
-
+# Convert to Pandas DataFrame
 df_pandas = df.toPandas()
 
-# COMMAND ----------
-train = spark.read.csv("/Volumes/mlops_students/javedhassi/data/childHealth.csv", header=True, inferSchema=True)
+
+# Filter and show specific train data
 train.filter(train.id == "00115b9f").show()
-
-# COMMAND ----------
diff --git a/src/childHealth/config.py b/src/childHealth/config.py
index a3a09af..06222ed 100644
--- a/src/childHealth/config.py
+++ b/src/childHealth/config.py
@@ -7,8 +7,8 @@
 class ProjectConfig(BaseModel):
     catalog_name: str
     schema_name: str
-    random_forest_parameters: Dict[str, Any]  # Dictionary to hold Random Forest parameters
-    lgb_parameters: Dict[str, Any]  # Dictionary to hold LightGBM parameters
+    random_forest_parameters: Dict[str, Any]
+    lgb_parameters: Dict[str, Any]
     num_features: List[str]
     cat_features: List[str]
     target: str
@@ -20,12 +20,12 @@ def from_yaml(cls, config_path: str) -> "ProjectConfig":
             with open(config_path, "r") as f:
                 config_dict = yaml.safe_load(f)
                 return cls(**config_dict)
-        except FileNotFoundError:
-            raise FileNotFoundError(f"Configuration file not found: {config_path}")
+        except FileNotFoundError as e:
+            raise FileNotFoundError(f"Configuration file not found: {config_path}") from e
         except yaml.YAMLError as e:
-            raise ValueError(f"Error parsing YAML file: {e}")
+            raise ValueError(f"Error parsing YAML file: {e}") from e
         except ValidationError as e:
-            raise ValueError(f"Validation error: {e}")
+            raise ValueError(f"Validation error: {e}") from e
 
 
 # Example usage:
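
Usage sketch for reviewers: the snippet below illustrates how the pieces touched by this patch are meant to be used together, loading the validated ProjectConfig from YAML (the "../../project_config.yml" path is the one used in 01.dataExploraton.py) and applying the same fillna/astype idiom as the relocated update() helper. The toy DataFrame and its values are invented for illustration only and are not part of the repository.

    import pandas as pd

    from childHealth.config import ProjectConfig

    # Load and validate the project configuration. After this patch, from_yaml()
    # chains the original exception ("raise ... from e"), so the root cause stays
    # visible in the traceback when the file is missing or the YAML is invalid.
    config = ProjectConfig.from_yaml(config_path="../../project_config.yml")

    # Toy frame with the configured categorical columns (illustrative values only).
    df = pd.DataFrame({c: [None, "some value"] for c in config.cat_features})

    # Same idiom as the relocated update() helper: fill missing categoricals
    # and cast them to the pandas 'category' dtype.
    for c in config.cat_features:
        df[c] = df[c].fillna("Missing").astype("category")

    print(df.dtypes)

Chaining the exceptions does not change which error is raised; it only preserves the original cause in the traceback, which is why config.py now adds "from e" to each re-raise.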