Commit: Separate config file by environment

a0134m committed Nov 26, 2024
1 parent b53f29c · commit 62e449a
Showing 11 changed files with 438 additions and 263 deletions.
98 changes: 98 additions & 0 deletions asset_bundles/wine_quality_model.yaml
@@ -0,0 +1,98 @@
resources:
  jobs:
    wine-quality:
      name: wine-quality-workflow
      schedule:
        quartz_cron_expression: "0 0 10 ? * MONDAY *"
        timezone_id: "America/Chicago"
        pause_status: ${var.schedule_pause_status}
      tags:
        project_name: "wine-quality"
      job_clusters:
        - job_cluster_key: "wine-quality-cluster"
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            data_security_mode: "SINGLE_USER"
            node_type_id: "r3.xlarge"
            driver_node_type_id: "r3.xlarge"
            autoscale:
              min_workers: 1
              max_workers: 1

      tasks:
        - task_key: "preprocessing"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "../week5/preprocess.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--env"
              - ${var.env}
          libraries:
            - whl: ../dist/*.whl
        - task_key: if_refreshed
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.preprocessing.values.refreshed}}"
            right: "1"
          depends_on:
            - task_key: "preprocessing"
        - task_key: "train_model"
          depends_on:
            - task_key: "if_refreshed"
              outcome: "true"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "../week5/train_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--git_sha"
              - ${var.git_sha}
              - "--job_run_id"
              - "{{job.id}}"
              - "--env"
              - ${var.env}
          libraries:
            - whl: ../dist/*.whl
        - task_key: "evaluate_model"
          depends_on:
            - task_key: "train_model"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "../week5/evaluate_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--new_model_uri"
              - "{{tasks.train_model.values.new_model_uri}}"
              - "--job_run_id"
              - "{{job.id}}"
              - "--git_sha"
              - ${var.git_sha}
              - "--env"
              - ${var.env}
          libraries:
            - whl: ../dist/*.whl
        - task_key: model_update
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.evaluate_model.values.model_update}}"
            right: "1"
          depends_on:
            - task_key: "evaluate_model"
        - task_key: "deploy_model"
          depends_on:
            - task_key: "model_update"
              outcome: "true"
          job_cluster_key: "wine-quality-cluster"
          spark_python_task:
            python_file: "../week5/deploy_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--env"
              - ${var.env}
          libraries:
            - whl: ../dist/*.whl
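With the job definition split out into this file, the bundle is deployed per target with the Databricks CLI, for example "databricks bundle validate" followed by "databricks bundle deploy -t dev" or "databricks bundle deploy -t prod"; the ${var.env}, ${var.root_path}, and ${var.git_sha} references resolve from the selected target's variables at deploy time. Exact flags depend on the CLI version, but -t/--target selects a target defined in databricks.yml.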
101 changes: 10 additions & 91 deletions databricks.yml
@@ -7,6 +7,9 @@ bundle:
   #databricks_cli_version: "0.230.0"
   cluster_id: 1018-024853-shr990fc

+include:
+  - asset_bundles/*.yaml
+
 permissions:
   - level: CAN_MANAGE
     user_name: [email protected]
@@ -29,107 +32,23 @@ variables:
   schedule_pause_status:
     description: schedule pause status
     default: UNPAUSED
+  env:
+    description: environment
+    default: dev

 targets:
   prod:
+    variables:
+      env: prod
     workspace:
       host: https://dbc-643c4c2b-d6c9.cloud.databricks.com
       root_path: ${var.root_path}

   dev:
     mode: development
     default: true
+    variables:
+      env: dev
     workspace:
       host: https://dbc-643c4c2b-d6c9.cloud.databricks.com
       # root_path: /Workspace/Users/[email protected]/.bundle/${bundle.name}/dev
-
-resources:
-  jobs:
-    wine-quality:
-      name: wine-quality-workflow
-      schedule:
-        quartz_cron_expression: "0 0 10 ? * MONDAY *"
-        timezone_id: "America/Chicago"
-        pause_status: ${var.schedule_pause_status}
-      tags:
-        project_name: "wine-quality"
-      job_clusters:
-        - job_cluster_key: "wine-quality-cluster"
-          new_cluster:
-            spark_version: "15.4.x-scala2.12"
-            data_security_mode: "SINGLE_USER"
-            node_type_id: "r3.xlarge"
-            driver_node_type_id: "r3.xlarge"
-            autoscale:
-              min_workers: 1
-              max_workers: 1
-
-      tasks:
-        - task_key: "preprocessing"
-          job_cluster_key: "wine-quality-cluster"
-          spark_python_task:
-            python_file: "week5/preprocess.py"
-            parameters:
-              - "--root_path"
-              - ${var.root_path}
-          libraries:
-            - whl: ./dist/*.whl
-        - task_key: if_refreshed
-          condition_task:
-            op: "EQUAL_TO"
-            left: "{{tasks.preprocessing.values.refreshed}}"
-            right: "1"
-          depends_on:
-            - task_key: "preprocessing"
-        - task_key: "train_model"
-          depends_on:
-            - task_key: "if_refreshed"
-              outcome: "true"
-          job_cluster_key: "wine-quality-cluster"
-          spark_python_task:
-            python_file: "week5/train_model.py"
-            parameters:
-              - "--root_path"
-              - ${var.root_path}
-              - "--git_sha"
-              - ${var.git_sha}
-              - "--job_run_id"
-              - "{{job.id}}"
-          libraries:
-            - whl: ./dist/*.whl
-        - task_key: "evaluate_model"
-          depends_on:
-            - task_key: "train_model"
-          job_cluster_key: "wine-quality-cluster"
-          spark_python_task:
-            python_file: "week5/evaluate_model.py"
-            parameters:
-              - "--root_path"
-              - ${var.root_path}
-              - "--new_model_uri"
-              - "{{tasks.train_model.values.new_model_uri}}"
-              - "--job_run_id"
-              - "{{job.id}}"
-              - "--git_sha"
-              - ${var.git_sha}
-          libraries:
-            - whl: ./dist/*.whl
-        - task_key: model_update
-          condition_task:
-            op: "EQUAL_TO"
-            left: "{{tasks.evaluate_model.values.model_update}}"
-            right: "1"
-          depends_on:
-            - task_key: "evaluate_model"
-        - task_key: "deploy_model"
-          depends_on:
-            - task_key: "model_update"
-              outcome: "true"
-          job_cluster_key: "wine-quality-cluster"
-          spark_python_task:
-            python_file: "week5/deploy_model.py"
-            parameters:
-              - "--root_path"
-              - ${var.root_path}
-          libraries:
-            - whl: ./dist/*.whl
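Each task now receives an --env argument alongside --root_path. The week5 scripts themselves are not part of this commit, but a minimal sketch of how an entrypoint such as week5/preprocess.py might consume the flag to pick the matching per-environment config follows; the files/ layout and the override logic are assumptions for illustration, not code from the repo.

import argparse

import yaml

parser = argparse.ArgumentParser()
parser.add_argument("--root_path", required=True)
parser.add_argument("--env", choices=["dev", "prod"], default="dev")
args = parser.parse_args()

# Assumed layout: bundle files are synced under <root_path>/files.
config_path = f"{args.root_path}/files/project_config_{args.env}.yml"
with open(config_path) as f:
    raw = yaml.safe_load(f)

# Top-level keys act as defaults; the dev:/prod: blocks override them.
overrides = raw.get(args.env, {})
catalog_name = overrides.get("catalog_name", raw["catalog_name"])
schema_name = overrides.get("schema_name", raw["schema_name"])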
48 changes: 25 additions & 23 deletions notebooks/week5/00.create_source_data.py
@@ -1,7 +1,8 @@
-import pandas as pd
 import numpy as np
+import pandas as pd
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_timestamp, to_utc_timestamp
+
 from wine_quality.config import ProjectConfig

 # Load configuration
@@ -14,55 +15,58 @@
 train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
 test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
 combined_set = pd.concat([train_set, test_set], ignore_index=True)
-existing_ids = set(int(id) for id in combined_set['id'])
+existing_ids = set(int(id) for id in combined_set["id"])


 # Define function to create synthetic data without random state
 def create_synthetic_data(df, num_rows=100):
     synthetic_data = pd.DataFrame()

     for column in df.columns:
         # Treat float and int differently
-        # if pd.api.types.is_numeric_dtype(df[column]) and column != 'id':
-        # mean, std = df[column].mean(), df[column].std()
-        # synthetic_data[column] = np.random.normal(mean, std, num_rows)
+        # if pd.api.types.is_numeric_dtype(df[column]) and column != 'id':
+        #     mean, std = df[column].mean(), df[column].std()
+        #     synthetic_data[column] = np.random.normal(mean, std, num_rows)
         if pd.api.types.is_float_dtype(df[column]):
             mean, std = df[column].mean(), df[column].std()
             synthetic_data[column] = np.random.normal(mean, std, num_rows)
-        elif pd.api.types.is_integer_dtype(df[column]) and column != 'id':
+        elif pd.api.types.is_integer_dtype(df[column]) and column != "id":
             mean, std = df[column].mean(), df[column].std()
             synthetic_data[column] = np.random.normal(mean, std, num_rows).astype(int)
         elif pd.api.types.is_categorical_dtype(df[column]) or pd.api.types.is_object_dtype(df[column]):
-            synthetic_data[column] = np.random.choice(df[column].unique(), num_rows,
-                                                      p=df[column].value_counts(normalize=True))
-
+            synthetic_data[column] = np.random.choice(
+                df[column].unique(), num_rows, p=df[column].value_counts(normalize=True)
+            )

         elif pd.api.types.is_datetime64_any_dtype(df[column]):
             min_date, max_date = df[column].min(), df[column].max()
             if min_date < max_date:
-                synthetic_data[column] = pd.to_datetime(
-                    np.random.randint(min_date.value, max_date.value, num_rows)
-                )
+                synthetic_data[column] = pd.to_datetime(np.random.randint(min_date.value, max_date.value, num_rows))
             else:
                 synthetic_data[column] = [min_date] * num_rows

         else:
             synthetic_data[column] = np.random.choice(df[column], num_rows)
-    # Making sure that generated IDs are unique and do not previously exist
+
+    # Making sure that generated IDs are unique and do not previously exist
     new_ids = []
     i = max(existing_ids) + 1 if existing_ids else 1
     while len(new_ids) < num_rows:
         if i not in existing_ids:
-            new_ids.append(i)  # Id needs to be string, but leaving it as int to match train/test set. Will convert to string later.
-            #new_ids.append(str(i))  # Convert numeric ID to string
+            new_ids.append(
+                i
+            )  # Id needs to be string, but leaving it as int to match train/test set. Will convert to string later.
+            # new_ids.append(str(i))  # Convert numeric ID to string
         i += 1
-    synthetic_data['id'] = new_ids
+    synthetic_data["id"] = new_ids

     return synthetic_data


 # Create synthetic data
 synthetic_df = create_synthetic_data(combined_set)

-# Create source_data table manually using Create table like train_set
+# Create source_data table manually using Create table like train_set
 existing_schema = spark.table(f"{catalog_name}.{schema_name}.source_data").schema

 synthetic_spark_df = spark.createDataFrame(synthetic_df, schema=existing_schema)
@@ -72,6 +76,4 @@ def create_synthetic_data(df, num_rows=100):
 )

 # Append synthetic data as new data to source_data table
-train_set_with_timestamp.write.mode("append").saveAsTable(
-    f"{catalog_name}.{schema_name}.source_data"
-)
+train_set_with_timestamp.write.mode("append").saveAsTable(f"{catalog_name}.{schema_name}.source_data")
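One caveat in the categorical branch above: df[column].unique() preserves order of appearance while value_counts(normalize=True) sorts by frequency, so the probability vector can end up paired with the wrong values. A safer pattern, sketched here for illustration rather than taken from the commit, draws values and weights from the same value_counts result:

import numpy as np
import pandas as pd


def sample_categorical(column: pd.Series, num_rows: int) -> np.ndarray:
    # value_counts yields the values (index) and their relative frequencies
    # together, so the weights are guaranteed to line up with the values.
    counts = column.value_counts(normalize=True)
    return np.random.choice(counts.index, num_rows, p=counts.to_numpy())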
10 changes: 10 additions & 0 deletions project_config.yml → project_config_dev.yml
@@ -2,6 +2,16 @@ catalog_name: mlops_students
 schema_name: mahajan134
 pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93

+dev:
+  catalog_name: mlops_students
+  schema_name: mahajan134
+  pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93
+
+prod:
+  catalog_name: mlops_students
+  schema_name: mahajan134
+  pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93
+
 parameters:
   learning_rate: 0.01
   n_estimators: 1000
40 changes: 40 additions & 0 deletions project_config_prod.yml
@@ -0,0 +1,40 @@
catalog_name: mlops_students
schema_name: mahajan134
pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93

dev:
  catalog_name: mlops_students
  schema_name: mahajan134
  pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93

prod:
  catalog_name: mlops_students
  schema_name: mahajan134
  pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93

parameters:
  learning_rate: 0.01
  n_estimators: 1000
  max_depth: 6

ab_test:
  learning_rate_a: 0.02
  learning_rate_b: 0.02
  n_estimators: 1000
  max_depth_a: 6
  max_depth_b: 10

num_features:
  - fixed_acidity
  - volatile_acidity
  - citric_acid
  - residual_sugar
  - chlorides
  - free_sulfur_dioxide
  - total_sulfur_dioxide
  - density
  - pH
  - sulphates
  - alcohol

target: quality
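The dev and prod blocks are identical for now, so a small guard that both environments define the same required keys can catch drift once the prod values diverge. A hypothetical check (the repo's actual tests are not shown here):

import yaml

REQUIRED_KEYS = {"catalog_name", "schema_name", "pipeline_id"}


def check_config(path: str) -> None:
    with open(path) as f:
        cfg = yaml.safe_load(f)
    for env in ("dev", "prod"):
        missing = REQUIRED_KEYS - cfg.get(env, {}).keys()
        assert not missing, f"{path}: {env} block is missing {sorted(missing)}"


check_config("project_config_dev.yml")
check_config("project_config_prod.yml")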
8 changes: 8 additions & 0 deletions tests/test_config.yml
@@ -1,11 +1,19 @@
 catalog_name: mlops_students
 schema_name: mahajan134
 pipeline_id: 5d52b992-0c14-4abe-a8d8-3880ad86fe93
+
 parameters:
   learning_rate: 0.01
   n_estimators: 1000
   max_depth: 6
+
+ab_test:
+  learning_rate_a: 0.02
+  learning_rate_b: 0.02
+  n_estimators: 1000
+  max_depth_a: 6
+  max_depth_b: 10

 num_features:
   - fixed_acidity
   - volatile_acidity