feat: add CI/CD workflow for Databricks deployment #4
base: main
Conversation
Walkthrough
This pull request introduces several changes to enhance the continuous deployment and management of machine learning workflows within a Databricks environment. Key updates include the addition of a GitHub Actions workflow for continuous deployment, modifications to the Databricks configuration for job management and artifact handling, and the introduction of new scripts for model training, evaluation, and data generation. These changes collectively aim to streamline the deployment process and improve the overall functionality of the machine learning operations.
Changes
Possibly related PRs
Suggested reviewers
Poem
Actionable comments posted: 24
🧹 Outside diff range and nitpick comments (22)
src/mlops_with_databricks/pipeline/deploy_model.py (2)
1-7: Enhance the docstring with more details
The docstring could be more comprehensive by including:
- Input/output specifications
- Error handling scenarios
- Return values
- Usage example
Example improvement:
-"""This script is used to deploy the model to the serving endpoint. The model version is fetched from the evaluate_model task.""" +"""Deploy a trained model to the Databricks serving endpoint. + +This script retrieves the model version from the evaluate_model task results and +deploys it to a Databricks serving endpoint with specified configuration. + +Raises: + RuntimeError: If model version cannot be retrieved from task values + Exception: If deployment to serving endpoint fails + +Example: + $ python deploy_model.py +"""
1-27: Consider adding logging and metrics collection
As this is a critical deployment script in your CI/CD pipeline, consider these architectural improvements:
- Add structured logging for better operational visibility
- Implement metrics collection for deployment monitoring
- Consider adding deployment rollback capability
Would you like me to provide an example implementation of these improvements?
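For illustration, here is a minimal sketch of what structured logging plus a simple duration metric around the deployment call could look like, using loguru (added to pyproject.toml in this PR). The `deploy()` helper and the `duration_s`/`model_version` fields are hypothetical placeholders, not code from this PR:
```python
# Hypothetical sketch only: structured (JSON) logging and a duration metric
# around the endpoint update; `deploy()` stands in for the existing
# update_config_and_wait call in deploy_model.py.
import sys
import time

from loguru import logger

# Emit JSON lines so Databricks job logs can be parsed by a log aggregator.
logger.remove()
logger.add(sys.stderr, serialize=True, level="INFO")


def deploy(model_version: str) -> None:
    """Placeholder for the serving endpoint update."""
    logger.bind(model_version=model_version).info("Updating serving endpoint")
    # workspace.serving_endpoints.update_config_and_wait(...) would go here.


def main() -> None:
    start = time.monotonic()
    try:
        deploy(model_version="1")
        logger.bind(duration_s=round(time.monotonic() - start, 2)).info("Deployment finished")
    except Exception:
        # The traceback ends up in the structured record; a rollback hook
        # could also be triggered from here.
        logger.exception("Deployment failed")
        raise


if __name__ == "__main__":
    main()
```
Serialized records like these can be shipped to whatever log aggregation or metrics backend the team already uses.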
pyproject.toml (1)
20-20: LGTM! Consider adding logging configuration.
The addition of `loguru` is appropriate for improving logging in the CI/CD pipeline. Consider adding a logging configuration section to standardize log formats across the pipeline. Example:
```toml
[tool.loguru]
format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
```
.github/workflows/cd.yaml (1)
3-9: Consider enhancing workflow triggers for better control
The current trigger configuration could be improved:
- Add path filters to avoid unnecessary runs when irrelevant files change
- Use a stricter semver pattern for tags
- Consider adding pull_request trigger for validation before merging
```diff
 on:
   push:
     branches:
       - 'main'
+    paths:
+      - 'src/**'
+      - 'conf/**'
+      - 'databricks.yml'
+      - '.github/workflows/**'
     tags:
-      - '[0-9]+.[0-9]+.[0-9]+'
+      - 'v[0-9]+.[0-9]+.[0-9]+'
+      - 'v[0-9]+.[0-9]+.[0-9]+-rc.[0-9]+'
+
+  pull_request:
+    branches:
+      - 'main'
+    paths:
+      - 'src/**'
+      - 'conf/**'
+      - 'databricks.yml'
+      - '.github/workflows/**'
```
src/mlops_with_databricks/pipeline/preprocess.py (3)
14-17: Document or remove unused argument parser.
The argument parser is initialized but no arguments are defined. Either:
- Remove the unused argument parser, or
- Document that it's reserved for future use, or
- Add the intended command-line arguments
If keeping it for future use, apply this diff:
```diff
+# TODO: Add command line arguments for future customization
 parser = argparse.ArgumentParser()
+parser.description = "Preprocess data and update train/test sets in Databricks."
```
48-56: Enhance results handling and add logging.
The results handling could be improved by:
- Adding error handling for job context access
- Providing more detailed status information
- Adding proper logging
Apply this diff:
```diff
+import logging
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)

 affected_rows_train = new_data_train.count()
 affected_rows_test = new_data_test.count()
+logger.info(f"Processed {affected_rows_train} training rows and {affected_rows_test} test rows")

 if affected_rows_train > 0 or affected_rows_test > 0:
     refreshed = 1
+    logger.info("Dataset refreshed successfully")
 else:
     refreshed = 0
+    logger.info("No updates were necessary")

+try:
+    workspace.dbutils.jobs.taskValues.set(key="refreshed", value=refreshed)
+    workspace.dbutils.jobs.taskValues.set(key="affected_rows_train", value=affected_rows_train)
+    workspace.dbutils.jobs.taskValues.set(key="affected_rows_test", value=affected_rows_test)
+except Exception as e:
+    raise RuntimeError(f"Failed to set task values: {str(e)}")
-workspace.dbutils.jobs.taskValues.set(key="refreshed", value=refreshed)
```
1-56: Consider adding tests and monitoring.
To improve reliability and maintainability:
- Add unit tests using a mock Spark session
- Add integration tests for the Databricks workflow
- Consider adding monitoring and alerting for:
- Data quality metrics
- Processing time
- Error rates
Would you like me to help create a test suite template or suggest monitoring metrics?
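As a starting point, a minimal pytest sketch that runs against a local SparkSession instead of a Databricks cluster; `split_train_test` is a hypothetical stand-in for whatever pure function the preprocessing logic is factored into:
```python
# Hypothetical test sketch: assumes the preprocessing logic is exposed as a
# pure function (split_train_test is illustrative, not a function in this PR).
import pytest
from pyspark.sql import DataFrame, SparkSession


def split_train_test(df: DataFrame, test_fraction: float = 0.2, seed: int = 42):
    """Stand-in for the logic under test: a deterministic random split."""
    return df.randomSplit([1.0 - test_fraction, test_fraction], seed=seed)


@pytest.fixture(scope="session")
def spark():
    # Local Spark session so the test suite does not need a Databricks cluster.
    session = SparkSession.builder.master("local[1]").appName("preprocess-tests").getOrCreate()
    yield session
    session.stop()


def test_split_covers_all_rows(spark):
    df = spark.createDataFrame([(i, i % 2) for i in range(100)], ["id", "label"])
    train, test = split_train_test(df, test_fraction=0.2)
    assert train.count() + test.count() == 100
    # No row should land in both splits.
    assert train.join(test, on="id", how="inner").count() == 0
```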
run.sh (2)
37-47: Add production deployment safeguards
The production deployment function requires additional safety measures:
- No validation before production deployment
- Missing rollback mechanism
- No logging of deployment activities
Consider implementing these production-specific safeguards:
```diff
+function validate_prod_deployment() {
+    # Add validation logic here
+    local git_branch=$(git rev-parse --abbrev-ref HEAD)
+    if [ "$git_branch" != "main" ]; then
+        echo "Error: Production deployments must be from main branch"
+        return 1
+    fi
+    # Add more validation as needed
+}

 function run_databricks_bundle_prod() {
+    echo "Starting production deployment..."
+
+    # Validate production deployment
+    validate_prod_deployment || return 1
+
+    # Store current state for potential rollback
+    local timestamp=$(date +%Y%m%d_%H%M%S)
+    local deployment_log="logs/prod_deployment_${timestamp}.log"
+
+    # Execute with logging
+    {
+        run_databricks_command "deploy" "prod" || {
+            echo "Error: Production deployment failed, initiating rollback..."
+            # Add rollback logic here
+            return 1
+        }
+        generate_data || return 1
+        run_databricks_command "run" "prod" || return 1
+    } 2>&1 | tee "$deployment_log"
+
+    echo "Production deployment completed successfully"
 }
```
Line range hint 1-47: Enhance script structure and usability
The script would benefit from better organization and documentation:
- Missing usage documentation
- No command-line argument handling
- No clear execution flow
Consider adding these improvements:
```diff
 #!/bin/bash
+# Usage: ./run.sh [stage|prod]
+# Deploys and runs Databricks bundles in staging or production environment
+
+set -euo pipefail
+
+# Configuration
+PYTHON_PATH=${PYTHON_PATH:-"python"}
+DATA_GENERATOR_SCRIPT="src/mlops_with_databricks/pipeline/generate_data.py"
+
+# Main execution
+main() {
+    local environment=$1
+
+    case $environment in
+        stage)
+            run_databricks_bundle_stage
+            ;;
+        prod)
+            run_databricks_bundle_prod
+            ;;
+        *)
+            echo "Usage: $0 [stage|prod]"
+            exit 1
+            ;;
+    esac
+}
+
+# Execute main if script is run directly
+if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
+    if [[ $# -ne 1 ]]; then
+        echo "Usage: $0 [stage|prod]"
+        exit 1
+    fi
+    main "$1"
+fi
```
src/mlops_with_databricks/data_preprocessing/dataclasses.py (1)
108-108: Review consistency with ABTestConfig values
The new learning_rate (0.1) is significantly higher than both values defined in ABTestConfig (0.01 and 0.001). Similarly, the max_depth (15) differs from both A/B test variants (10 and 100). This inconsistency might indicate:
- A potential oversight in configuration alignment
- Missing documentation about why these values differ
- Possible need to update ABTestConfig to reflect new findings
Consider:
- Documenting the reasoning behind these different configurations
- Aligning the values if they should be consistent
- Adding comments to explain why they intentionally differ if that's the case
src/mlops_with_databricks/pipeline/generate_data.py (1)
1-75: Consider adding data versioning and monitoring
As this is part of a CI/CD workflow for ML operations, consider these architectural improvements:
- Implement data versioning using Delta Lake's time travel capabilities
- Add data quality checks before writing
- Set up monitoring for data drift between synthetic and real data
- Add metrics collection for the synthetic data generation process
Would you like me to provide specific implementation details for any of these suggestions?
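For the data versioning point, a rough sketch of how a Delta table version could be pinned with time travel; the three-part table name is a placeholder for the catalog/schema values used elsewhere in the PR:
```python
# Rough sketch (assumes train_set is a Delta table, the default on Databricks;
# "catalog.schema.train_set" is a placeholder for the configured names).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
table = "catalog.schema.train_set"

# Inspect the table history to pick a version to pin for a given training run.
history = spark.sql(f"DESCRIBE HISTORY {table}")
latest_version = history.selectExpr("max(version) AS v").collect()[0]["v"]

# Read the table as of that version so the training data stays reproducible,
# even after generate_data.py appends new synthetic rows.
train_snapshot = spark.read.option("versionAsOf", latest_version).table(table)
print(f"Training on version {latest_version} with {train_snapshot.count()} rows")
```
Logging the pinned version alongside the MLflow run would tie each model back to the exact data it was trained on.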
databricks.yml (5)
5-10: Enhance artifact build configuration
The current artifact build configuration could be improved:
- Consider specifying the Python wheel version in the build command
- The path specification is very generic, consider being more explicit about the build artifacts location
```diff
 artifacts:
   default:
     type: whl
-    build: python -m build
+    build: python -m build --wheel
     path: .
```
19-19: Fix incorrect variable description
The description for `git_branch` is incorrectly set to "git_sha".
```diff
-    description: git_sha
+    description: git_branch
```
28-30: Review timezone configuration
The schedule is hardcoded to "Europe/Warsaw". Consider making the timezone configurable through variables to support different deployment regions and teams.
38-39: Consider environment-specific instance types
The instance types are hardcoded to "i3.xlarge". Consider making these configurable per environment to optimize costs (e.g., smaller instances for dev/stage).
40-42: Review autoscaling configuration
The current autoscaling configuration (min=1, max=1) effectively disables autoscaling. Consider either:
- Removing autoscaling if it's not needed
- Configuring meaningful min/max values if autoscaling is desired
src/mlops_with_databricks/pipeline/evaluate_model.py (5)
8-8: Remove unused import and variable `feature_engineering` and `fe`.
The variable `fe` is assigned but not used in the script. Removing the unused import and variable will clean up the code. Apply this diff to remove the unused import and variable:
```diff
-from databricks import feature_engineering
 ...
-fe = feature_engineering.FeatureEngineeringClient()
```
Also applies to: 67-67
98-99: Avoid logging full prediction arrays to reduce log verbosity.
Logging the entire prediction arrays can clutter the logs and expose sensitive data. Consider logging summary statistics or samples instead.
Apply this diff to modify the logging statements:
-logger.info(f"Predictions for New Model: {predictions_new}") -logger.info(f"Previous for Old Model: {predictions_previous}") +logger.info(f"Sample Predictions for New Model: {predictions_new[:5]}") +logger.info(f"Sample Predictions for Previous Model: {predictions_previous[:5]}")
78-83: Add error handling for serving endpoint retrieval.
Accessing the serving endpoint configuration may fail if the endpoint does not exist or is misconfigured. Adding error handling will prevent the script from crashing unexpectedly.
You can modify the code to include try-except blocks:
```python
try:
    serving_endpoint = workspace.serving_endpoints.get(serving_endpoint_name)
    model_name = serving_endpoint.config.served_models[0].model_name
    model_version = serving_endpoint.config.served_models[0].model_version
    previous_model_uri = f"models:/{model_name}/{model_version}"
except Exception as e:
    logger.error(f"Failed to retrieve serving endpoint or model details: {e}")
    sys.exit(1)
```
84-85: Add exception handling when loading the test dataset.
Converting a Spark DataFrame to a pandas DataFrame may cause issues with large datasets or if the table does not exist. Including error handling will make the script more robust.
Modify the code to handle exceptions:
```python
try:
    test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
except Exception as e:
    logger.error(f"Failed to load test dataset: {e}")
    sys.exit(1)
```
65-67: Remove unnecessary initialization of unused clients.
The `FeatureEngineeringClient` is initialized but not used. Removing it will streamline the script. Apply this diff to remove the unused client:
```diff
-spark = SparkSession.builder.getOrCreate()
-workspace = WorkspaceClient()
-fe = feature_engineering.FeatureEngineeringClient()
+spark = SparkSession.builder.getOrCreate()
+workspace = WorkspaceClient()
```
src/mlops_with_databricks/pipeline/train_model.py (1)
86-114: Add error handling to the training process
The training process lacks error handling, which can make debugging difficult if an exception occurs. Wrap the training and evaluation code within a try-except block to catch exceptions, log errors, and ensure MLflow captures any failures.
Suggested change:
```diff
 with mlflow.start_run(tags={"branch": git_branch, "git_sha": f"{git_sha}", "job_run_id": job_run_id}) as run:
     run_id = run.info.run_id
+    try:
         pipeline.fit(X_train, y_train)
         y_pred = pipeline.predict(X_test)
         # Metrics calculation and logging
         ...
         mlflow.sklearn.log_model(sk_model=pipeline, artifact_path="lightgbm-pipeline-model", signature=signature)
+    except Exception as e:
+        mlflow.log_param("error", str(e))
+        raise
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (10)
- .github/workflows/cd.yaml (1 hunks)
- databricks.yml (1 hunks)
- pyproject.toml (1 hunks)
- run.sh (1 hunks)
- src/mlops_with_databricks/data_preprocessing/dataclasses.py (1 hunks)
- src/mlops_with_databricks/pipeline/deploy_model.py (1 hunks)
- src/mlops_with_databricks/pipeline/evaluate_model.py (1 hunks)
- src/mlops_with_databricks/pipeline/generate_data.py (1 hunks)
- src/mlops_with_databricks/pipeline/preprocess.py (1 hunks)
- src/mlops_with_databricks/pipeline/train_model.py (1 hunks)
🔇 Additional comments (6)
pyproject.toml (1)
21-21: LGTM! Verify build system compatibility.
The addition of the `build` package is appropriate for the CI/CD pipeline.
Let's verify there are no conflicts between build and hatchling:
✅ Verification successful
Build and Hatchling are compatible - no conflicts found
Based on the PyPI metadata and GitHub issues:
- No direct incompatibilities reported between build and hatchling
- Both packages have compatible dependency requirements
- Existing issues are unrelated to package compatibility
- Build's core dependencies (packaging, pyproject_hooks) are compatible with Hatchling's requirements
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any known incompatibilities between build and hatchling
# Check if there are any issues reported in the build package that mention hatchling
gh api graphql -f query='
{
search(query: "repo:pypa/build is:issue hatchling", type: ISSUE, first: 5) {
nodes {
... on Issue {
title
state
url
}
}
}
}
'
# Check latest versions and compatibility notes
curl -s https://pypi.org/pypi/build/json | jq '.info.requires_dist'
curl -s https://pypi.org/pypi/hatchling/json | jq '.info.requires_dist'
Length of output: 2577
src/mlops_with_databricks/data_preprocessing/dataclasses.py (1)
108-108: Verify impact of significant hyperparameter changes
The changes to LightGBM hyperparameters are quite substantial:
- 100x increase in learning_rate (0.001 -> 0.1)
- 2x increase in n_estimators (200 -> 400)
- 1.5x increase in max_depth (10 -> 15)
These changes could significantly impact:
- Model convergence and stability
- Training time and resource usage
- Risk of overfitting
✅ Verification successful
The initial results show that these hyperparameters were actually determined through a grid search process, with the following ranges:
- learning_rate: [0.001, 0.01, 0.1]
- n_estimators: [100, 200, 300, 400]
- max_depth: [5, 10, 15, 20]
Let's verify the ABTest configuration and original values to make a more informed decision.
The previous search didn't yield results, so let's try a different approach to find these configurations and any related test files.
We found that:
- ABTestConfig uses different learning rates (0.01 and 0.001)
- The grid search includes our new learning_rate (0.1) in its search space
- Let's verify how these are used in AB testing and check if there are any logged metrics to validate the performance.
Hyperparameter changes are supported by grid search results
The hyperparameter changes appear to be justified based on the following findings:
- The new values (learning_rate=0.1, n_estimators=400, max_depth=15) all fall within the grid search ranges defined in train.py:
  - learning_rate: [0.001, 0.01, 0.1]
  - n_estimators: [100, 200, 300, 400]
  - max_depth: [5, 10, 15, 20]
- These parameters were likely selected through systematic hyperparameter tuning using GridSearchCV with F1-score optimization
- While ABTestConfig uses different values, it serves a different purpose for experimental comparisons and doesn't invalidate the main model configuration
The changes are backed by proper hyperparameter optimization process and don't require additional verification.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for model evaluation results or configuration justification
rg -A 5 "model.*evaluation|hyperparameter.*tuning|grid.*search"
# Look for any documentation about these specific parameters
fd "README|CHANGELOG" --exec grep -A 5 -i "learning.*rate|n.*estimators|max.*depth"
Length of output: 1928
Script:
#!/bin/bash
# Let's check the ABTestConfig values for comparison
ast-grep --pattern 'class ABTestConfig {
$$$
}'
# Also check the original values of LightGBMConfig before this change
git log -p -- src/mlops_with_databricks/data_preprocessing/dataclasses.py | grep -A 10 "LightGBMConfig"
Length of output: 51
Script:
#!/bin/bash
# Let's try a different approach to find ABTestConfig and LightGBMConfig
rg -A 10 "class (ABTestConfig|LightGBMConfig)"
# Also check for any test files that might validate these configurations
fd "test.*\.py" --exec rg -l "LightGBMConfig|ABTestConfig" {}
Length of output: 1596
Script:
#!/bin/bash
# Let's check the AB testing implementation to understand how these configs are used
rg -A 10 "class ABTest|def test_|def run_" src/mlops_with_databricks/serving/ab_testing.py
# Also check for any model evaluation metrics or results
rg "mlflow.log_metric|f1_score =|precision =|recall =" src/mlops_with_databricks/training/train.py
Length of output: 319
databricks.yml (1)
37-37: Review security configuration
The `data_security_mode` is set to "SINGLE_USER". Please verify:
- This is the intended security configuration for your use case
- Access controls are properly set up in each environment
- Secrets are managed appropriately (not in clear text)
Let's check for any potential secrets in the codebase:
✅ Verification successful
Let me check for environment variables or configuration files that might contain sensitive information.
Security configuration appears to be clean
Based on the verification results:
- No hardcoded secrets, passwords, or credentials were found in the YAML files (only task and cluster keys which are identifiers)
- No environment files (.env) or secret configuration files were detected
- No environment variable interpolation patterns were found
The `data_security_mode: "SINGLE_USER"` setting is a valid Databricks configuration and doesn't pose a security concern by itself.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for potential hardcoded secrets or credentials
rg -i '(password|secret|token|key|credential).*[=:].+' --type yaml
Length of output: 943
Script:
#!/bin/bash
# Check for environment variables or configuration patterns
rg -i '(env|environment|config|variable|secret).*\$\{.*?\}' --type yaml
# Also check for any .env files or similar configuration files
fd -e env -e conf -e config -e secrets
Length of output: 114
src/mlops_with_databricks/pipeline/evaluate_model.py (2)
69-70: 🛠️ Refactor suggestion
Set MLflow registry URI using the correct scheme.
Ensure that the correct registry URI scheme is used for the Databricks MLflow tracking server.
Verify that "databricks-uc"
is the correct scheme for your environment. If not, consider using "databricks"
:
-mlflow.set_registry_uri("databricks-uc")
+mlflow.set_registry_uri("databricks")
Alternatively, confirm that `"databricks-uc"` is intended for Unity Catalog if that's your setup.
116-117: Verify the usage of `workspace.dbutils.jobs.taskValues.set`.
Accessing `dbutils` through `workspace` may not be valid in a script context. In Databricks jobs, `dbutils` is typically accessed differently. Please verify whether this approach works or adjust the code to correctly set task values.
Run the following script to check for correct usage of `dbutils`:
Consider importing `DBUtils` and initializing `dbutils` as follows:
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.jobs.taskValues.set(key="model_version", value=model_version.version)
Also applies to: 121-121
src/mlops_with_databricks/pipeline/train_model.py (1)
117-117: Verify the usage of `workspace.dbutils.jobs.taskValues.set`
Ensure that `workspace.dbutils.jobs.taskValues.set` correctly sets the task values in the Databricks job context when using the Databricks SDK. Typically, `dbutils.jobs.taskValues.set` is used within a Databricks notebook. Verify that this method works as intended in your script.
```python
workspace = WorkspaceClient()
```
🛠️ Refactor suggestion
Add error handling for workspace initialization
The workspace client initialization should include error handling and potentially accept configuration parameters for different environments.
Consider this improvement:
-workspace = WorkspaceClient()
+try:
+ workspace = WorkspaceClient(
+ host=DatabricksConfig.workspace_url, # Add this to DatabricksConfig
+ token=DatabricksConfig.access_token # Add this to DatabricksConfig
+ )
+except Exception as e:
+ raise RuntimeError(f"Failed to initialize Databricks workspace client: {str(e)}")
Committable suggestion skipped: line range outside the PR's diff.
```python
catalog_name = DatabricksConfig.catalog_name
schema_name = DatabricksConfig.schema_name
```
🛠️ Refactor suggestion
Validate configuration values
Configuration values should be validated to ensure they are not empty or invalid before use.
Suggested improvement:
+def validate_config_value(name: str, value: str) -> str:
+ if not value or not isinstance(value, str):
+ raise ValueError(f"Invalid {name}: {value}")
+ return value
+
-catalog_name = DatabricksConfig.catalog_name
-schema_name = DatabricksConfig.schema_name
+catalog_name = validate_config_value("catalog_name", DatabricksConfig.catalog_name)
+schema_name = validate_config_value("schema_name", DatabricksConfig.schema_name)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def validate_config_value(name: str, value: str) -> str:
    if not value or not isinstance(value, str):
        raise ValueError(f"Invalid {name}: {value}")
    return value


catalog_name = validate_config_value("catalog_name", DatabricksConfig.catalog_name)
schema_name = validate_config_value("schema_name", DatabricksConfig.schema_name)
```
```python
model_version = workspace.dbutils.jobs.taskValues.get(taskKey="evaluate_model", key="model_version")
```
Add validation for model version retrieval
The model version retrieval lacks error handling and validation. This could lead to deployment failures if the task value is missing or invalid.
Suggested improvement:
-model_version = workspace.dbutils.jobs.taskValues.get(taskKey="evaluate_model", key="model_version")
+try:
+ model_version = workspace.dbutils.jobs.taskValues.get(taskKey="evaluate_model", key="model_version")
+ if not model_version:
+ raise ValueError("Model version not found in task values")
+ # Validate model version format if applicable
+ if not isinstance(model_version, (int, str)):
+ raise TypeError(f"Invalid model version type: {type(model_version)}")
+except Exception as e:
+ raise RuntimeError(f"Failed to retrieve model version: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
try:
    model_version = workspace.dbutils.jobs.taskValues.get(taskKey="evaluate_model", key="model_version")
    if not model_version:
        raise ValueError("Model version not found in task values")
    # Validate model version format if applicable
    if not isinstance(model_version, (int, str)):
        raise TypeError(f"Invalid model version type: {type(model_version)}")
except Exception as e:
    raise RuntimeError(f"Failed to retrieve model version: {str(e)}")
```
```python
workspace.serving_endpoints.update_config_and_wait(
    name=ModelServingConfig.serving_endpoint_name,
    served_entities=[
        ServedEntityInput(
            entity_name=f"{catalog_name}.{schema_name}.{ModelConfig.model_name}",
            scale_to_zero_enabled=True,
            workload_size="Small",
            entity_version=model_version,
        )
    ],
)
```
Enhance deployment robustness and monitoring
The endpoint update lacks error handling, timeout configuration, and deployment validation.
Consider these improvements:
+import time
+from databricks.sdk.service.serving import EndpointStateResponse
+
+def wait_for_endpoint_ready(workspace: WorkspaceClient, endpoint_name: str, timeout_seconds: int = 300) -> None:
+ start_time = time.time()
+ while time.time() - start_time < timeout_seconds:
+ state = workspace.serving_endpoints.get_state(name=endpoint_name)
+ if state.ready:
+ return
+ time.sleep(10)
+ raise TimeoutError(f"Endpoint {endpoint_name} not ready after {timeout_seconds} seconds")
+
+try:
workspace.serving_endpoints.update_config_and_wait(
name=ModelServingConfig.serving_endpoint_name,
served_entities=[
ServedEntityInput(
entity_name=f"{catalog_name}.{schema_name}.{ModelConfig.model_name}",
scale_to_zero_enabled=True,
workload_size="Small",
entity_version=model_version,
)
],
+ timeout=300 # 5 minutes timeout
)
+ # Validate deployment
+ wait_for_endpoint_ready(workspace, ModelServingConfig.serving_endpoint_name)
+ print(f"Successfully deployed model version {model_version} to endpoint {ModelServingConfig.serving_endpoint_name}")
+except Exception as e:
+ raise RuntimeError(f"Failed to deploy model to serving endpoint: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
import time
from databricks.sdk.service.serving import EndpointStateResponse


def wait_for_endpoint_ready(workspace: WorkspaceClient, endpoint_name: str, timeout_seconds: int = 300) -> None:
    start_time = time.time()
    while time.time() - start_time < timeout_seconds:
        state = workspace.serving_endpoints.get_state(name=endpoint_name)
        if state.ready:
            return
        time.sleep(10)
    raise TimeoutError(f"Endpoint {endpoint_name} not ready after {timeout_seconds} seconds")


try:
    workspace.serving_endpoints.update_config_and_wait(
        name=ModelServingConfig.serving_endpoint_name,
        served_entities=[
            ServedEntityInput(
                entity_name=f"{catalog_name}.{schema_name}.{ModelConfig.model_name}",
                scale_to_zero_enabled=True,
                workload_size="Small",
                entity_version=model_version,
            )
        ],
        timeout=300,  # 5 minutes timeout
    )
    # Validate deployment
    wait_for_endpoint_ready(workspace, ModelServingConfig.serving_endpoint_name)
    print(f"Successfully deployed model version {model_version} to endpoint {ModelServingConfig.serving_endpoint_name}")
except Exception as e:
    raise RuntimeError(f"Failed to deploy model to serving endpoint: {str(e)}")
```
```yaml
- name: Install UV
  uses: astral-sh/setup-uv@2e657c127d5b1635d5a8e3fa40e0ac50a5bf6992
```
🛠️ Refactor suggestion
Remove unused UV installation step
The UV package manager is installed but never used in the workflow. Either remove this step or utilize UV for dependency management.
```yaml
bundle:
  name: armak-bundle
  cluster_id: 1020-170922-owfkgqkz
```
💡 Codebase verification
Hardcoded cluster ID should be replaced with job cluster configuration
The cluster ID is only used once in the codebase, and interestingly, the same file already demonstrates a better practice by using a job cluster configuration (`job_clusters`) with a logical key "ad-click-cluster". You should:
- Remove the hardcoded `cluster_id: 1020-170922-owfkgqkz`
- Use the job cluster configuration pattern that's already defined in the file, which provides better maintainability and environment independence
- Define cluster requirements (like `spark_version` and `data_security_mode`) instead of binding to a specific cluster ID
🔗 Analysis chain
Consider using cluster policies instead of hardcoded cluster IDs
Hardcoding cluster IDs in configuration files can be problematic as clusters may be terminated or recreated, breaking the workflow. Consider using cluster policies or templates to define cluster configurations consistently across environments.
Let's verify if this cluster ID is used elsewhere in the codebase:
Let me gather more context about the Databricks configuration patterns in the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for the cluster ID usage across the codebase
rg "1020-170922-owfkgqkz"
Length of output: 75
Script:
#!/bin/bash
# Look for other Databricks configuration files and cluster configurations
fd -e yml -e yaml | xargs rg -l "cluster"
# Search for any cluster policy references
rg -i "cluster.?policy"
# Check if there are any template or variable patterns for cluster configurations
rg "cluster_id:|cluster:|CLUSTER" -A 2 -B 2
Length of output: 538
```python
model_new = mlflow.sklearn.load_model(new_model_uri)
predictions_new = model_new.predict(X_test)

model_previous = mlflow.sklearn.load_model(previous_model_uri)
predictions_previous = model_previous.predict(X_test)
```
🛠️ Refactor suggestion
Add exception handling when loading models and making predictions.
Loading models and making predictions might fail due to issues like incorrect URIs or model incompatibilities. Adding exception handling will improve the robustness of the script.
You can modify the code to include try-except blocks:
```python
try:
    model_new = mlflow.sklearn.load_model(new_model_uri)
    predictions_new = model_new.predict(X_test)
except Exception as e:
    logger.error(f"Failed to load or predict with new model: {e}")
    sys.exit(1)

try:
    model_previous = mlflow.sklearn.load_model(previous_model_uri)
    predictions_previous = model_previous.predict(X_test)
except Exception as e:
    logger.error(f"Failed to load or predict with previous model: {e}")
    sys.exit(1)
```
```python
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

X_train = train_set[list(num_features) + list(cat_features)]
y_train = train_set[target]

X_test = test_set[list(num_features) + list(cat_features)]
y_test = test_set[target]
```
🛠️ Refactor suggestion
Avoid converting Spark DataFrames to Pandas
Converting large Spark DataFrames to Pandas DataFrames can lead to memory issues and reduce scalability. It's recommended to use Spark DataFrames throughout the pipeline to leverage distributed computing and handle large datasets efficiently.
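A minimal sketch of the Spark-native alternative, assuming the same `spark`, `num_features`, `cat_features`, `catalog_name`, `schema_name`, and `target` variables already defined in the surrounding training script:
```python
# Sketch only: keep the data as Spark DataFrames instead of calling .toPandas().
# All names referenced here are assumed to already exist in the training script.
feature_cols = list(num_features) + list(cat_features)

train_df = spark.table(f"{catalog_name}.{schema_name}.train_set").select(*feature_cols, target)
test_df = spark.table(f"{catalog_name}.{schema_name}.test_set").select(*feature_cols, target)

# Downstream steps (a Spark ML pipeline, metric aggregation) can consume these
# directly, so only small aggregates ever leave the cluster.
print(train_df.count(), test_df.count())
```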
```python
preprocessor = ColumnTransformer(
    transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), cat_features)], remainder="passthrough"
)
pipeline = Pipeline(steps=[("onehot", preprocessor), ("classifier", LGBMClassifier(**light_gbm_config))])
```
🛠️ Refactor suggestion
Use Spark ML for preprocessing and modeling
Currently, the code uses scikit-learn's `Pipeline` and `OneHotEncoder`, which require Pandas DataFrames and aren't optimized for distributed processing. Consider using Spark ML's feature transformers and estimators to build the pipeline directly on Spark DataFrames. This approach will improve performance and scalability for large datasets.
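A rough sketch of the shape this could take in Spark ML. `GBTClassifier` is only a stand-in here, since LightGBM is not part of `pyspark.ml` (a distributed LightGBM would come from something like SynapseML); the column variables and `train_df`/`test_df` are assumed to match the names used in the script and the sketch above:
```python
# Rough Spark ML sketch: index + one-hot encode categoricals, assemble a feature
# vector, and fit a tree ensemble directly on Spark DataFrames.
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cat_features]
encoder = OneHotEncoder(inputCols=[f"{c}_idx" for c in cat_features],
                        outputCols=[f"{c}_ohe" for c in cat_features])
assembler = VectorAssembler(inputCols=list(num_features) + [f"{c}_ohe" for c in cat_features],
                            outputCol="features")
classifier = GBTClassifier(featuresCol="features", labelCol=target)  # stand-in for LightGBM

spark_pipeline = Pipeline(stages=indexers + [encoder, assembler, classifier])
model = spark_pipeline.fit(train_df)    # train_df/test_df from the sketch above
predictions = model.transform(test_df)  # stays distributed end to end
```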
```python
y_pred = pipeline.predict(X_test)

f1 = f1_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred)
accuracy = accuracy_score(y_test, y_pred)
```
🛠️ Refactor suggestion
Use predicted probabilities for ROC AUC calculation
The `roc_auc_score` function is more meaningful when using predicted probabilities rather than class labels. Modify the code to use `pipeline.predict_proba(X_test)[:, 1]` to obtain the predicted probabilities for the positive class.
Suggested change:
y_pred = pipeline.predict(X_test)
+ y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
f1 = f1_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
- roc_auc = roc_auc_score(y_test, y_pred)
+ roc_auc = roc_auc_score(y_test, y_pred_proba)
accuracy = accuracy_score(y_test, y_pred)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
y_pred = pipeline.predict(X_test)
y_pred_proba = pipeline.predict_proba(X_test)[:, 1]

f1 = f1_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred_proba)
accuracy = accuracy_score(y_test, y_pred)
```
Did you get a chance to play with monitoring tables?
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Documentation