diff --git a/01_introduction.py b/01_introduction.py new file mode 100644 index 0000000..2aa915e --- /dev/null +++ b/01_introduction.py @@ -0,0 +1,28 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Set Up, Build, Train, and Deploy model in Databricks + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC ## Use case and introduction +# MAGIC +# MAGIC Northern Outfitters, a well-established online retail store specializing in clothing, winter gear, and backpacks, is a prominent user of Salesforce. They extensively utilize various clouds, including sales, service, community, and marketing, to efficiently manage customer operations. Despite allocating significant resources to marketing promotions, the current methodology is expensive and lacks precision in targeting. +# MAGIC +# MAGIC The company's objective is to transition to targeted promotions, focusing on specific products to optimize sales and improve return on investment. Customizing promotions based on individual customer preferences and interests is expected to boost conversion rates and overall customer satisfaction. Northern Outfitters places a high value on providing outstanding service to its club members, aiming to deliver a personalized experience with call center agents offering a "white glove treatment" to these customers. +# MAGIC +# MAGIC The integration of DataCloud has allowed Northern Outfitters to ingest, prepare, and consolidate customer profiles and behaviors from different Salesforce clouds and enterprise systems. This integration has led to the creation of a unified customer view, and the company plans to leverage this comprehensive customer data for strategic intelligence. +# MAGIC +# MAGIC To bridge the gap between data scientists' machine learning models and the system of engagement for sales, service, and marketing teams, Northern Outfitters is seeking a solution that seamlessly integrates data-driven insights into the day-to-day workflows of their employees. By empowering their teams with actionable insights, the company aims to enhance decision-making, improve customer interactions, and automate customer addition to marketing journeys. +# MAGIC +# MAGIC +# MAGIC The objective of this exercise is to create a predictive model for identifying customer product interests. This model will then be utilized to generate personalized experiences and offers for customers. The development of the model is based on historical data, including customer demographics, marketing engagements, and purchase history. +# MAGIC +# MAGIC The dataset comprises 1 million records, each containing observations and information about potential predictors and the products historically purchased by customers. +# MAGIC + +# COMMAND ---------- + +# MAGIC %md +# MAGIC diff --git a/02_ingest_data.py b/02_ingest_data.py new file mode 100644 index 0000000..d6d9c9b --- /dev/null +++ b/02_ingest_data.py @@ -0,0 +1,230 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Data Ingestion +# MAGIC +# MAGIC In this notebook, we'll demonstrate how to load data from Salesforce Data Cloud into Databricks. The primary objective is to give you the tools necessary to construct a data pipeline that pulls in the required data from Salesforce so you can combine it with the rest of the data in your Databricks Lakehouse to produce effective machine learning models. The method outlined here focuses on the Salesforce CDP Connection Python Library. 
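+# MAGIC
+# MAGIC As a point of reference, the library in question is the open source `salesforce-cdp-connector` package. The snippet below is an illustrative sketch (not part of the original pipeline) of installing it and importing the connection class; the import path follows the connector project's README, so confirm it against the version you install.
+# MAGIC
+# MAGIC ```python
+# MAGIC # In a Databricks notebook the package is typically installed with
+# MAGIC #   %pip install salesforce-cdp-connector
+# MAGIC # or attached to the cluster as a library.
+# MAGIC
+# MAGIC # Connection class used throughout this notebook (set up in the common notebook).
+# MAGIC from salesforcecdpconnector.connection import SalesforceCDPConnection
+# MAGIC ```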
+# MAGIC +# MAGIC ## What You Will Achieve +# MAGIC +# MAGIC By following this notebook, you will learn how to: +# MAGIC +# MAGIC - **Connect and Extract Data**: Establish a connection to Salesforce Data Cloud, enabling you to extract product interest data. +# MAGIC - **Transform Data**: Employ advanced transformation techniques to transition the data from its raw form in the bronze layer to a refined, cleansed state in the silver layer. This process ensures that the data is optimized for analytics and machine learning applications. +# MAGIC - **Load Data into Databricks**: Seamlessly load your transformed data into Databricks, preparing it for sophisticated analysis and insights discovery. +# MAGIC +# MAGIC ## Why This Matters +# MAGIC +# MAGIC In today's data-driven world, the ability to efficiently process and analyze data is paramount. This notebook empowers you to: +# MAGIC +# MAGIC - **Enhance Data Quality**: Through the transformation process, you will improve the quality of your data, making it more reliable for decision-making. +# MAGIC - **Accelerate Time-to-Insight**: By streamlining the data ingestion process, you reduce the time from data collection to actionable insights, enabling faster decision-making. +# MAGIC - **Simplify Data Management**: The use of the Salesforce CDP Connection Python Library simplifies the complexity of data management, making it accessible to users with varying levels of technical expertise. +# MAGIC +# MAGIC ## Separation of Concerns +# MAGIC +# MAGIC Having the data ingestion as a separate notebook from the rest of the model training process provides a few key advantages over loading the data directly in your model training notebook: +# MAGIC +# MAGIC - **Speed up Model Experimentation**: Reloading the dataframe from Salesforce every time you run the model training notebook during experimentation can slow things down considerably. Preloading the table as a Delta table in Databricks, where it is optimized for both BI and AI workloads, can speed up your experimentation greatly. +# MAGIC - **Schedule Independently**: You may want to pull new data or a fresh snapshot on a different schedule than other parts of the workload. Having ingestion as a separate notebook, and thus configurable as a separate task in Databricks Workflows, provides flexibility in this scheduling. +# MAGIC - **Team Collaboration**: It may be a different engineer or SME who is responsible for loading the data from Salesforce, optimizing the data loading process, and determining the right tables to query and the right joins to make. Separating concerns in this way makes it easier for the right people to focus on the right parts of the development process. +# MAGIC + +# COMMAND ---------- + +# DBTITLE 1,Common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Set up Salesforce CDP Connection +# MAGIC +# MAGIC The first step of the data ingestion journey is establishing a connection to the Salesforce Customer Data Platform (CDP). This connection is the bridge that allows us to access the product interest data stored within Salesforce Data Cloud. To achieve this, we leverage the `SalesforceCDPConnection` class, provided by the [Salesforce CDP Connection Python Library](https://github.com/forcedotcom/salesforce-cdp-connector). Below, we detail the process of initializing this connection, ensuring a secure and efficient link to your Salesforce data.
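+# MAGIC
+# MAGIC Before walking through the parameters, note that credentials should not be hard-coded in notebooks. As a hedged sketch, the `common` notebook might resolve them from a Databricks secret scope like this (the scope and key names here are purely illustrative):
+# MAGIC
+# MAGIC ```python
+# MAGIC # Hypothetical secret scope and keys -- adjust to whatever your workspace actually uses.
+# MAGIC secret_scope = "sfdc-byom"
+# MAGIC
+# MAGIC sfdc_login_url = "https://login.salesforce.com"
+# MAGIC sfdc_username = dbutils.secrets.get(scope=secret_scope, key="sfdc-username")
+# MAGIC sfdc_password = dbutils.secrets.get(scope=secret_scope, key="sfdc-password")
+# MAGIC sfdc_client_id = dbutils.secrets.get(scope=secret_scope, key="sfdc-client-id")
+# MAGIC sfdc_client_secret = dbutils.secrets.get(scope=secret_scope, key="sfdc-client-secret")
+# MAGIC ```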
+# MAGIC +# MAGIC In this code snippet, we instantiate the `SalesforceCDPConnection` object with five parameters: +# MAGIC +# MAGIC - `sfdc_login_url`: The URL used for logging into Salesforce. This is your gateway to authenticate against the Salesforce platform, ensuring secure access. +# MAGIC - `sfdc_username`: Your Salesforce username. This credential identifies you to the Salesforce services and ensures that your connection is personalized and secure. +# MAGIC - `sfdc_password`: The password associated with your Salesforce account. Combined with your username, it authenticates your access to Salesforce's data. +# MAGIC - `sfdc_client_id`: The client ID provided when you register your application with Salesforce. It's part of the OAuth credentials needed to authorize your application to access Salesforce data on your behalf. +# MAGIC - `sfdc_client_secret`: The client secret is another component of your OAuth credentials, working alongside the client ID to provide a secure authentication mechanism. +# MAGIC +# MAGIC These variables are already initialized in the `common` notebook, where they are either configured directly there, or if they're sensitive, read there using [Databricks secrets management](https://docs.databricks.com/en/security/secrets/index.html). + +# COMMAND ---------- + +# DBTITLE 1,Connect to Salesforce data cloud +conn = SalesforceCDPConnection( + sfdc_login_url, + sfdc_username, + sfdc_password, + sfdc_client_id, + sfdc_client_secret) + +print(conn) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC ## Retrieving Data with a Salesforce Query +# MAGIC +# MAGIC Following the successful establishment of our connection to Salesforce CDP, we proceed to extract product interest data using a specific SQL query. The query is structured to pull a targeted set of fields from the `sfdc_byom_demo_train__dll`, focusing on crucial metrics such as product purchases, customer engagement scores, and interaction metrics, limited to the first 10,000 records for manageability and performance optimization. +# MAGIC +# MAGIC By executing this command, we fetch the data directly into a pandas DataFrame, leveraging the `get_pandas_dataframe` method of our Salesforce connection object. + +# COMMAND ---------- + +# DBTITLE 1,Query product interest data +query = """ +SELECT + id__c, + product_purchased__c, + club_member__c, + campaign__c, + state__c, + month__c, + case_count__c, + case_type_return__c, + case_type_shipment_damaged__c, + pages_visited__c, + engagement_score__c, + tenure__c, + clicks__c +FROM + sfdc_byom_demo_train__dll +LIMIT + 10000 +""" + +df_pandas = conn.get_pandas_dataframe(query) +display(df_pandas) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Transforming Data for Analysis +# MAGIC +# MAGIC Once the product interest data is retrieved into a pandas DataFrame, the next step is to convert this DataFrame into a Spark DataFrame and refine the column names for ease of analysis. This conversion leverages the Apache Spark framework within Databricks, allowing for scalable data processing and analysis. +# MAGIC +# MAGIC This code snippet performs two key actions: +# MAGIC 1. **Conversion to Spark DataFrame**: The `spark.createDataFrame(df_pandas)` command transforms the pandas DataFrame into a Spark DataFrame, enabling the utilization of Spark's distributed data processing capabilities. +# MAGIC 2. **Column Name Refinement**: The subsequent line iterates over the column names, removing the `__c` suffix that Salesforce appends to custom fields. 
This simplification of column names facilitates easier access and manipulation of the data in downstream processes. +# MAGIC +# MAGIC The final `display(df_spark)` command visually presents the transformed Spark DataFrame, allowing for a quick verification of the transformations applied. + +# COMMAND ---------- + +# DBTITLE 1,Prepare Spark dataframe +# Convert to spark dataframe +df_spark = spark.createDataFrame(df_pandas) + +# Remove the __c suffix from the column names +df_spark = remove_column_suffix(df_spark, SFDC_CUSTOM_FIELD_SUFFIX) + +# Inspect the results +display(df_spark) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Storing Data in the Bronze Table +# MAGIC +# MAGIC After transforming the product interest data into a Spark DataFrame with cleaned column names, the next step involves persisting this data into a storage layer for further processing. This is achieved by writing the data to a bronze Delta table, which serves as the raw data layer in our lakehouse architecture. +# MAGIC +# MAGIC In this code block, we define the name of the bronze table as `product_interest_bronze`. Using the Spark DataFrame's `.write` method, we specify the write mode as `"overwrite"` to ensure that any existing data in the table with the same name is replaced. This approach helps maintain the most current dataset for analysis. The `.saveAsTable(bronze_table_name)` command then persists the DataFrame as a table in the data lake, using the specified table name. This approach is used here for simplicity's sake; alternatives such as using a [merge statement](https://docs.databricks.com/en/delta/merge.html) or employing [Delta Live Tables](https://www.databricks.com/product/delta-live-tables) may be better suited depending on your specific use case. +# MAGIC +# MAGIC This process of saving the transformed data into a bronze table is a critical step in building a scalable and reliable data pipeline. It ensures that the raw data is stored in a structured format, ready for subsequent cleansing, enrichment, and analysis in the silver layer. This structured approach to data storage and management, known as the [medallion or multi-hop architecture](https://www.databricks.com/glossary/medallion-architecture), facilitates efficient data processing workflows and supports advanced analytics and machine learning projects. + +# COMMAND ---------- + +# DBTITLE 1,Write bronze table +bronze_table_name = "product_interest_bronze" + +(df_spark.write + .option("mergeSchema", "true") + .mode("overwrite") + .saveAsTable(bronze_table_name)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Let's take a look at the table produced. We've also provided a link so you can easily jump to the table in [Unity Catalog](https://www.databricks.com/product/unity-catalog). + +# COMMAND ---------- + +# DBTITLE 1,Visualize bronze table +display(spark.table(bronze_table_name)) +display_table_link(catalog_name, schema_name, bronze_table_name) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC The next code cell focuses on creating and cleansing the data for the `product_interest_silver` table, which is aimed at refining the dataset stored in the Spark DataFrame `df_spark`: +# MAGIC +# MAGIC 1. **Basic Cleansing**: The operation `.na.drop()` is applied to the DataFrame, which removes any rows containing null or missing values. This step is crucial for ensuring the quality and reliability of the data by eliminating incomplete records that could potentially skew analysis results. +# MAGIC +# MAGIC 2.
**Displaying the Cleansed Data**: After the cleansing process, the `display(product_interest_silver)` function is used to visually present the cleansed dataset. This allows for immediate verification of the data cleaning step, ensuring that the dataset now contains only complete and valid entries, ready for more sophisticated analysis or processing in the silver layer. +# MAGIC +# MAGIC Your data cleansing steps are likely to be much more involved, and will be highly dependent on your use case. By loading the data from Salesforce in a raw fashion into the bronze layer, as you iterate on these cleansing steps you don't need to continually pull data back across the connection to Salesforce. + +# COMMAND ---------- + +# DBTITLE 1,Cleanse and process incoming data +# Create product interest silver + +# basic cleansing +product_interest_silver = ( + df_spark + .na.drop()) + +display(product_interest_silver) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC The next cell is responsible for persisting the cleansed and processed data into the silver table, which is the next step in our medallion architecture. +# MAGIC +# MAGIC - **Specify Silver Table Name**: The variable `silver_table_name` is assigned the value `"product_interest_silver"`, defining the name of the table where the cleansed data will be stored. +# MAGIC +# MAGIC - **Data Persistence Operation**: The `product_interest_silver` Spark DataFrame, which holds the cleansed data, is written to the silver table using the `.write` method. The `.mode("overwrite")` option specifies that if the table already exists, its current contents should be replaced with the new dataset. Finally, `.saveAsTable(silver_table_name)` persists the DataFrame as a table in the data lake under the defined name. +# MAGIC +# MAGIC This process ensures that the silver table is updated with the latest version of the cleansed data, ready for advanced analytics, reporting, or further processing. The use of the "overwrite" mode ensures that the data remains current, reflecting the latest available information. + +# COMMAND ---------- + +# DBTITLE 1,Write silver table +silver_table_name = "product_interest_silver" + +(product_interest_silver.write + .option("mergeSchema", "true") + .mode("overwrite") + .saveAsTable(silver_table_name)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Further Processing: Gold Layer +# MAGIC +# MAGIC While not applicable in this particular example, many ML projects need feature engineering that involves aggregating and combining data across many different sources, producing a gold layer in the medallion architecture. For instance, if you had brought transactional level data from Salesforce or other systems into your lakehouse and wanted to aggregate counts or other statistics at a user level, you would perform those aggregates on the silver layer tables to produce a gold table, as sketched below. +# MAGIC +# MAGIC The table we're extracting here already contains features from elsewhere in Salesforce that will lend themselves well to our downstream modeling tasks, but this is definitely something to keep in mind as you tackle new use cases.
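+# MAGIC
+# MAGIC To make that concrete, here is a hypothetical sketch of such a silver-to-gold aggregation. The `order_lines_silver` table and its columns are illustrative only and do not exist in this demo:
+# MAGIC
+# MAGIC ```python
+# MAGIC from pyspark.sql import functions as F
+# MAGIC
+# MAGIC # Hypothetical transaction-level silver table, aggregated to one row per customer.
+# MAGIC order_lines = spark.table("order_lines_silver")
+# MAGIC
+# MAGIC customer_orders_gold = (
+# MAGIC     order_lines
+# MAGIC     .groupBy("customer_id")
+# MAGIC     .agg(
+# MAGIC         F.count("*").alias("order_line_count"),
+# MAGIC         F.sum("amount").alias("total_spend"),
+# MAGIC         F.max("order_date").alias("last_order_date")))
+# MAGIC
+# MAGIC (customer_orders_gold.write
+# MAGIC     .mode("overwrite")
+# MAGIC     .saveAsTable("customer_orders_gold"))
+# MAGIC ```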
+ +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC Congratulations on getting the data ingested from Salesforce Data Cloud! This is often one of the most challenging steps in the process for teams that are perhaps used to Databricks but new to Salesforce Data Cloud. Through this notebook, you have successfully navigated the process of connecting to Salesforce CDP, extracting product interest data, and performing essential transformations to prepare the data for advanced analysis. By persisting the data first in the bronze layer and then refining it for the silver layer, you've laid a solid foundation for insightful analytics and data-driven decision-making. +# MAGIC +# MAGIC ### Key Takeaways +# MAGIC +# MAGIC - **Streamlined Data Ingestion**: You've seen firsthand how to efficiently extract data from Salesforce CDP using the Salesforce CDP Connection Python Library, simplifying the process of data retrieval. +# MAGIC - **Data Transformation and Cleansing**: The transformation from the bronze to the silver layer (and in many cases a gold layer), including basic cleansing and column name refinement, ensures that the data is not only more accessible but also of higher quality. +# MAGIC - **Scalable Data Storage**: By leveraging Databricks and Spark DataFrames, you have stored your data in a structured format that supports scalable analysis and processing within a data lake architecture. +# MAGIC +# MAGIC ### Next Steps +# MAGIC +# MAGIC Now that you have some cleansed tables, let's explore the data from a data science perspective and determine which features we want to include in our model. Also, while we're building this set of notebooks in a linear fashion to facilitate learning, please note that in practice this is often a highly iterative process. You'll likely uncover things during data exploration that drive changes to your ingestion process. +# MAGIC +# MAGIC From here, please continue to the [Exploratory Data Analysis notebook]($./03_exploratory_data_analysis). diff --git a/02_ingest_data_bulk.py b/02_ingest_data_bulk.py new file mode 100644 index 0000000..f8a0c50 --- /dev/null +++ b/02_ingest_data_bulk.py @@ -0,0 +1,271 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Distributed bulk load example (optional) +# MAGIC +# MAGIC Some customers may have a large amount of data to be loaded from Salesforce CDP into Databricks, in which case the straightforward implementation may unfortunately time out, or even if it succeeds may be excessively slow. There are a couple of approaches we can suggest in this situation. In this notebook, we look at one of them: distributed ingest using primary key sampling. We assume here that we have a string id column, as is typically found in Salesforce Data Cloud data model objects. +# MAGIC +# MAGIC Note that this approach also works with the dataset loaded in this demo, since we store the primary key as a text column of the form `IdXXXXXX`. +# MAGIC +# MAGIC The high level approach is as follows: +# MAGIC +# MAGIC 1. Do an initial top level aggregate query for a few key basic statistics of the table to be loaded, including the row count and the minimum and maximum id. +# MAGIC 2. Collect a small but useful sample of the keys and then assign tiles based on the number of shards desired. The aggregation for the tiling happens on the Salesforce side, and since it runs over only a sample of the entire dataset, it should still be relatively fast. +# MAGIC 3. Use the resulting keys as a guide for distributing the queries over the Databricks cluster. We'll use mapInPandas to execute a Python function on each core, each with its own connection, to collect the shard it is assigned.
All of the shards will be collected in parallel, and since the primary keys are used directly, each query can use the index and should be performant. +# MAGIC +# MAGIC Let's get started! + +# COMMAND ---------- + +# DBTITLE 1,Setup and common imports +# MAGIC %run ./common + +# COMMAND ---------- + +# DBTITLE 1,Set the table name and its ID column +table_name = "sfdc_byom_demo_train__dll" +id_col = "id__c" + +# COMMAND ---------- + +# DBTITLE 1,Helper functions +def get_salesforce_cdp_connection(): + """Connect to Salesforce Data Cloud.""" + return SalesforceCDPConnection( + sfdc_login_url, + sfdc_username, + sfdc_password, + sfdc_client_id, + sfdc_client_secret) + + +def get_id_stats(conn, table_name, id_col): + """Collect a few basic stats about the table and its ID column.""" + query = f""" + SELECT + count(*) AS count, + min({id_col}) AS min_id, + max({id_col}) AS max_id, + max(length({id_col})) AS max_length + FROM + {table_name}""" + df_pandas = conn.get_pandas_dataframe(query) + return df_pandas.iloc[0].to_dict() + + +def get_shard_ranges(conn, table_name, id_col, n_shards, id_stats): + """Get the shard ranges to use for collecting the dataset.""" + # We could potentially use the size of the table to determine + # the proportion to use here, since we collected it in id_stats. + proportion = 1.0 + + # Sample the id column at some proportion, then within the + # resulting sample assign which shard each id should fall in, and + # finally aggregate the shards to find the start_id for each. + query = f""" + SELECT + shard, + MIN({id_col}) AS start_id + FROM ( + SELECT + {id_col}, + NTILE({n_shards}) OVER (ORDER BY {id_col}) AS shard + FROM ( + SELECT + {id_col} + FROM + {table_name} + TABLESAMPLE BERNOULLI({proportion}))) + GROUP BY shard + """ + + # Now that we have the sample, the first start should be close to + # the beginning, and the last should be close to the end. To guarantee + # we don't miss any records at the edges, we'll set the beginning of the + # first shard to a value that sorts lexicographically before anything else, + # and the end of the last shard to a string that is lexicographically higher + # than any other string in our dataset. Each task will collect id >= start_id + # and id < end_id, which guarantees we get all the records, and statistically + # should produce shards that are close to the same size. + shards = conn.get_pandas_dataframe(query) + shards.set_index("shard", inplace=True, drop=False) + shards.sort_index(inplace=True) + + # Make sure the start_id of the first shard enables the >= check for the + # entire first shard. We could do this either by using an empty string, or + # just by using the true min_id. Since we already have the min_id we can + # use that. + shards.loc[1, "start_id"] = id_stats["min_id"] + shards.loc[1:(n_shards - 1), "end_id"] = shards.loc[2:, "start_id"].to_numpy() + + # Make sure the end_id of the last shard is higher than the max id we can get. + # Since we're dealing with strings, if we take the current max_id and just append + # any extra character to it, the resulting string will meet that criteria. + # --- + # Note: We can't just use "max_id" here because the upper bound check for a shard + # must be < end_id. It has to be < end_id because we're only sampling the key + # space and don't have a mechanism to partition correctly in the other shards + # otherwise.
+ extra_char = "_" + greater_than_max_id = id_stats["max_id"] + extra_char + assert id_stats["max_id"] < greater_than_max_id + shards.loc[n_shards, "end_id"] = greater_than_max_id + + return shards + + +from contextlib import contextmanager + +@contextmanager +def spark_conf_set(key, value): + """Temporarily set a spark config setting within a code block.""" + prior_value = spark.conf.get(key) + try: + yield spark.conf.set(key, value) + finally: + spark.conf.set(key, prior_value) + +# COMMAND ---------- + +# DBTITLE 1,Establish connection to Salesforce Data Cloud +conn = get_salesforce_cdp_connection() + +# COMMAND ---------- + +# DBTITLE 1,Define the query and template +import jinja2 + +user_query_string = f""" +SELECT + id__c, + product_purchased__c, + club_member__c, + campaign__c, + state__c, + month__c, + case_count__c, + case_type_return__c, + case_type_shipment_damaged__c, + pages_visited__c, + engagement_score__c, + tenure__c, + clicks__c +FROM + {table_name} +""" + +query_template_string = f""" +SELECT + * +FROM ( + {user_query_string} +) +WHERE {id_col} >= '{{{{ start_id }}}}' AND {id_col} < '{{{{ end_id }}}}' +""" + + +# COMMAND ---------- + +# DBTITLE 1,Collect a small sample for the schema +from pyspark.sql import types as T + +sample_query = f"""{user_query_string} LIMIT 10""" +sample_result_pandas = conn.get_pandas_dataframe(sample_query) +sample_result = spark.createDataFrame(sample_result_pandas) +result_schema = sample_result.schema +result_schema.add(T.StructField('shard', T.LongType(), True)); + +# COMMAND ---------- + +# DBTITLE 1,Define ingestion function +def ingest_records(dfs): + """Ingest a batch of records. + + In general we'll get only a single dataframe, but if we end up with + more than one it's no problem. Each dataframe would consist of one or + a few rows specifying the shard assigned along with its start and end + id. We append the shard for this example just so we can assess the + resulting distribution, but it would be fine to remove that later on + if its not needed. + """ + # Each worker core will need its own connection. + conn = get_salesforce_cdp_connection() + + # Along with its own jinja environment. + environment = jinja2.Environment() + + # Set up the query from the template string we closed over from earlier. + query_template = environment.from_string(query_template_string) + + for df in dfs: + for i, (shard, start_id, end_id) in df.iterrows(): + + # Query for this particular shard using the query template + query = query_template.render(start_id=start_id, end_id=end_id) + df = conn.get_pandas_dataframe(query) + + # Append the shard so we can analyze it later and return the result + df['shard'] = shard + yield df + +# COMMAND ---------- + +# DBTITLE 1,Define the shards to collect +num_shards = 32 +id_stats = get_id_stats(conn, table_name, id_col) +shard_ranges = get_shard_ranges(conn, table_name, id_col, num_shards, id_stats) +df_shards = spark.createDataFrame(shard_ranges) +display(df_shards) + +# COMMAND ---------- + +# DBTITLE 1,Define the dataset in terms of the shards to collect +ingested_data = ( + df_shards + .repartition(num_shards) + .mapInPandas(ingest_records, result_schema)) + +# COMMAND ---------- + +# DBTITLE 1,Inspect the results +display(ingested_data) + +# COMMAND ---------- + +# DBTITLE 1,Inspect the resulting shard sizes +# Let's see how many records we ended up with in each shard. 
+shard_counts = ( + ingested_data + .groupBy("shard") + .count() + .orderBy("shard")) + +# Display the shard counts (you may need to re-add the bar chart visualization) +with spark_conf_set("spark.sql.adaptive.enabled", "false"): + display(shard_counts) + +# COMMAND ---------- + +# DBTITLE 1,Make sure we got all the records +# As a sanity check, let's just make sure we got all the records we're expecting. +record_count = ingested_data.count() +if id_stats["count"] == record_count: + print(f"We got exactly {record_count} records as expected.") +else: + print(f"Oops, we only got {record_count} records, but expected {id_stats['count']}!") + +# COMMAND ---------- + +# DBTITLE 1,Benchmark with a noop write +# Note: We're turning off AQE here because of how the DataFrame is used +# for planning the shard collection. In general, AQE is great +# and you should leave it on whenever possible. However, when +# you have a DataFrame with a small number of rows +# where each row actually drives a lot of compute, AQE will still +# in many cases try to coalesce partitions, which we don't want. +# Here, we really do want the partitions to stay the same even +# though from AQE's perspective that may seem suboptimal. We +# use a context manager to make sure it's only in effect temporarily +# as we execute this query. + +with spark_conf_set("spark.sql.adaptive.enabled", "false"): + ingested_data.write.format("noop").mode("overwrite").save() diff --git a/03_exploratory_data_analysis.py b/03_exploratory_data_analysis.py new file mode 100644 index 0000000..9062ef4 --- /dev/null +++ b/03_exploratory_data_analysis.py @@ -0,0 +1,315 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Exploratory Data Analysis +# MAGIC +# MAGIC This is the exploratory data analysis notebook in our series on integrating Salesforce Data Cloud with the Databricks Data Intelligence Platform. In the preceding notebook, we successfully ingested product interest data from Salesforce Data Cloud into our Databricks environment, laying the groundwork for sophisticated data-driven insights. This notebook is dedicated to dissecting and understanding that ingested data through exploratory data analysis (EDA) techniques. +# MAGIC +# MAGIC EDA is a critical step in the data science workflow as it allows us to uncover patterns, anomalies, and relationships in the data, providing valuable insights that inform subsequent stages of feature engineering and model development. By visualizing and summarizing our dataset, we aim to achieve a deep understanding of its characteristics and idiosyncrasies, which is essential for preparing the data for effective machine learning. +# MAGIC +# MAGIC As we proceed, we will explore various dimensions of the product interest data, employing a mix of statistical summaries and visualizations to identify trends, distributions, and potential outliers. This process will not only aid in ensuring the quality and integrity of our data but also in uncovering opportunities for feature creation that can enhance the performance of our eventual product recommendation model. +# MAGIC +# MAGIC Let's dive into the data and uncover the insights that will drive our next steps towards developing a powerful product recommendation system hosted in Databricks. + +# COMMAND ---------- + +# DBTITLE 1,Run common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load the silver table +# MAGIC +# MAGIC To get started, let's load the silver table we created in the ingestion task we just finished.
Running our exploratory data analysis from data already loaded in the lakehouse lets us iterate much faster, as we don't have to worry about making a connection back to Salesforce each time we want to run a query, and data in the lakehouse is optimized for running big data analytics. Also, in terms of medallion architecture, our intent is to be working from a cleansed dataset, so either silver or gold tables. If we identify any data quality issues during our EDA process, we would want to propagate those cleansing steps upstream to the bronze to silver transition. + +# COMMAND ---------- + +# DBTITLE 1,Load and view silver product interest table +df_spark = spark.table("product_interest_silver").drop("id") +df = df_spark.toPandas() +display(df_spark) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Run data profiler +# MAGIC +# MAGIC The Databricks built-in data profiling capability is a powerful tool designed to assist in exploratory data analysis (EDA) tasks. It provides an automated way to generate detailed statistical summaries and visualizations of the dataset, offering insights into its structure, quality, and characteristics. The data profiler in Databricks helps data scientists and analysts understand the data they are working with and make informed decisions about preprocessing, feature engineering, and modeling. This is the ideal place for us to begin our exploratory data analysis for the product data, because it gives us all of the following capabilities with a single line of code: +# MAGIC +# MAGIC - **Automated statistical summary:** The data profiler generates descriptive statistics, such as mean, median, standard deviation, minimum, and maximum values for each numerical column in the dataset. This summary provides a quick overview of the dataset's central tendencies, dispersion, and shape. +# MAGIC +# MAGIC - **Distribution visualizations:** The profiler generates visualizations to display the distribution of numerical variables, helping to identify potential outliers, skewness, and other important trends. These visualizations can include histograms, box plots, and density plots, among others. +# MAGIC +# MAGIC - **Categorical variable analysis:** The profiler also analyzes categorical variables by counting the frequency of each category. This information helps to understand the distribution and prevalence of different categories and can be useful for feature engineering or stratified analysis. +# MAGIC +# MAGIC - **Missing values detection:** The profiler identifies missing values in the dataset and reports the percentage of missing values for each column. This information is essential for determining the appropriate handling of missing data, such as imputation or removal. +# MAGIC +# MAGIC - **Correlation analysis:** The data profiler can calculate the correlation between numerical variables to identify any significant relationships. This analysis helps to understand the interdependencies between variables and can guide feature selection or transformation. +# MAGIC +# MAGIC - **Easy integration with Databricks environment:** The data profiler is seamlessly integrated into the Databricks environment, allowing users to execute the profiling on large-scale datasets efficiently. It leverages distributed computing capabilities to handle big data effectively. +# MAGIC +# MAGIC There are two ways to run the data profiler: +# MAGIC +# MAGIC 1.
**Using the UI flow:** Any time you display a Spark dataframe in Databricks you have the option to add visualization tabs to the main table output. When you click the plus icon to add a visualization, you can also add a data profile. +# MAGIC +# MAGIC 2. **Calling via the dbutils library:** The same functionality is accessible via code by simply calling the dbutils method `dbutils.data.summarize(df)`. This will output the same results that the UI flow would produce. +# MAGIC +# MAGIC Let's try running it on our product interest silver table now using the dbutils library approach. + +# COMMAND ---------- + +dbutils.data.summarize(df_spark) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## View basic descriptive statistics +# MAGIC +# MAGIC While the profiler gives us a lot of information quickly, often times we'll still want to look at individual attributes using the same approach you'd use anywhere else in the Python data analysis ecosystem. +# MAGIC +# MAGIC In this section, we delve into the fundamental statistics of our dataset to establish a foundational understanding of the product interest data. Descriptive statistics are crucial as they provide a quick summary of the central tendencies, dispersion, and shape of our dataset's distribution. We will use both Spark DataFrame and Pandas DataFrame functionalities to calculate measures such as mean, median, standard deviation, minimum, and maximum values for each numerical column. Additionally, we will examine the distribution of categorical variables by counting the frequency of each category. +# MAGIC +# MAGIC This statistical analysis serves as the first step in identifying patterns, detecting outliers, and understanding the data's overall structure. It is instrumental in guiding our data preprocessing decisions, such as handling missing values, scaling and normalizing data, and potentially identifying features that could be relevant for our predictive model. +# MAGIC +# MAGIC By scrutinizing these statistics, we aim to uncover insights that will inform the more detailed exploratory analysis and feature engineering tasks ahead, ultimately enhancing the performance of our product recommendation model. Let's proceed to analyze our dataset's descriptive statistics to gain a clear view of its characteristics. + +# COMMAND ---------- + +# DBTITLE 1,Check how many rows and columns +# Print number of rows and columns of the dataframe +df.shape + +# COMMAND ---------- + +# DBTITLE 1,Display the first couple of rows of the dataframe +# Preview data +display(df.head()) + +# COMMAND ---------- + +# DBTITLE 1,Check for missing values +# Check for missing values +df.isna().sum() + +# COMMAND ---------- + +# DBTITLE 1,View numerical predictors statistics +# View numerical predictors statistics +display(df.describe().reset_index(drop=False)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Custom visualizations +# MAGIC +# MAGIC We can also define our own per column type visualizations and summaries (e.g., categorical and numerical) and apply those to each of the columns in our dataset for a more detailed view. Let's look at an example of this with the same product interest dataset. 
+ +# COMMAND ---------- + +# DBTITLE 1,Helper visualization methods +def describe_categorical(t): + """Create descriptive statistics of categorical variables.""" + uniquecategories = len(list(t.unique())) + print("Number of Unique Categories : " + str(uniquecategories)) + tmp = pd.DataFrame() + tmp = t.value_counts().reset_index(name='counts').rename({'index': 'Categories'}, axis=1) + tmp['%'] = 100 * tmp['counts'] / tmp['counts'].sum() + print(tmp) + tmp['percentages'] = tmp['%'].apply(lambda x: '{:.2f}%'.format(x)) + tmp.sort_values(by = '%', inplace = True, ascending = False) + ax = tmp.plot(x="Categories", y=["counts"], kind="bar") + for i, val in enumerate(tmp['counts']): + ax.text(i, val, tmp['percentages'][i], horizontalalignment='center') + + +def describe_continuous(t): + """Create descriptive statistics of continous variables.""" + t.describe() + fig, ax=plt.subplots(nrows =1,ncols=3,figsize=(10,8)) + ax[0].set_title("Distribution Plot") + #sns.histplot(t,ax=ax[0]) + sns.kdeplot(t,fill=True, ax=ax[0]) + ax[1].set_title("Violin Plot") + sns.violinplot(y=t,ax=ax[1], inner="quartile") + ax[2].set_title("Box Plot") + sns.boxplot(y=t,ax=ax[2],orient='v') + +# COMMAND ---------- + +# DBTITLE 1,Describe the product purchased label (our `y` variable) +describe_categorical(df['product_purchased']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `campaign` feature +describe_categorical(df['campaign']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `club member` feature +describe_categorical(df['club_member']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `state` feature +describe_categorical(df['state']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `month` feature +describe_categorical(df['month']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `case type return` feature +describe_categorical(df['case_type_return']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `case type shipment damaged` feature +describe_categorical(df['case_type_shipment_damaged']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `case count` feature +describe_continuous(df['case_count']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `pages visited` feature +describe_continuous(df['pages_visited']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `engagement score` feature +describe_continuous(df['engagement_score']) + +# COMMAND ---------- + +# DBTITLE 1,Describe the `tenure` feature +describe_continuous(df['tenure']) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## View correlation matrix +# MAGIC +# MAGIC Another common technique that is helpful during exploratory data analysis is to view a correlation matrix of the different variables in the dataset. This can be useful for a variety of reasons: +# MAGIC +# MAGIC - **Identify Relationships:** Quickly identify and visualize the strength and direction of relationships between different variables imported from Salesforce Data Cloud. This is crucial for understanding how different Salesforce fields relate to each other, which can inform data cleaning, feature selection, and predictive modeling. +# MAGIC +# MAGIC - **Data Cleaning and Preprocessing:** Spotting highly correlated variables can inform decisions about which variables to keep, combine, or discard, improving model performance and interpretation. +# MAGIC +# MAGIC - **Simplification:** By excluding duplicate correlations, the visualization becomes less cluttered, making it easier for stakeholders to interpret the results without a deep statistical background. 
+# MAGIC +# MAGIC - **Interactive Exploration:** In a notebook environment, this function complements interactive EDA by allowing users to dynamically explore different subsets of their data and immediately see the impact on variable relationships. +# MAGIC +# MAGIC Let's define a simple helper function to create such a correlation matrix from our dataset. + +# COMMAND ---------- + +# DBTITLE 1,Correlation matrix helper method +def correlation_matrix(df, dropDuplicates = True): + + # Exclude duplicate correlations by masking upper right values + if dropDuplicates: + mask = np.zeros_like(df, dtype=bool) + mask[np.triu_indices_from(mask)] = True + + # Set background color / chart style + sns.set_style(style = 'white') + + # Set up matplotlib figure + f, ax = plt.subplots(figsize=(5, 5)) + + # Add diverging colormap from red to blue + cmap = sns.diverging_palette(250, 10, as_cmap=True) + + # Draw correlation plot with or without duplicates + if dropDuplicates: + sns.heatmap(df, mask=mask, cmap=cmap, + square=True, + linewidth=.5, cbar_kws={"shrink": .5}, ax=ax) + else: + sns.heatmap(df, cmap=cmap, + square=True, + linewidth=.5, cbar_kws={"shrink": .5}, ax=ax) + + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Now that we have the method defined, let's apply it to our dataset. +# MAGIC +# MAGIC Note that we also need to expand out the categorical columns with one hot encoding to create dummy variables for their individual values. + +# COMMAND ---------- + +# DBTITLE 1,View correlation matrix +# define categorical columns to convert +cat_columns = ['state', 'campaign', 'product_purchased'] + +# convert all categorical variables to numeric +df_dummies = pd.get_dummies(df , columns = cat_columns, drop_first = True) + +correlation_matrix(df_dummies.corr(), dropDuplicates = False) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC + +# COMMAND ---------- + +# MAGIC %md +# MAGIC We can also use this correlation information to determine if any values are correlated to the point that they provide completely redundant information, which could severely impact our analysis and the quality of the resulting model. For this, let's define a few additional helper functions and apply those to our dataset. + +# COMMAND ---------- + +# DBTITLE 1,Redundant pairs helper function +def get_redundant_pairs(df): + """Get diagonal and lower triangular pairs of correlation matrix.""" + pairs_to_drop = set() + cols = df.columns + for i in range(0, df.shape[1]): + for j in range(0, i+1): + pairs_to_drop.add((cols[i], cols[j])) + return pairs_to_drop + +def get_top_abs_correlations(df, n=5): + au_corr = df.corr().abs().unstack() + labels_to_drop = get_redundant_pairs(df) + au_corr = au_corr.drop(labels=labels_to_drop).sort_values(ascending=False) + return au_corr[0:n] + +print("Top absolute correlations") +print(get_top_abs_correlations(df_dummies, 5)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC The correlation between the predictors is not found to be significant, so we do not have to drop any predictors. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC That's it for our exploratory data analysis notebook. We demonstrated a couple of different ways to visualize and summarize the example product interest dataset for our scenario, and along the way we collected a variety of additional transformations to consider (some of which have already been applied to the silver table). +# MAGIC +# MAGIC Here's a summary of some useful transformations to consider for data preparation based on this analysis.
+# MAGIC +# MAGIC 1. Remove null records +# MAGIC 2. Since the products purchased have a class imbalance, we will need to balance the classes to reduce bias +# MAGIC 3. Since club member is a binary predictor, treat it as categorical +# MAGIC 4. Since purchases in some states are much greater than in others, states with a smaller percentage of customers should be combined into an "Other" category +# MAGIC 5. Month should be treated as categorical +# MAGIC 6. Case counts greater than 3 can be combined into a single category +# MAGIC 7. Since Case Type Return is a binary predictor, treat it as categorical +# MAGIC 8. Since Case Type Shipment Damaged is a binary predictor, treat it as categorical +# MAGIC 9. Engagement score and Clicks need to be scaled +# MAGIC 10. Tenure needs to be treated as categorical, and values greater than 3 years can be combined into one bucket. +# MAGIC +# MAGIC Also, please note again that while these notebooks are being presented in a linear fashion, the EDA, data cleansing, data preparation and the rest of the model creation process are often highly iterative. Many of the transformations and observations you make along the way may end up belonging upstream in the data pipeline (for instance, all the way back in Salesforce or some other data source), in the data cleansing and loading process, in the featurization process, or even within the model transformation pipeline. Keep this in mind as you adapt this notebook and the others to your production Salesforce Data Cloud and Databricks Machine Learning projects! +# MAGIC +# MAGIC When you're ready, please proceed to the next notebook: [Feature Engineering]($./04_feature_engineering). diff --git a/04_feature_engineering.py b/04_feature_engineering.py new file mode 100644 index 0000000..80177bd --- /dev/null +++ b/04_feature_engineering.py @@ -0,0 +1,116 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC +# MAGIC # Feature Engineering +# MAGIC +# MAGIC Welcome to the Feature Engineering Notebook, a crucial part of the Salesforce Data Cloud to Databricks integration! This notebook plays a pivotal role in deriving meaningful features from raw data ingested from Salesforce, ensuring that our machine learning models are well-equipped to make accurate predictions and generate valuable insights. This step is also how we publish our features in a way that our colleagues can find and reuse them as well, using Unity Catalog as our Feature Store, in addition to providing useful capabilities for automated feature lookup and online feature serving. +# MAGIC +# MAGIC Let's get started! + +# COMMAND ---------- + +# DBTITLE 1,Run common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load the Feature Engineering in Unity Catalog API +# MAGIC The API for [Databricks Feature Engineering in Unity Catalog](https://docs.databricks.com/en/machine-learning/feature-store/uc/feature-tables-uc.html) is provided by the package `databricks.feature_engineering`. To use it, simply import the required classes from the package and then instantiate the `FeatureEngineeringClient`. + +# COMMAND ---------- + +# DBTITLE 1,Import the feature engineering library +from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup + +fe = FeatureEngineeringClient() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load the product interest silver table +# MAGIC +# MAGIC Feature tables are a great place to store all sorts of features about a particular entity. For this particular use case, the tables we retrieved from Salesforce are already more or less feature ready, with each row representing a user and each column representing a feature of the user. However, for most of your use cases this won't be so simple. You'll likely be pulling together features from silver and gold tables across your Lakehouse, both sourced from Salesforce Data Cloud as well as from other systems within and outside your organization.
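+# MAGIC
+# MAGIC As a purely illustrative sketch of what that might look like, imagine joining the product interest silver table with a hypothetical web analytics gold table (the second table and its columns do not exist in this demo):
+# MAGIC
+# MAGIC ```python
+# MAGIC # Hypothetical example: combine Salesforce-sourced features with features from another system.
+# MAGIC product_interest = spark.table("product_interest_silver")
+# MAGIC web_metrics = spark.table("web_analytics_gold")   # e.g., sessions_30d, avg_session_minutes
+# MAGIC
+# MAGIC combined_features = (
+# MAGIC     product_interest
+# MAGIC     .join(web_metrics, on="id", how="left")
+# MAGIC     .drop("product_purchased"))  # labels stay out of the feature table
+# MAGIC ```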
+# MAGIC +# MAGIC The good news is, you don't need to learn any new technologies to pull all this data together in Databricks. At the end of the day, as long as you can retrieve and process those features using a Spark DataFrame, you can create and maintain a feature table in Unity Catalog for those features. The key requirement is that the resulting table has a primary key. To represent this process for our example here, we're simply going to load up our product interest silver table and use that to create our first feature table. +# MAGIC +# MAGIC You can use this as a baseline to know how to create and extend your own feature tables specific to your use case. + +# COMMAND ---------- + +# DBTITLE 1,Load and view silver product interest table +df = ( + spark.table("product_interest_silver") + .drop("product_purchased")) + +display(df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Create feature table +# MAGIC +# MAGIC The main API for creating a feature table is `create_table`. +# MAGIC +# MAGIC Let's take a look at the API help for this function. + +# COMMAND ---------- + +help(fe.create_table) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC As you can see, you can either call this function with a schema or a dataframe. +# MAGIC +# MAGIC If you call it with a dataframe, it will take the schema of the provided dataframe and then write the table with all the rows of that dataframe as the initial content of the table. +# MAGIC +# MAGIC Alternatively, you can just provide a schema, and then write it later. Here, we'll demonstrate this method, taking the schema from the product interest silver table we loaded earlier. Note that this latter method is idempotent: if you call it again, it will just provide a helpful warning that the table already exists, but won't otherwise fail. You can check for the table existence explicitly as well. + +# COMMAND ---------- + +# DBTITLE 1,Create the empty feature table based on the schema +base_table_name = "product_interest_features" +full_table_name = f"{catalog_name}.{schema_name}.{base_table_name}" + +fe.create_table( + name=full_table_name, + primary_keys=["id"], + schema=df.schema) + +display_table_link(catalog_name, schema_name, base_table_name) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Write the table content +# MAGIC +# MAGIC Now that we have the feature table created, we can write our feature data into the table. By default, this will use a merge statement, inserting new rows and updating existing ones based on the primary key. + +# COMMAND ---------- + +# DBTITLE 1,Merge our feature records into the table +fe.write_table(name=full_table_name, df=df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Now that we've written the table, let's demonstrate reading it back. + +# COMMAND ---------- + +# DBTITLE 1,Read back the records and display them +display(fe.read_table(name=full_table_name)) +display_table_link(catalog_name, schema_name, base_table_name) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC And that's it for our Feature Engineering Notebook!
You have successfully transformed raw Salesforce data into a comprehensive set of enriched features, paving the way for the development of powerful machine learning models. Use this simple example as a baseline for combining sources from across your enterprise to feed your Salesforce Data Cloud use cases with powerful machine learning models. Driving your machine learning models with well-defined feature tables helps you achieve the following: +# MAGIC +# MAGIC - **Aid reuse across models and teams:** By using Unity Catalog and the Feature Engineering library, you enable your feature tables to be defined once and discovered for reuse in additional use cases so that teams don't have to reinvent the wheel every time they need the same feature. This also helps maintain a consistent and correct definition of the feature logic. +# MAGIC - **Avoid feature / serving skew:** Since the same feature logic and tables can be used for both training the model and serving it, whether it's served via batch, streaming, or, as in the case with Salesforce, a real-time model serving endpoint, you can rest assured that you won't have to reimplement the logic again and potentially introduce costly errors. Define the features correctly, once, and then use those same features for training, evaluation and inference pipelines. +# MAGIC +# MAGIC Now that we have the features ready to go, let's continue on to the next notebook: [Build and Train Model]($./05_build_and_train_model). diff --git a/05_build_and_train_model.py b/05_build_and_train_model.py new file mode 100644 index 0000000..2089b5a --- /dev/null +++ b/05_build_and_train_model.py @@ -0,0 +1,447 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Build and Train Model +# MAGIC +# MAGIC Now for the fun part! Up to this point, we've loaded data from Salesforce in a way that it can be efficiently cleaned up and combined with additional data from all around our organization. Then we did exploratory data analysis and prepared features in our feature table. We're finally ready to put those features to use and train our machine learning model. +# MAGIC +# MAGIC Let's get to it! + +# COMMAND ---------- + +# DBTITLE 1,Run common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Import libraries +# MAGIC +# MAGIC For this example, we're going to create an XGBoost model using the scikit-learn interface. We'll exploit the power of our Databricks cluster by conducting our hyperparameter sweep in parallel over the cluster. To help us use all this great functionality, we first need to import the relevant libraries. + +# COMMAND ---------- + +# DBTITLE 1,Import libraries +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline +from sklearn.compose import ColumnTransformer +from sklearn.preprocessing import StandardScaler, OneHotEncoder +from sklearn.model_selection import cross_val_score +import xgboost as xgb +from sklearn.model_selection import RandomizedSearchCV +from sklearn.preprocessing import LabelEncoder +from sklearn import metrics +import hyperopt +from hyperopt import hp + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define transformation logic +# MAGIC +# MAGIC While feature tables are great for collecting aggregate and detailed information for specific entities, there are often a few final transformations that are better suited to being applied within the machine learning pipeline itself.
Common examples are various categorical encoding methods like one-hot encoding, standardization methods, and the like. That will be the case here as well, and we'll provide those transformations later as part of the scikit-learn pipeline. +# MAGIC +# MAGIC However, there are a couple of other preprocessing steps we're going to need to fit in to adjust the dataframe slightly, to make sure we match up with what Salesforce is going to pass to our model when it calls it as part of inference down the line. These transformations will be applied outside the model pipeline, but within the model wrapper. + +# COMMAND ---------- + +# DBTITLE 1,Custom transform helper function +# Apply custom transforms here +def transform(X): + # Define the non-other (retained) states list + retained_states_list = [ + 'Washington', + 'Massachusetts', + 'California', + 'Minnesota', + 'Vermont', + 'Colorado', + 'Arizona'] + + # Object conversions + int_object_cols = [ + "club_member", + "month", + "case_type_return", + "case_type_shipment_damaged"] + + # Define columns to drop + dropped_cols = ["state", "case_count", "tenure"] + + # Convert predictor types + for c in int_object_cols: + X[c] = X[c].astype(int).astype(object) + + # Bucket the raw columns into the categorical groupings identified during EDA + X['transformed_state'] = X['state'].apply( + lambda x: 'Other' if x not in retained_states_list else x) + X['transformed_cases'] = X['case_count'].apply( + lambda x: 'No Cases' if x == 0 else '1 to 2 Cases' if x <= 2 else 'Greater than 2 Cases') + X['transformed_tenure'] = X['tenure'].apply( + lambda x: 'Less than 1' if x < 1 else '1 to 2 Years' if x == 1 else '1 to 2 Years' if x == 2 else '2 to 3 Years' if x == 3 else 'Greater than 3 Years') + + # Remove columns to ignore + X = X.drop(dropped_cols, axis=1) + + # Rename certain columns + X = X.rename(columns={ + 'transformed_state': 'state', + 'transformed_cases': 'case_count', + 'transformed_tenure': 'tenure'}) + return X + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Read feature table from Unity Catalog +# MAGIC +# MAGIC In the previous notebook, we used the feature engineering API to write feature tables to Unity Catalog. +# MAGIC +# MAGIC In this notebook, we're going to use the same `FeatureEngineeringClient` to load those features back as a training set to train our model. +# MAGIC +# MAGIC We get started in the same way, importing and instantiating the client. + +# COMMAND ---------- + +# DBTITLE 1,Import the feature engineering library +from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup + +fe = FeatureEngineeringClient() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Feature tables work by providing the keys of the data to look up as a batch dataframe we pass to the API. We also need to provide our label. In this case, we're loading from the same silver table. In practice however, the feature tables will often be updated by some separate pipelines and the labels will likely come from a different source anyway. The main takeaway here is that we need to make sure we create this batch dataframe to drive the feature lookups and provide the labels for our model.
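+# MAGIC
+# MAGIC For illustration, if the labels lived in a separate source (say, a hypothetical `orders_gold` table rather than the silver table we reuse in the next cell), the batch dataframe might be assembled like this sketch; the table and column names here are assumptions, not part of this demo:
+# MAGIC
+# MAGIC ```python
+# MAGIC # Hypothetical: keys plus labels sourced from a different table than the features.
+# MAGIC batch_df = (
+# MAGIC     spark.table("orders_gold")
+# MAGIC     .select("id", "product_purchased"))
+# MAGIC ```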
+ +# COMMAND ---------- + +# DBTITLE 1,Create batch dataframe with keys and labels +batch_df = ( + spark.table("product_interest_silver") + .select("id", "product_purchased")) + +batch_df.printSchema() + +display(batch_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Now that we have the batch dataframe, we can create our training set. +# MAGIC +# MAGIC The training set object is created to combine the batch dataframe with the set of features to look up, as well as a mapping that tells it which lookup key in the batch dataframe should match the primary key in the feature table. In our case, it's simply the `id` field again. We didn't provide any specific feature names or renaming mapping, so this will give us all the features back from the table. + +# COMMAND ---------- + +# DBTITLE 1,Create training set from feature lookups +feature_lookups = [ + FeatureLookup( + table_name="product_interest_features", + lookup_key="id") +] + +training_set = fe.create_training_set( + df=batch_df, + feature_lookups=feature_lookups, + label="product_purchased") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Load and split the training set +# MAGIC +# MAGIC With the training set object defined, we can now load the data and create a Pandas dataframe from it as we would for basically any other scikit-learn based model. Once we load up all the data, we can then split it into the normal train, test, validation splits and apply the transformation helper function we defined earlier. + +# COMMAND ---------- + +df_pandas = training_set.load_df().toPandas() + +# Separate features and target variable +X = df_pandas.drop("product_purchased", axis=1) +y = df_pandas["product_purchased"] + +label_encoder = LabelEncoder() +y_encoded = label_encoder.fit_transform(y) + +# Split data into full training and held-out testing sets +X_train_full, X_test, y_train_full, y_test = train_test_split( + X, y_encoded, test_size=0.2, random_state=42) + +# Further divide full training set it training and validation sets +X_train, X_val, y_train, y_val = train_test_split( + X_train_full, y_train_full, test_size=0.2, random_state=42) + +# Apply pre-processing logic to the splits +X_train_full = transform(X_train_full) +X_train = transform(X_train) +X_test = transform(X_test) +X_val = transform(X_val) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Additional transformations +# MAGIC +# MAGIC There are still a few columns in our dataset that need to be preprocessed just a bit. We want to apply standard scaling to all our numeric features, and one hot encoding to all our categorical features. + +# COMMAND ---------- + +# DBTITLE 1,Define the preprocessor transform +numeric_features = [ + 'engagement_score', + 'clicks', + 'pages_visited'] + +categorical_features = [ + 'club_member', + 'campaign', + 'state', + 'month', + 'case_count', + 'case_type_return', + 'case_type_shipment_damaged', + 'tenure'] + +preprocessor = ColumnTransformer( + transformers=[ + ('numeric', StandardScaler(), numeric_features), + ('categorical', OneHotEncoder(handle_unknown="ignore"), categorical_features)]) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC To use hyperopt for our hyperparameter sweep, we need to define our search space. 
+ +# COMMAND ---------- + +# DBTITLE 1,Define the hyperopt search space +from hyperopt.pyll import scope + +search_space = { + 'classifier__n_estimators': scope.int(hp.quniform('n_estimators', 100, 1001, 100)), + 'classifier__max_depth': scope.int(hp.quniform('max_depth', 3, 9, 4)), + 'classifier__learning_rate': hp.loguniform('learning_rate', -2, 0), + 'classifier__subsample': hp.uniform('subsample', 0.8, 1.0), + 'classifier__colsample_bytree': hp.uniform('colsample_bytree', 0.8, 1.0), + 'classifier__gamma': hp.uniform('gamma', 0, 0.2), # Range from 0 to 0.2 (inclusive) with 3 values + 'classifier__reg_alpha': hp.uniform('reg_alpha', 0, 1.0), # Range from 0 to 1 (inclusive) with 3 values + 'classifier__reg_lambda': hp.uniform('reg_lambda', 0, 1.0), +} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC It can be helpful for debugging purposes to have a static set of hyperparameters to test the model structure against. + +# COMMAND ---------- + +# DBTITLE 1,Define a sample set of params for debugging +# Define a set of simple parameters to run a simple trial run and for debugging +static_params = { + 'classifier__n_estimators': 100, + 'classifier__max_depth': 3, + 'classifier__learning_rate': 0.01, + 'classifier__subsample': 0.8, + 'classifier__colsample_bytree': 0.8, + 'classifier__gamma': 0.0, + 'classifier__reg_alpha': 0.0, + 'classifier__reg_lambda': 0.0 +} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define the model and pipeline +# MAGIC +# MAGIC Now that we have the search space and preprocessing logic defined, let's create the actual classifier and bundle it with the preprocessor to create a pipeline. + +# COMMAND ---------- + +# DBTITLE 1,Define the model and pipeline +# Define the xgb classifier +xgb_classifier = xgb.XGBClassifier(objective='multi:softmax') + +# Create the ML pipeline +pipeline = Pipeline(steps=[ + ('transformer', preprocessor), + ('classifier', xgb_classifier)]) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Test run +# MAGIC +# MAGIC Before we do a full hyperparameter sweep and train our final model, we can do a quick test run with the static set of hyperparameters we defined earlier. + +# COMMAND ---------- + +# DBTITLE 1,Quick test run +# Run the quick trial run (leaving in here to help debugging) +with mlflow.start_run(run_name=f"{model_name}_static_params") as test_run: + params = static_params + pipeline.set_params(**params) + pipeline.fit(X_train, y_train) + mlflow.log_params(params) + y_hat = pipeline.predict(X_val) + weighted_f1_score = metrics.f1_score(y_val, y_hat, average="weighted") + accuracy_score = metrics.accuracy_score(y_val, y_hat) + mlflow.log_metric("weighted_f1_score", weighted_f1_score) + mlflow.log_metric("accuracy_score", accuracy_score) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define objective function for hyperopt +# MAGIC +# MAGIC In addition to the search space, we also need to define our objective function for hyperopt. This is the function that hyperopt will probe using its best choices of hyperparameters from the search space we defined. In our case, we just need to train the model using the train split we carved out earlier and then evaluate the performance of that set of hyperparameters using some metric on our validation set. In our case, we'll use a weighted f1 score to cover the multiple classes we're defining for our recommender. Note that since hyperopt provides a minimization function, but for f1 score more is better, we need to multiply our metric by -1 before we return it. 
+# MAGIC +# MAGIC This is also where we first bump into MLflow. Here, we log a nested run to capture the combination of the set of parameters used for this particular sub-run along with the metrics it produced. However, we don't need to capture anything else, like the model itself. Once we have the best set of hyperparameters, we'll retrain over the full training set and evaluate that using our hold-out test set. + +# COMMAND ---------- + +# DBTITLE 1,Define hyperopt objective function +# Define objective function for hyperopt +def objective_fn(params): + with mlflow.start_run(nested=True): + pipeline.set_params(**params) + mlflow.log_params(params) + pipeline.fit(X_train, y_train) + y_hat = pipeline.predict(X_val) + weighted_f1_score = metrics.f1_score(y_val, y_hat, average="weighted") + accuracy = metrics.accuracy_score(y_val, y_hat) + mlflow.log_metric("weighted_f1_score", weighted_f1_score) + mlflow.log_metric("accuracy", accuracy) + + # Set the loss to -1*weighted_f1_score so fmin maximizes the weighted_f1_score + return {"status": hyperopt.STATUS_OK, "loss": -1 * weighted_f1_score} + + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Also, as mentioned earlier we're going to run this hyperparameters sweep in parallel over the cluster. To do this, we'll need to tell hyperopt how many runs we want to do in parallel. While its technically possible to run your entire budget in one go, that typically won't yield the best performance outcome, as the algorithm hyperopt uses won't be able to focus its search space as the runs proceed. A decent trade-off and heuristic to use here is the square root of your total evaluation budget. Here, we'll just use a simple budget of 16 evals, which means according to the heuristic we can use parallelism of 4. This means 4 runs will happen in parallel, and hyperopt will have multiple opportunities to improve the search space over those runs. + +# COMMAND ---------- + +# DBTITLE 1,Configure parallelism +import math + +from hyperopt import SparkTrials + +# Feel free to change max_evals if you want fewer/more trial runs +# note: we're assuming you're using the 16 core cluster created in RunMe +max_evals = 16 +parallelism = 4 # e.g., int(math.sqrt(max_evals)) or sc.defaultParallelism + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Custom model wrapper +# MAGIC +# MAGIC Since our input data to our model serving endpoint will need some preprocessing applied before we feed it to our scikit-learn pipeline, we need to create a simple wrapper class to apply the same preprocessing as well as the postprocessing to the results. Autologging has already logged the model above as part of hyperparameter tuning, but here we'll log our wrapper model along with the parameters and metrics and this will be the one we'll deploy to the endpoint. + +# COMMAND ---------- + +# Define the custom model wrapper. +class ModelWrapper(mlflow.pyfunc.PythonModel): + + def __init__(self, pipeline, label_encoder): + self.pipeline = pipeline + self.label_encoder = label_encoder + + def predict(self, context, model_input, params=None): + X = transform(model_input.copy(deep=False)) + y = self.pipeline.predict(X) + return self.label_encoder.inverse_transform(y) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Tuning and training run +# MAGIC +# MAGIC Now that we have the wrapper defined, we run our distributed hyperparameter search and then log it explicitly to the MLflow tracking experiment as well as Unity Catalog as our model registry with a call to `log_model`. 
Along with the model artifact, we also log the metrics and parameters we used, the signature, and sample model input to help users of the model trace back our lineage and aide reproducibility and understanding. + +# COMMAND ---------- + +# DBTITLE 1,Tune and train the model +spark_trials = SparkTrials(parallelism=parallelism) + +with mlflow.start_run(run_name=f"{model_name}_hyperopt_tuning") as run: + + # Find the best set of hyperparameters + best_params = hyperopt.fmin( + fn=objective_fn, + space=search_space, + algo=hyperopt.tpe.suggest, + max_evals=max_evals, + trials=spark_trials) + + params = hyperopt.space_eval(search_space, best_params) + + # Do a final training run with the best parameters + pipeline.set_params(**params) + pipeline.fit(X_train_full, y_train_full) + mlflow.log_params(params) + y_hat = pipeline.predict(X_test) + + # Overall metrics + weighted_f1_score = metrics.f1_score(y_test, y_hat, average="weighted") + accuracy = metrics.accuracy_score(y_test, y_hat) + mlflow.log_metric("weighted_f1_score", weighted_f1_score) + mlflow.log_metric("accuracy", accuracy) + + # Per class metrics + cm = metrics.confusion_matrix(y_test, y_hat) + tp = cm.diagonal() + fp = cm.sum(axis=0) - tp + n_classes = len(tp) + mlflow.log_metrics({f"class_{i}_tp": tp[i] for i in range(n_classes)}) + mlflow.log_metrics({f"class_{i}_fp": fp[i] for i in range(n_classes)}) + mlflow.log_metrics({f"class_{i}_accuracy": tp[i] for i in range(n_classes)}) + + # Log the model with pre and post processing logic + mlflow.pyfunc.log_model( + python_model=ModelWrapper(pipeline, label_encoder), + artifact_path="model", + signature=mlflow.models.infer_signature(X, y), + input_example=X.sample(3), + registered_model_name=f"{catalog_name}.{schema_name}.{model_name}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Test the registered model +# MAGIC +# MAGIC Let's load the model back from the registry and make sure we can use it for predictions as a sanity test. + +# COMMAND ---------- + +# DBTITLE 1,Run a sanity test on the model +client = mlflow.MlflowClient() +model_versions = client.search_model_versions(f"name='{catalog_name}.{schema_name}.{model_name}'") +latest_version = str(max(int(v.version) for v in model_versions)) +latest_uri = f"models:/{catalog_name}.{schema_name}.{model_name}/{latest_version}" +loaded_model = mlflow.pyfunc.load_model(latest_uri) +sample_model_input = X.sample(3) +sample_model_output = loaded_model.predict(sample_model_input) +display(sample_model_output) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Assign champion alias +# MAGIC +# MAGIC For downstream scoring pipelines, including deployment to a model serving endpoint, we can reference the model by an alias to better communicate which is considered the intended live model. + +# COMMAND ---------- + +# DBTITLE 1,Assign a model alias +client.set_registered_model_alias(model_name, "champion", latest_version) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC Congratulations! You've just created and registered a machine learning model based on product interest loaded from Salesforce Data Cloud to recommend products for users. The techniques we used here are going to be fairly typical so we're hopeful this gives you a good head start in doing something similar with your own use case. However, we're not quite done yet! 
Even though we have the model deployed to the registry and could apply it from there to batch and streaming workloads, to integrate with Salesforce Data Cloud we need one more step: we need to set up a real-time serving endpoint in Databricks. When you're ready to tackle this step, continue on to the next notebook: [Deploy Serving Endpoint]($./06_deploy_serving_endpoint). diff --git a/06_deploy_serving_endpoint.py b/06_deploy_serving_endpoint.py new file mode 100644 index 0000000..d3d4175 --- /dev/null +++ b/06_deploy_serving_endpoint.py @@ -0,0 +1,175 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Deploy Serving Endpoint +# MAGIC +# MAGIC You've come a long way. You got data from Salesforce, prepared feature tables from it, and even trained and registered a model from it. Now it's time to make that model available for use in Salesforce. In this notebook, we're going to deploy our model serving endpoint. +# MAGIC +# MAGIC As with most things in Databricks, you can do this either via the UI or via the API. Here, we're going to use the API. Fortunately for us, Databricks makes this a piece of cake in both cases. Once you have a model registered in Unity Catalog, its basically as easy as pointing to that model, telling it how much concurrency you need for your users, and then clicking go. +# MAGIC +# MAGIC Databricks takes care of the rest! + +# COMMAND ---------- + +# DBTITLE 1,Run common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Configure MLflow client and token +# MAGIC +# MAGIC First, we need a couple of pieces of information. To be able to refer to the model we created previously, we'll look up the version of the model by its alias, which we named as `champion`. +# MAGIC +# MAGIC We also need a token and URL to access the REST API. Normally you'd use a service principle for these and look them up from the Databricks secrets utility, but as with many of our demo notebooks, we'll grab them from our notebook context to keep the immediate focus on the deployment process and simplify things just a little more. + +# COMMAND ---------- + +# DBTITLE 1,Configure client and token +# Get the SDK client handles. +mlflow_client = mlflow.MlflowClient() + +# Pull our latest champion version from the registry +model_version = mlflow_client.get_model_version_by_alias(model_name, "champion") +print(model_version) + +# Get a token from the notebook context for testing purposes. For production you'll want +# to access a service principal token you create and store in the dbutils secrets API. +notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() +databricks_url = notebook_context.apiUrl().getOrElse(None) +databricks_token = notebook_context.apiToken().getOrElse(None) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Configure serving endpoint +# MAGIC +# MAGIC The API call to create the serving endpoint needs to know the model to be served, and what we want to call this endpoint. In this case, we already looked up the model to be served, and we can derive the name of the endpoint from that information. We also need to tell it the workload size, and since its just a demo we'll turn on _scale to zero_ so we don't incur any costs when its not in use (Note: you'd typically want to turn this off for production deployments). +# MAGIC +# MAGIC We're also going to turn on inference tables to facilitate monitoring of our models request response pairs, which you can see in the `auto_capture_config` section below. 
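As a hedged aside: with inference tables enabled, the captured request/response pairs land in a Delta table whose name is derived from the `table_name_prefix` in the configuration below (by convention `<prefix>_payload`), and that is the same table the monitoring notebook reads later. Once the endpoint has served some traffic, you should be able to peek at it with something like:

```python
# Assumes the endpoint has already logged some traffic and that the payload table
# follows the <table_name_prefix>_payload naming convention used by inference tables.
payloads = spark.table(f"{catalog_name}.{schema_name}.{model_name}_payload")
display(payloads.limit(10))
```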
+ +# COMMAND ---------- + +# DBTITLE 1,Define endpoint configuration +# Define the endpoint configuration. +served_entity_name = f"{model_name}-{model_version.version}" +config = { + "served_entities": [ + { + "name": served_entity_name, + "entity_name": model_version.name, + "entity_version": model_version.version, + "workload_size": "Small", + "workload_type": "CPU", + "scale_to_zero_enabled": True + } + ], + "traffic_config": { + "routes": [ + { + "served_model_name": served_entity_name, + "traffic_percentage": 100 + } + ] + }, + "auto_capture_config": { + "catalog_name": catalog_name, + "schema_name": schema_name, + "table_name_prefix": model_name, + "enabled": True + } +} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Create or update the serving endpoint +# MAGIC +# MAGIC The code below looks a little fancy, but that's basically just because its handling the case where the endpoint already exists in case you are running this for the second time. At the end of the day, once you have the configuration specified, creating the endpoint itself just boils down to the one-liner to `POST` to the serving-endpoints API endpoint to initiate the deployment of the configured endpoint (or `PUT` in case you're updating it). It works basically the same through the UI. +# MAGIC +# MAGIC Once you execute this line, it'll probably take 5 to 10 minutes to actually bring up your model serving endpoint, so run the next cell and then go grab a fresh cup of coffee ☕️, and hopefully by the time your back the endpoint will be ready to go! + +# COMMAND ---------- + +# DBTITLE 1,Create or update the model serving endpoint +# The model serving API is hosted at an endpoint in your Databricks workspace. +serving_api_endpoint = f"{databricks_url}/api/2.0/serving-endpoints" +headers = {"Authorization": f"Bearer {databricks_token}"} + +# Determine if we need to create a new endpoint or update an existing one. +list_endpoint_response = requests.get(serving_api_endpoint, headers=headers) +all_endpoints = list_endpoint_response.json()["endpoints"] +endpoint_names = [endpoint["name"] for endpoint in all_endpoints] +endpoint_already_exists = endpoint_name in endpoint_names + +# Create or update the endpoint based ont he config. +if not endpoint_already_exists: + print("creating new endpoint") + create_json = { + "name": endpoint_name, + "config": config + } + endpoint_response = requests.post(serving_api_endpoint, headers=headers, json=create_json) + endpoint = endpoint_response.json() +else: + print("updating existing endpoint") + update_endpoint_uri = f"{serving_api_endpoint}/{endpoint_name}/config" + update_json = config + endpoint_response = requests.put(update_endpoint_uri, headers=headers, json=update_json) + endpoint = endpoint_response.json() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Wait for the endpoint to be ready +# MAGIC +# MAGIC That's it really. +# MAGIC +# MAGIC The rest of the code here basically just polls the API to let us know when its ready (or, when necessary, to get some information to help with troubleshooting). You can either watch it here, or over in the endpoints UI for the endpoint we just created. + +# COMMAND ---------- + +# DBTITLE 1,Poll periodically to check if the endpoint is ready +if "error_code" in endpoint: + print(endpoint) +else: + print("waiting for endpoint to be ready...") + + # Wait for the endpoint to be ready. 
+ endpoint_ready = False + endpoint_status_check_interval = 10 + + while True: + endpoint_response = requests.get(f"{serving_api_endpoint}/{endpoint_name}", headers=headers) + if "error_code" in endpoint_response: + print(endpoint_response) + break + state = endpoint["state"] + if state["ready"] == "READY" and state["config_update"] == "NOT_UPDATING": + print("endpoint ready") + break + # TODO: check for better ways to identify failed state or other conditions to watch out for + elif "FAILED" in state["ready"] or "FAILED" in state["config_update"]: + print("deployment failed - please check the logs") + break + else: + endpoint = endpoint_response.json() + time.sleep(endpoint_status_check_interval) + +print(endpoint) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC Congratulations! 🎉 +# MAGIC +# MAGIC Now your endpoint is ready to go! You could stop here if you want and continue on with the instructions we mentioned in the README for this repo to configure the endpoint integration on the Salesforce Data Cloud side. +# MAGIC +# MAGIC However, we've provided two additional notebooks to check out as well if your interested. +# MAGIC +# MAGIC - **[Test Model Inference]($./07_test_model_inference):** This notebook basically just lets you hit the model serving endpoint you just created via the REST API directly so you can test it out and experiment with it. This is particularly useful if for some reason you run into trouble when you try to set things up on the Salesforce side. So if you want to test it out before hand, you can go to that one next to try things out. +# MAGIC +# MAGIC - **[Monitoring]($./08_monitoring):** This notebook sets up a Lakehouse Monitoring pipeline based on the inference table we creater earlier. It also simulates getting labels down stream and shows you how to join those in, and then creates an inference table monitor based on that. It's a little more meaningful after you have some inferences to look at in the table, so it might be worthwhile to either test things out using the above notebook first and setting things up in Salesforce before you come back around to this one. +# MAGIC +# MAGIC Whichever way you go, thanks for making it this far! Great job! 🥳 diff --git a/07_test_model_inference.py b/07_test_model_inference.py new file mode 100644 index 0000000..97e5fbf --- /dev/null +++ b/07_test_model_inference.py @@ -0,0 +1,97 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Test Model Inference +# MAGIC +# MAGIC This supplementary notebook provides a simple way to test the model serving endpoint we created in the prior notebooks. It assumes you completed all the previous notebooks and have the silver table configured, the model registered, and the model serving endpoint deployed, so please make sure you've completed all that before trying this one! + +# COMMAND ---------- + +# DBTITLE 1,Run common setup +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define URI and token +# MAGIC +# MAGIC As before, to use the REST API we need a URL and token, so let's get those from our notebook context for ease of use. +# MAGIC +# MAGIC Note: for production deployments, please use service principals with the secrets API for this. + +# COMMAND ---------- + +# DBTITLE 1,Configure token +# Get a token from the notebook context for testing purposes. For production you'll want +# to access a service principal token you create and store in the dbutils secrets API. 
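# For example, a hedged sketch of the production pattern (the scope and key names below
# are hypothetical placeholders, not secrets created by this repo):
#   databricks_token = dbutils.secrets.get(scope="sfdc-byom-scope", key="databricks-sp-token")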
+notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() +databricks_url = notebook_context.apiUrl().getOrElse(None) +databricks_token = notebook_context.apiToken().getOrElse(None) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Grab some sample data +# MAGIC +# MAGIC We need some data to send to the endpoint. +# MAGIC +# MAGIC Fortunately, since we already created a silver table from the Salesforce Data earlier, we can just pull a few records from that. + +# COMMAND ---------- + +# DBTITLE 1,Grab some sample rows from the silver table +df = spark.table("product_interest_silver").limit(10).toPandas() + +# Separate features and target variable +X = df.drop("product_purchased", axis=1) +y = df["product_purchased"] + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Prepare the payload +# MAGIC +# MAGIC To feed the data to the REST API, we need to convert it to JSON. The easiest way to do that is via the Pandas `to_json` method using one of the `orient` options. I tend to use the `records` format as that is particularly readable and easy to inspect manually, but others can work as well. The `records` format happens to also line up with what's expected in the [monitoring notebook]($./08_monitoring) and what we've mentioned in the instructions for setting up the integration in the instructions for Salesforce Data Cloud. + +# COMMAND ---------- + +# DBTITLE 1,Prepare the payload +import json +payload = {"dataframe_records": json.loads(X[:3].to_json(orient="records"))} +print(json.dumps(payload, indent=4)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Prepare the headers and URI +# MAGIC +# MAGIC To make the call, you need to format the URI with the name of the endpoint to call, and also put the token in an authorization header. Conveniently, this is similar to the information you'll put into Salesforce when you configure the endpoint in Salesforce (the main difference being you'll want to use a different token than the one we got from the Notebook context for that one). + +# COMMAND ---------- + +# DBTITLE 1,Prepare the headers and URI +# The model serving API is hosted at an endpoint in your Databricks workspace. +invocations_uri = f"{databricks_url}/serving-endpoints/{endpoint_name}/invocations" +headers = {"Authorization": f"Bearer {databricks_token}"} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Post to the endpoint +# MAGIC +# MAGIC With that information defined, you now just make the `POST` call to that endpoint, passing along the authorization header and the payload. + +# COMMAND ---------- + +# DBTITLE 1,Invoke the model serving endpoint +# Call the endpoint and print the response! +response = requests.post(invocations_uri, headers=headers, json=payload) +print(response.json()) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Conclusion +# MAGIC +# MAGIC That's it. You now know how to successfully hit the model serving endpoint via the REST API. +# MAGIC +# MAGIC You are definitely ready by this point to go set up the integration in Salesforce Data Cloud. After you've done that and have some data ready to review from the inference tables, don't forgot to come back and have a look at the [monitoring notebook]($./08_monitoring) as well! 
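Before moving on, one last hedged note on the response shape: since the pyfunc wrapper returns decoded labels and model serving wraps pyfunc output in a `predictions` envelope, the body you get back should look roughly like the example below. That top-level `predictions` key is what the `$.predictions` JSON path refers to when you configure the output mapping in Salesforce.

```python
# Hypothetical example of unpacking the endpoint response; the product names are made up.
result = response.json()
# e.g. {"predictions": ["Backpack", "Winter Jacket", "Backpack"]}
print(result["predictions"])
```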
diff --git a/08_monitoring.py b/08_monitoring.py new file mode 100644 index 0000000..ca1d7b8 --- /dev/null +++ b/08_monitoring.py @@ -0,0 +1,303 @@ +# Databricks notebook source +# MAGIC %md-sandbox +# MAGIC +# MAGIC # Inference Table Monitoring +# MAGIC +# MAGIC +# MAGIC +# MAGIC #### About this notebook +# MAGIC This starter notebook is intended to be used with **Databricks Model Serving** endpoints that have the *Inference Table* feature enabled. To set up such an endpoint, refer to the guide on model serving endpoints ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/score-model-serving-endpoints.html)|[Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/score-model-serving-endpoints)).
+# MAGIC This notebook has three high-level purposes: +# MAGIC +# MAGIC 1. Unpack the logged requests and responses by parsing your model's raw JSON payloads into structured columns. +# MAGIC 2. Join the unpacked requests with the ground-truth labels (actual purchases) as they become available. +# MAGIC 3. Set up Databricks Lakehouse Monitoring on the resulting table to produce data and model quality/drift metrics. +# MAGIC +# MAGIC #### How to run the notebook +# MAGIC The notebook is set up to be run step-by-step. Here are the main configurations to set: +# MAGIC * Define your model serving endpoint name (mandatory) +# MAGIC * Ensure the unpacking function works with your model input/output schema +# MAGIC * Define the checkpoint location (prefer using a Volume within your schema) +# MAGIC For best results, run this notebook on any cluster running **Machine Learning Runtime 12.2 LTS or higher**. +# MAGIC +# MAGIC #### Scheduling +# MAGIC Feel free to run this notebook manually to test out the parameters; when you're ready to run it in production, you can schedule it as a recurring job (see the example sketch just below).
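For example, here is a rough sketch of scheduling this notebook weekly through the Jobs REST API; the job name, notebook path, and cluster id are placeholders rather than values defined in this repo, and you could just as easily do the same thing through the Workflows UI:

```python
import requests

# Reuse the workspace URL / token pattern used elsewhere in these notebooks.
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host = ctx.apiUrl().getOrElse(None)
token = ctx.apiToken().getOrElse(None)

job_spec = {
    "name": "sfdc-byom-inference-monitoring",  # placeholder job name
    "tasks": [{
        "task_key": "refresh_monitoring",
        "notebook_task": {"notebook_path": "/Repos/<you>/sfdc-byom/08_monitoring"},  # placeholder path
        "existing_cluster_id": "<your-cluster-id>",  # placeholder cluster
    }],
    # Run every Monday at 06:00 UTC, i.e. at least once a week.
    "schedule": {
        "quartz_cron_expression": "0 0 6 ? * Mon",
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
}

response = requests.post(
    f"{host}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_spec,
)
print(response.json())  # should include the new job_id
```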
+# MAGIC Note that in order to keep this notebook running smoothly and efficiently, we recommend running it at least **once a week** to keep output tables fresh and up to date. + +# COMMAND ---------- + +# MAGIC %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.6-py3-none-any.whl" +# MAGIC +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +# MAGIC %run ./common + +# COMMAND ---------- + +# MAGIC %md-sandbox +# MAGIC ## Exploring the Model Serving Inference table content +# MAGIC +# MAGIC +# MAGIC +# MAGIC Let's start by analyzing what's inside our inference table. +# MAGIC +# MAGIC The inference table name can be fetched from the model serving endpoint configuration. +# MAGIC +# MAGIC We'll first get the table name and simply run a query to view its content. + +# COMMAND ---------- + +import requests +from typing import Dict + + +def get_endpoint_status(endpoint_name: str) -> Dict: + """Fetch the PAT token to send in the API request.""" + workspace_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() + token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None) + + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(f"{workspace_url}/api/2.0/serving-endpoints/{endpoint_name}", json={"name": endpoint_name}, headers=headers).json() + + # Verify that Inference Tables is enabled. + if "auto_capture_config" not in response.get("config", {}) or not response["config"]["auto_capture_config"]["enabled"]: + raise Exception(f"Inference Tables is not enabled for endpoint {endpoint_name}. \n" + f"Received response: {response} from endpoint.\n" + "Please create an endpoint with Inference Tables enabled before running this notebook.") + + return response + +response = get_endpoint_status(endpoint_name=endpoint_name) + +auto_capture_config = response["config"]["auto_capture_config"] +catalog = auto_capture_config["catalog_name"] +schema = auto_capture_config["schema_name"] +# These values should not be changed - if they are, the monitor will not be accessible from the endpoint page. +payload_table_name = auto_capture_config["state"]["payload_table"]["name"] +payload_table_name = f"`{catalog}`.`{schema}`.`{payload_table_name}`" +print(f"Endpoint {endpoint_name} configured to log payload in table {payload_table_name}") + +processed_table_name = f"{auto_capture_config['table_name_prefix']}_processed" +processed_table_name = f"`{catalog}`.`{schema}`.`{processed_table_name}`" +print(f"Processed requests with text evaluation metrics will be saved to: {processed_table_name}") + +payloads = spark.table(payload_table_name).limit(10) +display(payloads) + +# COMMAND ---------- + +# MAGIC %md-sandbox +# MAGIC ## Unpacking the inference table requests and responses +# MAGIC +# MAGIC +# MAGIC +# MAGIC ### Unpacking the table +# MAGIC +# MAGIC The request and response columns contains your model input and output as a `string`. +# MAGIC +# MAGIC Note that the format depends of your model definition and can be custom. Inputs are usually represented as JSON with TF format, and the output depends of your model definition. +# MAGIC +# MAGIC Because our model is designed to potentially batch multiple entries, we need to unpack the value from the request and response. 
+# MAGIC +# MAGIC We will use Spark JSON Path annotation to directly access the query and response as string, concatenate the input/output together with an `array_zip` and ultimately `explode` the content to have 1 input/output per line (unpacking the batches) +# MAGIC +# MAGIC **Make sure you change the following selectors based on your model definition** +# MAGIC +# MAGIC *Note: This will be made easier within the product directly, we provide this notebook to simplify this task for now.* + +# COMMAND ---------- + +# The format of the input payloads, following the TF "inputs" serving format with a "query" field. +# Single query input format: {"inputs": [{"query": "User question?"}]} +INPUT_REQUEST_JSON_PATH = "inputs[*].query" +# Matches the schema returned by the JSON selector (inputs[*].query is an array of string) +INPUT_JSON_PATH_TYPE = "array" + +# Answer format: {"predictions": ["answer"]} +OUTPUT_REQUEST_JSON_PATH = "predictions" +# Matches the schema returned by the JSON selector (predictions is an array of string) +OUPUT_JSON_PATH_TYPE = "array" + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Since we want to measure the model performance over time, we also need an ingestion pipeline to ingest the actual purchases once they're known in the system, wherever those are available. For demo purposes, we'll suppose the data for purchases is again coming from Salesforce and simply ingest it inline within this notebook, but for a production setting you'd want to set this up alongside the rest of your data pipeline (e.g., via a DLT pipeline). + +# COMMAND ---------- + +packed_requests = spark.table(f"{catalog_name}.{schema_name}.recommender_payload") +display(packed_requests) + +# COMMAND ---------- + +input_request_json_path = "" + +def unpack_requests(packed_requests: DataFrame) -> DataFrame: + + input_record_type = ( + spark.table("product_interest_silver") + .drop("product_purchased").schema) + + request_schema = T.StructType([ + T.StructField("dataframe_records", T.ArrayType(input_record_type))]) + + response_schema = T.StructType([ + T.StructField("predictions", T.ArrayType(T.StringType()))]) + + df = ( + packed_requests + .filter(F.col("status_code") == "200") + .withColumn("request_unpacked", F.from_json("request", request_schema)) + .withColumn("response_unpacked", F.from_json("response", response_schema)) + .withColumn("request_response_unpacked", F.arrays_zip( + "request_unpacked.dataframe_records", + "response_unpacked.predictions")) + .withColumn("exploded", F.explode(F.expr("request_response_unpacked"))) + .withColumn("model_id", F.concat( + F.col("request_metadata.model_name").alias("model_name"), + F.lit("/"), + F.col("request_metadata.model_version").alias("model_version"))) + .select( + "databricks_request_id", + (F.col("timestamp_ms") / 1000).cast("timestamp").alias("timestamp"), + "exploded.dataframe_records.*", + F.col("exploded.predictions").alias("prediction"), + "model_id")) + + return df + +unpacked_requests = unpack_requests(packed_requests) + +# COMMAND ---------- + +# DBTITLE 1,Ingest the ground truth labels +def get_sfdc_actual_purchases(): + """Retrieve our actual purchases table from SFDC.""" + + conn = SalesforceCDPConnection( + sfdc_login_url, + sfdc_username, + sfdc_password, + sfdc_client_id, + sfdc_client_secret) + + query = """ + SELECT + id__c, + product_purchased__c + FROM + sfdc_byom_demo_validate__dll + """ + + df_pandas = conn.get_pandas_dataframe(query) + df = spark.createDataFrame(df_pandas) + return remove_column_suffix(df, 
SFDC_CUSTOM_FIELD_SUFFIX) + + +# For demo purposes, we'll go directly to a silver layer table. +# In a real setting you'll want to stick with ingesting to bronze +# and then letting silver be a cleansed and processed layer. + +# Create the table by loading from SFDC in case it doesn't exist. +if not spark.catalog.tableExists("actual_purchases_silver"): + get_sfdc_actual_purchases().write.saveAsTable("actual_purchases_silver") + +# Load the actual purchases table from Delta Lake +df_actual_purchases = spark.table("actual_purchases_silver") +display(df_actual_purchases) + +# COMMAND ---------- + +product_purchases_join = ( + unpacked_requests + .join(df_actual_purchases, "id", "left")) + +display(product_purchases_join) + +# COMMAND ---------- + +import delta +from delta import DeltaTable + +processed_table_name = "recommender_payload_with_actuals" + +if spark.catalog.tableExists(processed_table_name): + payload_actuals_table = DeltaTable.forName(spark, processed_table_name) + payload_actuals_merge = ( + payload_actuals_table.alias("target") + .merge(product_purchases_join.alias("source"), "source.id = target.id") + .whenNotMatchedInsertAll() + .whenMatchedUpdate(set={"target.product_purchased": "source.product_purchased"})) + payload_actuals_merge.execute() +else: + product_purchases_join.write.saveAsTable(processed_table_name) + spark.sql(f""" + ALTER TABLE {processed_table_name} + SET TBLPROPERTIES (delta.enableChangeDataFeed = true)""") + +display(spark.table(processed_table_name)) + +# COMMAND ---------- + +# MAGIC %md-sandbox +# MAGIC +# MAGIC ### Monitor the inference table +# MAGIC +# MAGIC +# MAGIC In this step, we create a monitor on our inference table by using the `create_monitor` API. If the monitor already exists, we pass the same parameters to `update_monitor`. In steady state, this should result in no change to the monitor. +# MAGIC +# MAGIC Afterwards, we queue a metric refresh so that the monitor analyzes the latest processed requests. +# MAGIC +# MAGIC See the Lakehouse Monitoring documentation ([AWS](https://docs.databricks.com/lakehouse-monitoring/index.html) | [Azure](https://learn.microsoft.com/azure/databricks/lakehouse-monitoring/index)) for more details on the parameters and the expected usage. + +# COMMAND ---------- + +import databricks.lakehouse_monitoring as lm + +# Optional parameters to control monitoring analysis. +# For help, use the command help(lm.create_monitor). +GRANULARITIES = ["1 day"] # Window sizes to analyze data over +SLICING_EXPRS = None # Expressions to slice data with +CUSTOM_METRICS = None # A list of custom metrics to compute +BASELINE_TABLE = None # Baseline table name, if any, for computing baseline drift + +monitor_params = { + "profile_type": lm.InferenceLog( + timestamp_col="timestamp", + granularities=GRANULARITIES, + problem_type="classification", + prediction_col="prediction", + label_col="product_purchased", + model_id_col="model_id" + ), + "output_schema_name": f"{catalog_name}.{schema_name}", + "schedule": None, # We will refresh the metrics on-demand in this notebook + "baseline_table_name": BASELINE_TABLE, + "slicing_exprs": SLICING_EXPRS, + "custom_metrics": CUSTOM_METRICS +} + +try: + info = lm.create_monitor(table_name=processed_table_name, **monitor_params) + print(info) +except Exception as e: + # Ensure the exception was expected + assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" + + # Update the monitor if any parameters of this notebook have changed. 
+ lm.update_monitor(table_name=processed_table_name, updated_params=monitor_params) + # Refresh metrics calculated on the requests table. + refresh_info = lm.run_refresh(table_name=processed_table_name) + print(refresh_info) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC ## Our table is now monitored +# MAGIC +# MAGIC Databricks Lakehouse Monitoring automatically builds dashboard to track your metrics and their evolution over time. +# MAGIC +# MAGIC You can leverage your metric table to track your model behavior over time, and setup alerts to detect potential changes in accuracy or drift, and even trigger retrainings. diff --git a/CONNECTED_APP.md b/CONNECTED_APP.md new file mode 100644 index 0000000..5810dee --- /dev/null +++ b/CONNECTED_APP.md @@ -0,0 +1,80 @@ +## Set up a connected app in Salesforce + +To be able to access SalesForce Data Cloud via the connector, you'll first need a connected app in Salesforce. Here's how to create that. + +### 1. Go to Salesforce setup + +Log in to Salesforce and go to setup + +![image](files/sfdc_byom/images/connected_app_01.png) + + +### 2. Open up App Manager + +Search for App Manager + +![image](files/sfdc_byom/images/connected_app_02.png) + +When you open it, it should look like this + +![image](files/sfdc_byom/images/connected_app_03.png) + + +### 3. Create Connected App + +Click on New Connected App + +![image](files/sfdc_byom/images/connected_app_04.png) + +1. Give the app a name +2. Enter email +3. Check Enable OAuth settings +4. Put "https://login.salesforce.com/services/oauth2/success" in the callback url +5. Ensure that the following are selected in the scopes + 1. Manage user data via APIs (api) + 2. Access all datacloud resources + 3. Perform ANSI SQL queries on DataCloud + +![image](files/sfdc_byom/images/connected_app_05.png) + +Click on Save. + +![image](files/sfdc_byom/images/connected_app_06.png) + + +### 4. Update policies + +In set up go to Manage Connected App + +![image](files/sfdc_byom/images/connected_app_07.png) + +Click on the newly created connected app and then click on Edit Policies. + +![image](files/sfdc_byom/images/connected_app_08.png) + +Make sure that under oauth policies we have "Relax IP restrictions" and "Allow all users to self authorize" and then click Save. + +![image](files/sfdc_byom/images/connected_app_09.png) + + +### 5. Set up customer keys (optional) + +Click on Manage Customer Keys and provide validation code if applicable. + +![image](files/sfdc_byom/images/connected_app_10.png) + +Copy the keys. + +![image](files/sfdc_byom/images/connected_app_11.png) + + +%md +### 6. Ensure Oauth and OpenId are enabled + +In setup, go to Oauth and OpenId settings. Ensure all the options are turned on. + +![image](files/sfdc_byom/images/connected_app_12.png) + +**Note:** If you want to restrict IP's, you can set it up in the connected app. See the article [Restrict Access to Trusted IP Ranges for a Connected App](https://help.salesforce.com/s/articleView?id=sf.connected_app_edit_ip_ranges.htm&type=5) for more details. + + diff --git a/DATABRICKS_SECRETS.md b/DATABRICKS_SECRETS.md new file mode 100644 index 0000000..b900283 --- /dev/null +++ b/DATABRICKS_SECRETS.md @@ -0,0 +1,12 @@ +## Install Databricks CLI and set up secrets + +1. Use the steps in the link to install the Databricks CLI on your machine - https://docs.databricks.com/en/dev-tools/cli/install.html +2. 
Follow these instructions to set up authentication between the Databricks CLI and your Databricks accounts and workspaces - https://docs.databricks.com/en/dev-tools/cli/authentication.html +3. To establish a Salesforce connection and configure connection secrets from the command line interface (CLI), execute the following commands. Keep in mind that "sfdc-byom-scope" is just the scope's name, and you can assign any relevant name of your choice. + +``` +databricks secrets create-scope sfdc-byom-scope +databricks secrets put-secret sfdc-byom-scope sfdc-byom-cdpcrma-password --string-value <> +databricks secrets put-secret sfdc-byom-scope sfdc-byom-cdpcrma-client-id --string-value <> +databricks secrets put-secret sfdc-byom-scope sfdc-byom-cdpcrma-client-secret --string-value <> +``` \ No newline at end of file diff --git a/DATA_STREAM.md b/DATA_STREAM.md new file mode 100644 index 0000000..e45500c --- /dev/null +++ b/DATA_STREAM.md @@ -0,0 +1,67 @@ +## Upload training data set into Data Cloud + +A synthetic dataset comprising 1 million rows was generated for this purpose, encompassing the following attributes: + +- **Club Member:** Indicates whether the customer is a club member. +- **Campaign:** Represents the campaign the customer is associated with. +- **State:** Denotes the state where the customer resides. +- **Month:** Indicates the month of purchase. +- **Case Count:** The number of cases raised by the customer. +- **Case Type Return:** Specifies whether the customer returned any product in the last year. +- **Case Type Shipment Damaged:** Indicates whether the customer experienced any shipment damage in the last year. +- **Engagement Score:** Reflects the level of customer engagement, including responses to mailing campaigns, logins to the online platform, etc. +- **Tenure:** Represents the number of years the customer has been part of Northern Outfitters. +- **Clicks:** The average number of clicks the customer made within one week before purchase. +- **Pages Visited:** The average number of page visits the customer made within one week before purchase. +- **Product Purchased:** Specifies the product purchased by the customer. + +In a real-life scenario, Data Cloud can be utilized to ingest data from various sources, employing powerful batch and streaming transformation capabilities to create a robust dataset for model training. + +The dataset can be accessed here. Afterward, you have the option to upload the CSV file to an S3 bucket (one way to do this is sketched below).
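If you go the S3 route, here is a minimal sketch of one way to do the upload with `boto3`; the local filename, bucket, and key below are placeholders and assume your AWS credentials are already configured:

```python
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    Filename="product_interest.csv",       # local CSV exported from the dataset
    Bucket="your-training-data-bucket",    # placeholder bucket name
    Key="sfdc-byom/product_interest.csv",  # placeholder object key
)
```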
+ +Here are the steps to create Data Streams from S3 in Salesforce: + + +Log in to the org + +![image](files/sfdc_byom/images/create_data_stream_01.png) + + +Navigate to "Data Streams" and click "New" + +![image](files/sfdc_byom/images/create_data_stream_02.png) + + +Select "Amazon S3" and click on Next + +![image](files/sfdc_byom/images/create_data_stream_03.png) + + +Enter S3 bucket and file details + +![image](files/sfdc_byom/images/create_data_stream_04.png) + + +Click Next + +![image](files/sfdc_byom/images/create_data_stream_05.png) + + +Click Next + +![image](files/sfdc_byom/images/create_data_stream_06.png) + + +Click on Full Refresh + +![image](files/sfdc_byom/images/create_data_stream_07.png) + + +Select Frequency = "None" + +![image](files/sfdc_byom/images/create_data_stream_08.png) + + +Click Deploy to create data stream + +![image](files/sfdc_byom/images/create_data_stream_09.png) diff --git a/EINSTEIN_MODEL.md b/EINSTEIN_MODEL.md new file mode 100644 index 0000000..97fe5ed --- /dev/null +++ b/EINSTEIN_MODEL.md @@ -0,0 +1,140 @@ +## Set up model in Einstein Studio + + +### 1. Log in to the org + +![image](files/sfdc_byom/images/deploy_model_01.png) + + +### 2. Navigate to ML Workspace / Einstein Studio + +![image](files/sfdc_byom/images/deploy_model_02.png) + + +### 3. Select ML Workspace + +![image](files/sfdc_byom/images/deploy_model_03.png) + + +### 4. Click New + +You should see a toast message that the end point was saved successfully + +![image](files/sfdc_byom/images/deploy_model_04.png) + + +### 5. Give your model a name and click create + +![image](files/sfdc_byom/images/deploy_model_05.png) + + +### 6. Select Endpoint + +![image](files/sfdc_byom/images/deploy_model_06.png) + + +### 7. Click on add endpoint + +![image](files/sfdc_byom/images/deploy_model_07.png) + + +### 8. Enter inference url from databrisck as well as request format as dataframe split + +![image](files/sfdc_byom/images/deploy_model_08.png) + + +### 9. Select Authentication type, Auth Header= "Authorization" + +![image](files/sfdc_byom/images/deploy_model_09.png) + + +### 10. Secret Key = "Bearer <>" + +![image](files/sfdc_byom/images/deploy_model_10.png) + + +### 11. Click Save. + +You should see a toast message that the end point was saved successfully + +![image](files/sfdc_byom/images/deploy_model_11.png) + + +### 12. Select input features + +![image](files/sfdc_byom/images/deploy_model_12.png) + + +### 13. Click on Add input features + +![image](files/sfdc_byom/images/deploy_model_13.png) + + +### 14. Choose the DMO + +Choose the DMO that has all the fields for model scoring in this case it is account contact DMO. + +![image](files/sfdc_byom/images/deploy_model_14.png) + + +### 15. Click Save + +![image](files/sfdc_byom/images/deploy_model_15.png) + + +### 16. Select fields from DMO for scoring + +Now start selecting the fields from the DMO for model scoring. Note that the feature API name of the field selected should match the names the model is expecting for instance as shown in the query endpoint dialog above + +![image](files/sfdc_byom/images/deploy_model_16.png) + + +### 17. Drag each predictor and click done one by one in the specific order + +![image](files/sfdc_byom/images/deploy_model_17.png) + + +### 18. Once you enter all the predictors in the click on save + +![image](files/sfdc_byom/images/deploy_model_18.png) + + +### 19. Next go to output Predictions + +![image](files/sfdc_byom/images/deploy_model_19.png) + + +### 20. Give the DMO a name. 
+ +This is where the output predictions will be saved + +![image](files/sfdc_byom/images/deploy_model_20.png) + + +### 21. Click save + +![image](files/sfdc_byom/images/deploy_model_21.png) + + +### 22. Enter the outcome variable API name and the json key + +Note that in this case the json key is - $.predictions + +![image](files/sfdc_byom/images/deploy_model_22.png) + + +### 23. Click Save + +![image](files/sfdc_byom/images/deploy_model_23.png) + + +### 24. Now activate the model + +![image](files/sfdc_byom/images/deploy_model_24.png) + + +### 25. Once model is activated refresh it to see the predictions in the DMO + +![image](files/sfdc_byom/images/deploy_model_25.png) + + diff --git a/common.py b/common.py new file mode 100644 index 0000000..6b069db --- /dev/null +++ b/common.py @@ -0,0 +1,130 @@ +# Databricks notebook source +# DBTITLE 1,Common imports and setup +import os +import re +import time +import requests +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns +import numpy as np +import mlflow +import mlflow.sklearn +import mlflow.pyfunc +import mlflow.deployments + +from pyspark.sql import types as T +from pyspark.sql import functions as F +from pyspark.sql import DataFrame, Window + +from salesforcecdpconnector.connection import SalesforceCDPConnection + +SFDC_CUSTOM_FIELD_SUFFIX = "__c" + +%config InlineBackend.figure_format = "retina" + +# COMMAND ---------- + +# DBTITLE 1,Specify Unity Catalog for Model Registry +# Use Unity Catalog as our model registry +mlflow.set_registry_uri("databricks-uc") + +# COMMAND ---------- + +# DBTITLE 1,Get username for naming +# Set up some object naming helpers +project_slug = "sfdc_byom" +current_user = ( + dbutils.notebook.entry_point.getDbutils().notebook() + .getContext().tags().apply('user')) +current_user_no_at = current_user[:current_user.rfind('@')] +current_user_no_at = re.sub(r'\W+', '_', current_user_no_at) + +# COMMAND ---------- + +# DBTITLE 1,Unity Catalog configuration +# Configure the names of the catalog and schema (a.k.a. database) to use +# for storing data and AI assets related to this project. You can use an +# existing one or let it create a new one. You will need permissions to do +# so, and in case you don't you'll need to work with your Databricks admins +# to get it set up. +catalog_name = "main" +schema_name = f"{project_slug}_{current_user_no_at}" +model_name = f"recommender" +endpoint_name = f"{project_slug}-{current_user_no_at}-{model_name}".replace("_", "-") +#spark.sql(f"create catalog if not exists {catalog_name}"); +spark.sql(f"use catalog {catalog_name}") +spark.sql(f"create schema if not exists {schema_name}") +spark.sql(f"use schema {schema_name}"); + +# COMMAND ---------- + +# DBTITLE 1,Experiment name and local dir +local_working_dir = f'/tmp/{current_user}/{project_slug}' +experiment_path = f'/Users/{current_user}/{project_slug}' +os.makedirs(local_working_dir, exist_ok=True) +os.chdir(local_working_dir) +experiment = mlflow.set_experiment(experiment_path) + +# Make sure we're using Unity Catalog for storing models. +mlflow.set_registry_uri("databricks-uc") + +# COMMAND ---------- + +# DBTITLE 1,Salesforce connection config +# Update these with the configuration matching your environment. +# 1. Create a secret scope if you don't already have one. +# 2. Add the three secret keys to the scope with the corresponding values. +# 3. Update the key names and scope name here. +# 4. Also update the login URL and username to use here. 
+sfdc_secret_scope = "corey-abshire" +sfdc_password_key = "sfdc-byom-cdpcrma-password" +sfdc_client_id_key = "sfdc-byom-cdpcrma-client-id" +sfdc_client_secret_key = "sfdc-byom-cdpcrma-client-secret" +sfdc_login_url = "https://login.salesforce.com/" +sfdc_username = "corey.abshire+sfdc-partner@databricks.com" +sfdc_password = dbutils.secrets.get(sfdc_secret_scope, sfdc_password_key) +sfdc_client_id = dbutils.secrets.get(sfdc_secret_scope, sfdc_client_id_key) +sfdc_client_secret = dbutils.secrets.get(sfdc_secret_scope, sfdc_client_secret_key) + +# COMMAND ---------- + +# DBTITLE 1,Helpful utility functions +# These are just some helper functions to assist with displaying some +# helpful links within some of the notebooks. + +def display_link(link, text=None): + """Format and display a link in a Databricks notebook cell.""" + if text is None: + text = link + html = f"""<a href="{link}">{text}</a>""" + displayHTML(html) + + +def display_table_link(catalog_name: str, schema_name: str, table_name: str): + """Format a link for the given table into Unity Catalog.""" + link = f"./explore/data/{catalog_name}/{schema_name}/{table_name}" + text = f"{catalog_name}.{schema_name}.{table_name}" + display_link(link, text) + + +def remove_column_suffix(df: DataFrame, suffix: str) -> DataFrame: + """Remove the given suffix from each column.""" + + # need to define remove suffix ourselves in case user is on python 3.8 + def remove_suffix(s): + if s.endswith(suffix): + return s[:-len(suffix)] + return s + + return df.toDF(*[remove_suffix(c) for c in df.columns]) + +# COMMAND ---------- + +# DBTITLE 1,Report helpful config info +# Report out the configuration settings the user may need +print(f"using catalog {catalog_name}") +print(f"using schema (database) {schema_name}") +print(f"using experiment path {experiment_path}") +print(f"using local working dir {local_working_dir}") +print(f"using model name {model_name}") +print(f"using endpoint name {endpoint_name}") diff --git a/images/connected_app_01.png b/images/connected_app_01.png new file mode 100644 index 0000000..36dafe4 Binary files /dev/null and b/images/connected_app_01.png differ diff --git a/images/connected_app_02.png b/images/connected_app_02.png new file mode 100644 index 0000000..c3deb95 Binary files /dev/null and b/images/connected_app_02.png differ diff --git a/images/connected_app_03.png b/images/connected_app_03.png new file mode 100644 index 0000000..b14180c Binary files /dev/null and b/images/connected_app_03.png differ diff --git a/images/connected_app_04.png b/images/connected_app_04.png new file mode 100644 index 0000000..c9f3ab7 Binary files /dev/null and b/images/connected_app_04.png differ diff --git a/images/connected_app_05.png b/images/connected_app_05.png new file mode 100644 index 0000000..69d4be8 Binary files /dev/null and b/images/connected_app_05.png differ diff --git a/images/connected_app_06.png b/images/connected_app_06.png new file mode 100644 index 0000000..a058132 Binary files /dev/null and b/images/connected_app_06.png differ diff --git a/images/connected_app_07.png b/images/connected_app_07.png new file mode 100644 index 0000000..94d83c3 Binary files /dev/null and b/images/connected_app_07.png differ diff --git a/images/connected_app_08.png b/images/connected_app_08.png new file mode 100644 index 0000000..616244d Binary files /dev/null and b/images/connected_app_08.png differ diff --git a/images/connected_app_09.png b/images/connected_app_09.png new file mode 100644 index 0000000..6773347 Binary files /dev/null and b/images/connected_app_09.png differ
[Binary file additions: images/connected_app_01.png through connected_app_12.png, images/consumption_01.png through consumption_07.png, images/create_data_stream_01.png through create_data_stream_09.png, images/deploy_model_01.png through deploy_model_25.png, images/ml_experiment.png, images/serving_endpoint_01.png through serving_endpoint_04.png, and images/test_endpoint.png. These are new PNG screenshots referenced by the walkthrough notebooks; binary contents are not shown in the diff.]