-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path01_feature_engineering.py
153 lines (108 loc) · 4.58 KB
/
01_feature_engineering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Databricks notebook source
# MAGIC %md
# MAGIC ## Churn Prediction Feature Engineering
# MAGIC
# MAGIC <img src="https://github.com/RafiKurlansik/laughing-garbanzo/blob/main/step1.png?raw=true">
# COMMAND ----------
# MAGIC %run ./00_includes
# COMMAND ----------
# MAGIC %md
# MAGIC ### Featurization Logic
# MAGIC
# MAGIC This is a fairly clean dataset so we'll just do some one-hot encoding, and clean up the column names afterward.
# COMMAND ----------
# Read into Spark
# NOTE: `spark` and `display` are injected by the Databricks notebook runtime;
# `database_name` is presumably defined by ./00_includes (run above) — verify there.
telcoDF = spark.table(f"{database_name}.bronze_customers")
display(telcoDF)
# COMMAND ----------
# NOTE(review): earlier exploratory version of the one-hot encoding; superseded
# by compute_churn_features() below. Consider deleting this dead cell.
# import pyspark.pandas as ps
# tf = telcoDF.to_pandas_on_spark()
# dummy = ps.get_dummies(tf,
# columns=['gender', 'partner', 'dependents',
# 'phoneService', 'multipleLines', 'internetService',
# 'onlineSecurity', 'onlineBackup', 'deviceProtection',
# 'techSupport', 'streamingTV', 'streamingMovies',
# 'contract', 'paperlessBilling', 'paymentMethod'],
# dtype = 'int64'
# )
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Pandas API on Spark
# COMMAND ----------
# NOTE(review): `feature_table` is imported but never used in this notebook.
from databricks.feature_store import feature_table
import pyspark.pandas as ps
def compute_churn_features(data):
    """Featurize bronze telco customer records for churn modeling.

    One-hot encodes the categorical columns, converts the `churnString`
    label ('Yes'/'No') into an int `churn` flag, strips spaces/parentheses
    from the generated column names, and drops rows with missing values.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Bronze-level customer table (must contain the categorical columns
        listed below plus `churnString`).

    Returns
    -------
    pyspark.sql.DataFrame
        The featurized table, ready to be written to the Feature Store.
    """
    # Use the pandas API on Spark so we can featurize with pandas idioms
    # while staying distributed.
    data = data.to_pandas_on_spark()

    # One-hot encode the categorical columns.
    data = ps.get_dummies(
        data,
        columns=['gender', 'partner', 'dependents',
                 'phoneService', 'multipleLines', 'internetService',
                 'onlineSecurity', 'onlineBackup', 'deviceProtection',
                 'techSupport', 'streamingTV', 'streamingMovies',
                 'contract', 'paperlessBilling', 'paymentMethod'],
        dtype='int64',
    )

    # Convert the label to an int flag and rename the column.
    # NOTE(review): .map() yields nulls for any value other than 'Yes'/'No',
    # which would make the astype fail before dropna() runs — assumes the
    # label column is clean; confirm upstream.
    data['churnString'] = data['churnString'].map({'Yes': 1, 'No': 0})
    data = data.astype({'churnString': 'int32'})
    data = data.rename(columns={'churnString': 'churn'})

    # Clean up column names produced by get_dummies.
    # BUG FIX: with regex=True, '(' and ')' are invalid patterns (unbalanced
    # groups) and raise re.error. These are literal single-character
    # replacements, so use regex=False.
    data.columns = data.columns.str.replace(' ', '', regex=False)
    data.columns = data.columns.str.replace('(', '-', regex=False)
    data.columns = data.columns.str.replace(')', '', regex=False)

    # Drop rows with missing values.
    data = data.dropna()

    # Back to a plain Spark DataFrame for the Feature Store API.
    return data.to_spark()
# COMMAND ----------
# MAGIC %md
# MAGIC #### Compute and write features
# MAGIC - [Feature Store Python API Reference](https://docs.databricks.com/dev-tools/api/python/latest/index.html)
# MAGIC - [Work with Feature Store Tables](https://docs.databricks.com/applications/machine-learning/feature-store/feature-tables.html#register-an-existing-delta-table-as-a-feature-table)
# COMMAND ----------
# # clean up feature store
# from databricks.feature_store import FeatureStoreClient
# fs = FeatureStoreClient()
# fs._catalog_client.delete_feature_table(f"{database_name}.churn_features")
# COMMAND ----------
# MAGIC %sql
# MAGIC --drop table churn_features
# COMMAND ----------
from databricks.feature_store import FeatureStoreClient
fs = FeatureStoreClient()
# fs._catalog_client.delete_feature_table(f"{database_name}.churn_features")
# Materialize the engineered features as a Spark DataFrame.
churn_features_df = compute_churn_features(telcoDF)
# Register the feature table, keyed by customerID.
# NOTE(review): create_table raises if the table already exists, so this cell
# presumably fails on re-runs unless the cleanup above is uncommented — confirm.
churn_feature_table = fs.create_table(
name=f'{database_name}.churn_features',
primary_keys=['customerID'],
df=churn_features_df,
description='These features are derived from the ibm_telco_churn.bronze_customers table in the lakehouse. I created dummy variables for the categorical columns, cleaned up their names, and added a boolean flag for whether the customer churned or not. No aggregations were performed.'
)
# NOTE(review): create_table(df=...) already writes the data, so this overwrite
# looks redundant on a first run — presumably kept so the table refreshes when
# it already exists. Confirm intent.
fs.write_table(
name=f'{database_name}.churn_features',
df=churn_features_df,
mode='overwrite'
)
# COMMAND ----------
# MAGIC %md
# MAGIC As an alternative we could always write to Delta Lake:
# COMMAND ----------
# NOTE(review): dead alternative path kept for reference. `training_df`,
# `silver_tbl_path`, `silver_tbl_name`, `automl_tbl_path` and `automl_tbl_name`
# are not defined in this notebook — presumably they come from ./00_includes;
# verify before uncommenting.
# # Write out silver-level data to Delta lake
# trainingDF = spark.createDataFrame(training_df)
# trainingDF.write.format('delta').mode('overwrite').save(silver_tbl_path)
# # Create silver table
# spark.sql('''
# CREATE TABLE `{}`.{}
# USING DELTA
# LOCATION '{}'
# '''.format(database_name,silver_tbl_name,silver_tbl_path))
# # Drop customer ID for AutoML
# automlDF = trainingDF.drop('customerID')
# # Write out silver-level data to Delta lake
# automlDF.write.format('delta').mode('overwrite').save(automl_tbl_path)
# # Create silver table
# _ = spark.sql('''
# CREATE TABLE `{}`.{}
# USING DELTA
# LOCATION '{}'
# '''.format(database_name,automl_tbl_name,automl_tbl_path))
# COMMAND ----------