week5 updates - create dab
a0134m committed Nov 20, 2024
1 parent 9794c26 commit b53f29c
Showing 10 changed files with 746 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -10,7 +10,8 @@ __pycache__/
*.so

# Folders
# data/
mlruns/
#data/

# Distribution / packaging
.Python
126 changes: 121 additions & 5 deletions databricks.yml
@@ -4,16 +4,132 @@

bundle:
  name: marvelous-databricks-course-a0134m
  #databricks_cli_version: "0.230.0"
  cluster_id: 1018-024853-shr990fc

permissions:
  - level: CAN_MANAGE
    user_name: [email protected]

artifacts:
  default:
    type: whl
    build: uv build --wheel
    # build: python -m build
    path: .

variables:
  root_path:
    description: root_path for the target
    # default: /Shared/.bundle/${bundle.target}/${bundle.name}
    default: /Workspace/Users/[email protected]/.bundle/marvelous-databricks-course-a0134m/dev/files
  git_sha:
    description: git_sha
    default: abcd
  schedule_pause_status:
    description: schedule pause status
    default: UNPAUSED

targets:
  prod:
    workspace:
      host: https://dbc-643c4c2b-d6c9.cloud.databricks.com
      root_path: ${var.root_path}

  dev:
    mode: development
    default: true
    workspace:
      host: https://dbc-643c4c2b-d6c9.cloud.databricks.com
      # root_path: /Workspace/Users/[email protected]/.bundle/${bundle.name}/dev

resources:
  jobs:
    wine-quality:
      name: wine-quality-workflow
      schedule:
        quartz_cron_expression: "0 0 10 ? * MONDAY *"
        timezone_id: "America/Chicago"
        pause_status: ${var.schedule_pause_status}
      tags:
        project_name: "wine-quality"
      job_clusters:
        - job_cluster_key: "wine-quality-cluster"
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            data_security_mode: "SINGLE_USER"
            node_type_id: "r3.xlarge"
            driver_node_type_id: "r3.xlarge"
            autoscale:
              min_workers: 1
              max_workers: 1

      ## Optionally, there could be 'staging' or 'prod' targets here.
      #
      # prod:
      #   workspace:
      #     host: https://dbc-643c4c2b-d6c9.cloud.databricks.com

      tasks:
        - task_key: "preprocessing"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "week5/preprocess.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
          libraries:
            - whl: ./dist/*.whl
        - task_key: if_refreshed
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.preprocessing.values.refreshed}}"
            right: "1"
          depends_on:
            - task_key: "preprocessing"
        - task_key: "train_model"
          depends_on:
            - task_key: "if_refreshed"
              outcome: "true"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "week5/train_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--git_sha"
              - ${var.git_sha}
              - "--job_run_id"
              - "{{job.id}}"
          libraries:
            - whl: ./dist/*.whl
        - task_key: "evaluate_model"
          depends_on:
            - task_key: "train_model"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "week5/evaluate_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--new_model_uri"
              - "{{tasks.train_model.values.new_model_uri}}"
              - "--job_run_id"
              - "{{job.id}}"
              - "--git_sha"
              - ${var.git_sha}
          libraries:
            - whl: ./dist/*.whl
        - task_key: model_update
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.evaluate_model.values.model_update}}"
            right: "1"
          depends_on:
            - task_key: "evaluate_model"
        - task_key: "deploy_model"
          depends_on:
            - task_key: "model_update"
              outcome: "true"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "week5/deploy_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
          libraries:
            - whl: ./dist/*.whl
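The if_refreshed and model_update condition tasks only fire if the upstream scripts publish matching task values. A minimal sketch of how week5/preprocess.py might set the refreshed flag follows; the refresh check itself is an assumption, and only the key name and the EQUAL_TO comparison against "1" come from the workflow definition above.

# Hypothetical tail of week5/preprocess.py (assumption): signal whether new source data arrived.
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

new_rows = 42  # stand-in for the real count of newly ingested rows

# The if_refreshed condition task compares {{tasks.preprocessing.values.refreshed}} EQUAL_TO "1"
dbutils.jobs.taskValues.set(key="refreshed", value=1 if new_rows > 0 else 0)

With the task values in place, the bundle would typically be validated and deployed with the Databricks CLI (databricks bundle validate, databricks bundle deploy --target dev, databricks bundle run wine-quality).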
2 changes: 1 addition & 1 deletion notebooks/week2/05.log_and_register_fe_model.py
@@ -60,7 +60,7 @@

# COMMAND ----------

# Create or replace the house_features table
# Create or replace the wine_features table
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.wine_features
(Id STRING NOT NULL,
77 changes: 77 additions & 0 deletions notebooks/week5/00.create_source_data.py
@@ -0,0 +1,77 @@
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, to_utc_timestamp
from wine_quality.config import ProjectConfig

# Load configuration
config = ProjectConfig.from_yaml(config_path="../../project_config.yml")
catalog_name = config.catalog_name
schema_name = config.schema_name
spark = SparkSession.builder.getOrCreate()

# Load train and test sets
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
combined_set = pd.concat([train_set, test_set], ignore_index=True)
existing_ids = set(int(id) for id in combined_set['id'])

# Define a function to create synthetic data (no fixed random state)
def create_synthetic_data(df, num_rows=100):
    synthetic_data = pd.DataFrame()

    for column in df.columns:
        # Treat float and int columns differently
        # if pd.api.types.is_numeric_dtype(df[column]) and column != 'id':
        #     mean, std = df[column].mean(), df[column].std()
        #     synthetic_data[column] = np.random.normal(mean, std, num_rows)
        if pd.api.types.is_float_dtype(df[column]):
            mean, std = df[column].mean(), df[column].std()
            synthetic_data[column] = np.random.normal(mean, std, num_rows)
        elif pd.api.types.is_integer_dtype(df[column]) and column != 'id':
            mean, std = df[column].mean(), df[column].std()
            synthetic_data[column] = np.random.normal(mean, std, num_rows).astype(int)
        elif pd.api.types.is_categorical_dtype(df[column]) or pd.api.types.is_object_dtype(df[column]):
            synthetic_data[column] = np.random.choice(
                df[column].unique(), num_rows,
                p=df[column].value_counts(normalize=True)
            )
        elif pd.api.types.is_datetime64_any_dtype(df[column]):
            min_date, max_date = df[column].min(), df[column].max()
            if min_date < max_date:
                synthetic_data[column] = pd.to_datetime(
                    np.random.randint(min_date.value, max_date.value, num_rows)
                )
            else:
                synthetic_data[column] = [min_date] * num_rows
        else:
            synthetic_data[column] = np.random.choice(df[column], num_rows)

    # Make sure generated IDs are unique and do not already exist
    new_ids = []
    i = max(existing_ids) + 1 if existing_ids else 1
    while len(new_ids) < num_rows:
        if i not in existing_ids:
            new_ids.append(i)  # Id should eventually be a string; left as int here to match the train/test set and converted later.
            # new_ids.append(str(i))  # Convert numeric ID to string
        i += 1
    synthetic_data['id'] = new_ids

    return synthetic_data

# Create synthetic data
synthetic_df = create_synthetic_data(combined_set)

# The source_data table was created beforehand, manually, with CREATE TABLE ... LIKE train_set; reuse its schema here
existing_schema = spark.table(f"{catalog_name}.{schema_name}.source_data").schema

synthetic_spark_df = spark.createDataFrame(synthetic_df, schema=existing_schema)

train_set_with_timestamp = synthetic_spark_df.withColumn(
"update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC")
)

# Append synthetic data as new data to source_data table
train_set_with_timestamp.write.mode("append").saveAsTable(
f"{catalog_name}.{schema_name}.source_data"
)
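The script assumes the source_data table already exists as an empty copy of the train_set schema. A possible one-off setup, not part of this commit and inferred only from the comment above, might look like this (spark, catalog_name, and schema_name as defined earlier in the script):

# One-off setup (assumption): create an empty source_data table with the train_set schema.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.source_data
    LIKE {catalog_name}.{schema_name}.train_set
""")

Equivalently, spark.table(f"{catalog_name}.{schema_name}.train_set").limit(0).write.saveAsTable(...) would create the same empty table.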
1 change: 1 addition & 0 deletions project_config.yml
@@ -1,5 +1,6 @@
catalog_name: mlops_students
schema_name: mahajan134
pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93

parameters:
learning_rate: 0.01
1 change: 1 addition & 0 deletions src/wine_quality/config.py
@@ -11,6 +11,7 @@ class ProjectConfig(BaseModel):
    schema_name: str
    parameters: Dict[str, Any]  # Dictionary to hold model-related parameters
    ab_test: Dict[str, Any]  # Dictionary to hold A/B test parameters
    pipeline_id: str  # Pipeline ID for the Delta Live Tables pipeline

    @classmethod
    def from_yaml(cls, config_path: str):
58 changes: 58 additions & 0 deletions week5/deploy_model.py
@@ -0,0 +1,58 @@
"""
This script handles the deployment of a wine quality prediction model to a Databricks serving endpoint.
Key functionality:
- Loads project configuration from YAML
- Retrieves the model version from previous task values
- Updates the serving endpoint configuration with:
- Model registry reference
- Scale to zero capability
- Workload sizing
- Specific model version
The endpoint is configured for feature-engineered model serving with automatic scaling.
"""

import yaml
import argparse
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ServedEntityInput
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession
from wine_quality.config import ProjectConfig

parser = argparse.ArgumentParser()
parser.add_argument(
    "--root_path",
    action="store",
    default=None,
    type=str,
    required=True,
)

args = parser.parse_args()
root_path = args.root_path

config_path = f"{root_path}/project_config.yml"
# config_path = ("/Volumes/mlops_students/mahajan134/mlops_vol/project_config.yml")
config = ProjectConfig.from_yaml(config_path=config_path)

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

model_version = dbutils.jobs.taskValues.get(taskKey="evaluate_model", key="model_version")

workspace = WorkspaceClient()

catalog_name = config.catalog_name
schema_name = config.schema_name

workspace.serving_endpoints.update_config_and_wait(
    name="wine-quality-model-serving-fe",
    served_entities=[
        ServedEntityInput(
            entity_name=f"{catalog_name}.{schema_name}.wine-quality-model-fe",
            scale_to_zero_enabled=True,
            workload_size="Small",
            entity_version=model_version,
        )
    ],
)
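deploy_model.py expects model_version to have been published by the evaluate_model task, and the model_update condition task expects a model_update value. A minimal sketch of how the tail of week5/evaluate_model.py (not shown in this excerpt) might publish them follows; the comparison logic and version number are stand-ins, and only the key names come from databricks.yml and the script above.

# Hypothetical tail of week5/evaluate_model.py (assumption): publish the values read downstream.
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

new_model_is_better = True   # stand-in for the real metric comparison
registered_version = 3       # stand-in for the version returned when the new model is registered

if new_model_is_better:
    dbutils.jobs.taskValues.set(key="model_version", value=registered_version)
    dbutils.jobs.taskValues.set(key="model_update", value=1)  # model_update condition task checks EQUAL_TO "1"
else:
    dbutils.jobs.taskValues.set(key="model_update", value=0)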