import os
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

from childHealth.config import ProjectConfig

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# Define original paths
dirname_train_ts = "/Volumes/mlops_students/javedhassi/data/series_train.parquet"
dirname_test_ts = "/Volumes/mlops_students/javedhassi/data/series_test.parquet"

# Load project configuration from YAML file
config = ProjectConfig.from_yaml(config_path="../../project_config.yml")
num_features = config.num_features
cat_features = config.cat_features
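
# project_config.yml is assumed to provide the two feature lists used above,
# roughly:
#   num_features: [<numeric column names>]
#   cat_features: [<categorical column names>]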


def process_file(filename, dirname):
    # Read a single participant's parquet part file and drop the 'step' column
    filepath = os.path.join(dirname, filename, "part-0.parquet")
    df = spark.read.parquet(filepath).drop("step")
    # Ensure an 'id' column is included
    if "id" not in df.columns:
        df = df.withColumn("id", df["relative_date_PCIAT"])
    # Return the pandas frame and the participant id parsed from "id=<value>"
    return df.toPandas(), filename.split("=")[1]
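
# Assumed on-disk layout: each series_*.parquet volume directory contains one
# "id=<participant_id>" subdirectory per participant with a single
# part-0.parquet file inside (see the explicit example path near the end).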


def load_time_series(dirname) -> pd.DataFrame:
    # List all subdirectories in the specified path
    directories = [file.path for file in dbutils.fs.ls(dirname) if file.path.endswith("/")]

    results = []
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_file, path.split("/")[-2], dirname): path for path in directories}
        # Futures are consumed in submission order; each .result() blocks until
        # that particular file has been processed.
        for i, future in enumerate(futures):
            result = future.result()
            results.append(result)
            print(f"Processed {i + 1}/{len(directories)} files")

    # Separate the per-participant frames and their identifiers
    stats, indexes = zip(*results, strict=False) if results else ([], [])

    # Tag each frame with its participant id before concatenating; assigning the
    # id list to the concatenated frame would only line up if every frame had
    # exactly one row.
    combined_df = pd.concat(
        [frame.assign(id=idx) for frame, idx in zip(stats, indexes, strict=False)],
        ignore_index=True,
    )
    return combined_df


def update(df):
    for c in cat_features:
        df[c] = df[c].fillna("Missing").astype("category")
    return df
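
# Note: update() fills NaNs before casting so that "Missing" becomes an explicit
# category level instead of remaining NaN in the categorical columns.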


# Load time series data
train_ts = load_time_series(dirname_train_ts)
test_ts = load_time_series(dirname_test_ts)

# Load train and test CSV files with Spark
train = spark.read.csv("/Volumes/mlops_students/javedhassi/data/childHealth.csv", header=True, inferSchema=True)
test = spark.read.csv("/Volumes/mlops_students/javedhassi/data/test.csv", header=True, inferSchema=True)

# Convert the Spark DataFrames to pandas
train_pd = train.toPandas()
test_pd = test.toPandas()

# Ensure 'id' column exists in both DataFrames
train_pd["id"] = train_pd.get("id", train_pd.index)
test_pd["id"] = test_pd.get("id", test_pd.index)

# Merge the data
train_merged = pd.merge(train_pd, train_ts, how="left", on="id")
test_merged = pd.merge(test_pd, test_ts, how="left", on="id")
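
# Note: train_ts/test_ts contain many rows per participant, so these left merges
# replicate each participant's tabular row across all of that participant's
# time-series rows.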

# Update the list of numerical features to include time series columns
time_series_cols = train_ts.columns.tolist()
time_series_cols.remove("id")
num_features += time_series_cols

# Update the train and test DataFrames
train_merged = update(train_merged)
test_merged = update(test_merged)

# Check the updated DataFrames
print(train_merged.head())
print(test_merged.head())

# Read and show the Parquet file
df = spark.read.parquet(
    "/Volumes/mlops_students/javedhassi/data/series_train.parquet/id=00115b9f/part-0.parquet"
)
df.show()

# Convert to Pandas DataFrame
df_pandas = df.toPandas()

# Filter and show specific train data
train.filter(train.id == "00115b9f").show()
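
# Both ad-hoc checks above use participant id "00115b9f", taken from the example
# partition path of series_train.parquet.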