From e97a5637d0212006774bc91dc945ea52a4c74fde Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Wed, 30 Oct 2024 08:27:21 +0200 Subject: [PATCH 01/11] build(pyproject.toml): fix package versions --- pyproject.toml | 45 +++++++++++----------- uv.lock | 102 ++++++++++++++++++++++++------------------------- 2 files changed, 73 insertions(+), 74 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1cc56ec..e53a588 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,32 +5,33 @@ description = "MLOps with Databricks Course Project" readme = "README.md" requires-python = ">=3.11, <3.12" dependencies = [ - "lightgbm>=4.5.0, <5", - "scikit-learn>=1.5.1, <2", - "cloudpickle>=3.0.0, <4", - "mlflow>=2.16.0, <3", - "numpy>=1.26.4, <2", - "pandas>=2.2.2, <3", - "pyarrow>=15.0.2, <16", - "cffi>=1.17.1, <2", - "scipy>=1.14.1, <2", - "matplotlib>=3.9.2, <4", - "databricks-feature-engineering>=0.6, <1", - "ucimlrepo>=0.0.7", - "loguru>=0.7.2", - "pandera>=0.20.4", + "lightgbm==4.5.0", + "scikit-learn==1.5.2", + "cloudpickle==3.1.0", + "mlflow==2.17.0", + "numpy==1.26.4", + "pandas==2.2.3", + "pyarrow==14.0.1", + "cffi==1.17.1", + "scipy==1.14.1", + "matplotlib==3.9.2", + "databricks-feature-engineering==0.6", + "loguru==0.7.2", + "pandera==0.20.4", + "pydantic==2.9.2", ] [project.optional-dependencies] dev = [ - "databricks-connect>=15.4.1, <16", - "databricks-sdk>=0.32.0, <0.33", - "ipykernel>=6.29.5, <7", - "pip>=24.2", - "pytest>=8.3.3", - "pytest-mock>=3.14.0", - "pytest-sugar>=1.0.0", - "pytest-cov>=5.0.0", + "databricks-connect==15.4.1", + "databricks-sdk==0.32.0", + "ipykernel==6.29.5", + "pip==24.2", + "pytest==8.3.3", + "pytest-mock==3.14.0", + "pytest-sugar==1.0.0", + "pytest-cov==5.0.0", + "pyspark==3.5.3", ] [tool.ruff] diff --git a/uv.lock b/uv.lock index ece0c82..49de6da 100644 --- a/uv.lock +++ b/uv.lock @@ -342,7 +342,7 @@ wheels = [ [[package]] name = "databricks-connect" -version = "15.4.2" +version = "15.4.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "databricks-sdk" }, @@ -358,12 +358,12 @@ dependencies = [ { name = "six" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/a4/8c/fa0a5845f684efc423d3e0bfe9da4415f29b1b852c015f0614fdc36ff74a/databricks_connect-15.4.2-py2.py3-none-any.whl", hash = "sha256:6ee3c549ca188003da835e48a933a58c6ae0609f3b7e61218ee09940f2f65432", size = 2295330 }, + { url = "https://files.pythonhosted.org/packages/c3/4f/9783091810cd60515be55a00b660872f39909b16308c1a31e5503588bc32/databricks_connect-15.4.1-py2.py3-none-any.whl", hash = "sha256:1f1dda8b929046be4ac8705cfab6b350d6ac515c9d4ab55758a1fc081f087fd0", size = 2295272 }, ] [[package]] name = "databricks-feature-engineering" -version = "0.7.0" +version = "0.6.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "azure-cosmos" }, @@ -377,20 +377,20 @@ dependencies = [ { name = "sqlparse" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/dd/62/7e4085fa5e550bec290cc1c5348ba1bd64cac58205ade06cdd4d8fb1dd43/databricks_feature_engineering-0.7.0-py3-none-any.whl", hash = "sha256:b48e19f3ce9b70de5eb1f8cb2c51d4711157906d59b4aea1258fb6346591b8e1", size = 255546 }, + { url = "https://files.pythonhosted.org/packages/ae/73/ab301d1b538014c91fc4481b7e13c374cc60dd2cb0c0ea0ebf36a3f789c6/databricks_feature_engineering-0.6.0-py3-none-any.whl", hash = "sha256:4774868e1b1d132897e06c940c19a447b4c9999fa1afb43193d372ac4716b6d1", size = 253465 }, ] [[package]] name = "databricks-sdk" 
-version = "0.32.3" +version = "0.32.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "google-auth" }, { name = "requests" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/1f/2c/a6fe9382dc06a3bc5ce8ba5ddda2d8236cb81f85a1f0b7746eb329bd9277/databricks_sdk-0.32.3.tar.gz", hash = "sha256:bdb8acef3d79e83f05e7fca6cd6128b2b46a4bb4de240374c70050b5f029d4f2", size = 572703 } +sdist = { url = "https://files.pythonhosted.org/packages/fa/95/c6949a497390bcfb633dedb1fa9fe81ba96afbbd90bd4668939f9e1004e5/databricks_sdk-0.32.0.tar.gz", hash = "sha256:ede9db99fe87dabf549e47b9cb0ddabb1170d20d9d88d22386e727984eff3167", size = 568533 } wheels = [ - { url = "https://files.pythonhosted.org/packages/0d/45/a740906549189c434033788446f202358a99301cb6b7884e9cac47578126/databricks_sdk-0.32.3-py3-none-any.whl", hash = "sha256:66b57cfd7731f541df014d136d5c257aa45ea4caa95d0d3fbaaed5b43142f32a", size = 555103 }, + { url = "https://files.pythonhosted.org/packages/b5/a7/0f7ce505b256c4b25bd9ce2ffc4304a77e78f933e942d80f11809e2b0a28/databricks_sdk-0.32.0-py3-none-any.whl", hash = "sha256:980b3b25647d752a0a350e249701085a2e2d870764091f877d5cd00b96015d49", size = 551988 }, ] [[package]] @@ -1278,9 +1278,9 @@ dependencies = [ { name = "pandas" }, { name = "pandera" }, { name = "pyarrow" }, + { name = "pydantic" }, { name = "scikit-learn" }, { name = "scipy" }, - { name = "ucimlrepo" }, ] [package.optional-dependencies] @@ -1289,6 +1289,7 @@ dev = [ { name = "databricks-sdk" }, { name = "ipykernel" }, { name = "pip" }, + { name = "pyspark" }, { name = "pytest" }, { name = "pytest-cov" }, { name = "pytest-mock" }, @@ -1297,28 +1298,29 @@ dev = [ [package.metadata] requires-dist = [ - { name = "cffi", specifier = ">=1.17.1,<2" }, - { name = "cloudpickle", specifier = ">=3.0.0,<4" }, - { name = "databricks-connect", marker = "extra == 'dev'", specifier = ">=15.4.1,<16" }, - { name = "databricks-feature-engineering", specifier = ">=0.6,<1" }, - { name = "databricks-sdk", marker = "extra == 'dev'", specifier = ">=0.32.0,<0.33" }, - { name = "ipykernel", marker = "extra == 'dev'", specifier = ">=6.29.5,<7" }, - { name = "lightgbm", specifier = ">=4.5.0,<5" }, - { name = "loguru", specifier = ">=0.7.2" }, - { name = "matplotlib", specifier = ">=3.9.2,<4" }, - { name = "mlflow", specifier = ">=2.16.0,<3" }, - { name = "numpy", specifier = ">=1.26.4,<2" }, - { name = "pandas", specifier = ">=2.2.2,<3" }, - { name = "pandera", specifier = ">=0.20.4" }, - { name = "pip", marker = "extra == 'dev'", specifier = ">=24.2" }, - { name = "pyarrow", specifier = ">=15.0.2,<16" }, - { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.3.3" }, - { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=5.0.0" }, - { name = "pytest-mock", marker = "extra == 'dev'", specifier = ">=3.14.0" }, - { name = "pytest-sugar", marker = "extra == 'dev'", specifier = ">=1.0.0" }, - { name = "scikit-learn", specifier = ">=1.5.1,<2" }, - { name = "scipy", specifier = ">=1.14.1,<2" }, - { name = "ucimlrepo", specifier = ">=0.0.7" }, + { name = "cffi", specifier = "==1.17.1" }, + { name = "cloudpickle", specifier = "==3.1.0" }, + { name = "databricks-connect", marker = "extra == 'dev'", specifier = "==15.4.1" }, + { name = "databricks-feature-engineering", specifier = "==0.6" }, + { name = "databricks-sdk", marker = "extra == 'dev'", specifier = "==0.32.0" }, + { name = "ipykernel", marker = "extra == 'dev'", specifier = "==6.29.5" }, + { name = "lightgbm", specifier = "==4.5.0" }, + { name = "loguru", 
specifier = "==0.7.2" }, + { name = "matplotlib", specifier = "==3.9.2" }, + { name = "mlflow", specifier = "==2.17.0" }, + { name = "numpy", specifier = "==1.26.4" }, + { name = "pandas", specifier = "==2.2.3" }, + { name = "pandera", specifier = "==0.20.4" }, + { name = "pip", marker = "extra == 'dev'", specifier = "==24.2" }, + { name = "pyarrow", specifier = "==14.0.1" }, + { name = "pydantic", specifier = "==2.9.2" }, + { name = "pyspark", marker = "extra == 'dev'", specifier = "==3.5.3" }, + { name = "pytest", marker = "extra == 'dev'", specifier = "==8.3.3" }, + { name = "pytest-cov", marker = "extra == 'dev'", specifier = "==5.0.0" }, + { name = "pytest-mock", marker = "extra == 'dev'", specifier = "==3.14.0" }, + { name = "pytest-sugar", marker = "extra == 'dev'", specifier = "==1.0.0" }, + { name = "scikit-learn", specifier = "==1.5.2" }, + { name = "scipy", specifier = "==1.14.1" }, ] [[package]] @@ -1403,20 +1405,20 @@ wheels = [ [[package]] name = "pyarrow" -version = "15.0.2" +version = "14.0.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "numpy" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/35/a1/b7c9bacfd17a9d1d8d025db2fc39112e0b1a629ea401880e4e97632dbc4c/pyarrow-15.0.2.tar.gz", hash = "sha256:9c9bc803cb3b7bfacc1e96ffbfd923601065d9d3f911179d81e72d99fd74a3d9", size = 1064226 } +sdist = { url = "https://files.pythonhosted.org/packages/e0/c3/48602ef0a293af9297c0c65cdef8a2339256e485c54a4ff375d3e95d3415/pyarrow-14.0.1.tar.gz", hash = "sha256:b8b3f4fe8d4ec15e1ef9b599b94683c5216adaed78d5cb4c606180546d1e2ee1", size = 1062511 } wheels = [ - { url = "https://files.pythonhosted.org/packages/34/50/93f6104e79bec6e1af4356f5164695a0b6338f230e1273706ec9eb836bea/pyarrow-15.0.2-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:5f8bc839ea36b1f99984c78e06e7a06054693dc2af8920f6fb416b5bca9944e4", size = 27187122 }, - { url = "https://files.pythonhosted.org/packages/47/cb/be17c4879e60e683761be281d955923d586a572fbc2503e08f08ca713349/pyarrow-15.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f5e81dfb4e519baa6b4c80410421528c214427e77ca0ea9461eb4097c328fa33", size = 24217346 }, - { url = "https://files.pythonhosted.org/packages/ac/f6/57d67d7729643ebc80f0df18420b9fc1857ca418d1b2bb3bc5be2fd2119e/pyarrow-15.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3a4f240852b302a7af4646c8bfe9950c4691a419847001178662a98915fd7ee7", size = 36151795 }, - { url = "https://files.pythonhosted.org/packages/ff/42/df219f3a1e06c2dd63599243384d6ba2a02a44a976801fbc9601264ff562/pyarrow-15.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4e7d9cfb5a1e648e172428c7a42b744610956f3b70f524aa3a6c02a448ba853e", size = 38398065 }, - { url = "https://files.pythonhosted.org/packages/4a/37/a32de321c7270df01b709f554903acf4edaaef373310ff116302224348a9/pyarrow-15.0.2-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:2d4f905209de70c0eb5b2de6763104d5a9a37430f137678edfb9a675bac9cd98", size = 35672270 }, - { url = "https://files.pythonhosted.org/packages/61/94/0b28417737ea56a4819603c0024c8b24365f85154bb938785352e09bea55/pyarrow-15.0.2-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:90adb99e8ce5f36fbecbbc422e7dcbcbed07d985eed6062e459e23f9e71fd197", size = 38346410 }, - { url = "https://files.pythonhosted.org/packages/96/2f/0092154f3e1ebbc814de1f8a9075543d77a7ecc691fbad407df174799abe/pyarrow-15.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:b116e7fd7889294cbd24eb90cd9bdd3850be3738d61297855a71ac3b8124ee38", size = 
24799922 }, + { url = "https://files.pythonhosted.org/packages/1d/a6/b333f35d513dd16294d5fa1535ddb26ec5877f800f3c71c903cc8c7c2656/pyarrow-14.0.1-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:c7331b4ed3401b7ee56f22c980608cf273f0380f77d0f73dd3c185f78f5a6220", size = 26892386 }, + { url = "https://files.pythonhosted.org/packages/58/4e/bd9bf0aaead74ba46996cf11a608894e1867e8e5f850fd7679018a117c60/pyarrow-14.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:922e8b49b88da8633d6cac0e1b5a690311b6758d6f5d7c2be71acb0f1e14cd61", size = 23986729 }, + { url = "https://files.pythonhosted.org/packages/39/50/f7b0a7142a8f5cf627dda896451f8dea2ecf4e08f452e4b688df0aa1ece4/pyarrow-14.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:58c889851ca33f992ea916b48b8540735055201b177cb0dcf0596a495a667b00", size = 35940020 }, + { url = "https://files.pythonhosted.org/packages/02/35/132fcd8439b295e11094a27a9a9ef3fbc907db4f58388bd346446e82e316/pyarrow-14.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:30d8494870d9916bb53b2a4384948491444741cb9a38253c590e21f836b01222", size = 38069780 }, + { url = "https://files.pythonhosted.org/packages/0a/98/a75075869ff88b409df2e38bcfc27933f5cf24e84fb3a84d311410d112d3/pyarrow-14.0.1-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:be28e1a07f20391bb0b15ea03dcac3aade29fc773c5eb4bee2838e9b2cdde0cb", size = 35421474 }, + { url = "https://files.pythonhosted.org/packages/fe/2b/72ca700c2ecc82a05a8e2742a04853f9ebf0feab06aa4d61f37a4d5bb279/pyarrow-14.0.1-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:981670b4ce0110d8dcb3246410a4aabf5714db5d8ea63b15686bce1c914b1f83", size = 37993198 }, + { url = "https://files.pythonhosted.org/packages/d4/f0/607f50ec87ac4775d6124855ae6be2c48bab58aa0a660ccd46e9af52bcd9/pyarrow-14.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:4756a2b373a28f6166c42711240643fb8bd6322467e9aacabd26b488fa41ec23", size = 24564125 }, ] [[package]] @@ -1504,6 +1506,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/ec/2eb3cd785efd67806c46c13a17339708ddc346cbb684eade7a6e6f79536a/pyparsing-3.2.0-py3-none-any.whl", hash = "sha256:93d9577b88da0bbea8cc8334ee8b918ed014968fd2ec383e868fb8afb1ccef84", size = 106921 }, ] +[[package]] +name = "pyspark" +version = "3.5.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "py4j" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9a/90/cb80c8cf194958ab9a3242851c62fa5aef1a0b42f2d9642f1e2eca098005/pyspark-3.5.3.tar.gz", hash = "sha256:68b7cc0c0c570a7d8644f49f40d2da8709b01d30c9126cc8cf93b4f84f3d9747", size = 317304325 } + [[package]] name = "pytest" version = "8.3.3" @@ -1874,19 +1885,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a6/ab/7e5f53c3b9d14972843a647d8d7a853969a58aecc7559cb3267302c94774/tzdata-2024.2-py2.py3-none-any.whl", hash = "sha256:a48093786cdcde33cad18c2555e8532f34422074448fbc874186f0abd79565cd", size = 346586 }, ] -[[package]] -name = "ucimlrepo" -version = "0.0.7" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "certifi" }, - { name = "pandas" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/87/7c/f5a400cc99a5365d153609ebf803084f78b4638b0f7925aa31d9abb62b8e/ucimlrepo-0.0.7.tar.gz", hash = "sha256:4cff3f9e814367dd60956da999ace473197237b9fce4c07e9a689e77b4ffb59a", size = 9369 } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/3b/07/1252560194df2b4fad1cb3c46081b948331c63eb1bb0b97620d508d12a53/ucimlrepo-0.0.7-py3-none-any.whl", hash = "sha256:0a5ce7e21d7ec850a0da4427c47f9dd96fcc6532f1c7e95dcec63eeb40f08026", size = 8041 }, -] - [[package]] name = "urllib3" version = "2.2.3" From 8754b8d5a7987847a9c0b7297dc642f60056c697 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Thu, 31 Oct 2024 07:19:01 +0200 Subject: [PATCH 02/11] chore: fix package versions and grouping --- main.py | 49 ------------------------------------------------- pyproject.toml | 21 ++++++++++++--------- uv.lock | 29 +++++++++-------------------- 3 files changed, 21 insertions(+), 78 deletions(-) delete mode 100644 main.py diff --git a/main.py b/main.py deleted file mode 100644 index 8d796ca..0000000 --- a/main.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Main script for the power consumption project.""" - -from loguru import logger - -from power_consumption.config import Config -from power_consumption.model import ConsumptionModel -from power_consumption.preprocessing import DataProcessor -from power_consumption.utils import plot_actual_vs_predicted, plot_feature_importance, visualise_results - -# Load configuration -config = Config.from_yaml("./configs/project_configs.yml") - -# Initialise data processor and preprocess data -processor = DataProcessor(config=config) -processor.preprocess_data() - -# Split data into train and test sets -X_train, X_test, y_train, y_test = processor.split_data() - -# Transform features -X_train_transformed, X_test_transformed = processor.fit_transform_features(X_train, X_test) - -# Initialise and train the model -model = ConsumptionModel(processor.preprocessor, config) -model.train(X_train, y_train) - -# Make predictions -y_pred = model.predict(X_test) - -# Evaluate the model -mse, r2 = model.evaluate(X_test, y_test) - -# Log Mean Squared Error for each target -logger.info("Mean Squared Error for each target:") -for i, target in enumerate(config.target.target): - logger.info(f"{target}: {mse[i]:.4f}") - -# Log R-squared for each target -logger.info("\nR-squared for each target:") -for i, target in enumerate(config.target.target): - logger.info(f"{target}: {r2[i]:.4f}") - -# Visualise results -visualise_results(y_test, y_pred, config.target.target) -plot_actual_vs_predicted(y_test.values, y_pred, config.target.target) - -# Get and plot feature importance -feature_importance, feature_names = model.get_feature_importance() -plot_feature_importance(feature_importance, feature_names) diff --git a/pyproject.toml b/pyproject.toml index e53a588..9cb1c76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,19 +19,18 @@ dependencies = [ "loguru==0.7.2", "pandera==0.20.4", "pydantic==2.9.2", + "databricks-sdk==0.32.0", ] [project.optional-dependencies] dev = [ - "databricks-connect==15.4.1", - "databricks-sdk==0.32.0", - "ipykernel==6.29.5", - "pip==24.2", - "pytest==8.3.3", - "pytest-mock==3.14.0", - "pytest-sugar==1.0.0", - "pytest-cov==5.0.0", - "pyspark==3.5.3", + "databricks-connect>=15.4.1", + "ipykernel>=6.29.5, <7", + "pip>=24.2", + "pytest>=8.3.3", + "pytest-mock>=3.14.0", + "pytest-sugar>=1.0.0", + "pytest-cov>=5.0.0", ] [tool.ruff] @@ -61,3 +60,7 @@ packages = ["power_consumption"] [tool.setuptools.package-data] power_consumption = ["**/*.py"] + +[build-system] +requires = ["setuptools>=72.0"] +build-backend = "setuptools.build_meta" diff --git a/uv.lock b/uv.lock index 49de6da..24f72e5 100644 --- a/uv.lock +++ b/uv.lock @@ -1270,6 
+1270,7 @@ dependencies = [ { name = "cffi" }, { name = "cloudpickle" }, { name = "databricks-feature-engineering" }, + { name = "databricks-sdk" }, { name = "lightgbm" }, { name = "loguru" }, { name = "matplotlib" }, @@ -1286,10 +1287,8 @@ dependencies = [ [package.optional-dependencies] dev = [ { name = "databricks-connect" }, - { name = "databricks-sdk" }, { name = "ipykernel" }, { name = "pip" }, - { name = "pyspark" }, { name = "pytest" }, { name = "pytest-cov" }, { name = "pytest-mock" }, @@ -1300,10 +1299,10 @@ dev = [ requires-dist = [ { name = "cffi", specifier = "==1.17.1" }, { name = "cloudpickle", specifier = "==3.1.0" }, - { name = "databricks-connect", marker = "extra == 'dev'", specifier = "==15.4.1" }, + { name = "databricks-connect", marker = "extra == 'dev'", specifier = ">=15.4.1" }, { name = "databricks-feature-engineering", specifier = "==0.6" }, - { name = "databricks-sdk", marker = "extra == 'dev'", specifier = "==0.32.0" }, - { name = "ipykernel", marker = "extra == 'dev'", specifier = "==6.29.5" }, + { name = "databricks-sdk", specifier = "==0.32.0" }, + { name = "ipykernel", marker = "extra == 'dev'", specifier = ">=6.29.5,<7" }, { name = "lightgbm", specifier = "==4.5.0" }, { name = "loguru", specifier = "==0.7.2" }, { name = "matplotlib", specifier = "==3.9.2" }, @@ -1311,14 +1310,13 @@ requires-dist = [ { name = "numpy", specifier = "==1.26.4" }, { name = "pandas", specifier = "==2.2.3" }, { name = "pandera", specifier = "==0.20.4" }, - { name = "pip", marker = "extra == 'dev'", specifier = "==24.2" }, + { name = "pip", marker = "extra == 'dev'", specifier = ">=24.2" }, { name = "pyarrow", specifier = "==14.0.1" }, { name = "pydantic", specifier = "==2.9.2" }, - { name = "pyspark", marker = "extra == 'dev'", specifier = "==3.5.3" }, - { name = "pytest", marker = "extra == 'dev'", specifier = "==8.3.3" }, - { name = "pytest-cov", marker = "extra == 'dev'", specifier = "==5.0.0" }, - { name = "pytest-mock", marker = "extra == 'dev'", specifier = "==3.14.0" }, - { name = "pytest-sugar", marker = "extra == 'dev'", specifier = "==1.0.0" }, + { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.3.3" }, + { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=5.0.0" }, + { name = "pytest-mock", marker = "extra == 'dev'", specifier = ">=3.14.0" }, + { name = "pytest-sugar", marker = "extra == 'dev'", specifier = ">=1.0.0" }, { name = "scikit-learn", specifier = "==1.5.2" }, { name = "scipy", specifier = "==1.14.1" }, ] @@ -1506,15 +1504,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/ec/2eb3cd785efd67806c46c13a17339708ddc346cbb684eade7a6e6f79536a/pyparsing-3.2.0-py3-none-any.whl", hash = "sha256:93d9577b88da0bbea8cc8334ee8b918ed014968fd2ec383e868fb8afb1ccef84", size = 106921 }, ] -[[package]] -name = "pyspark" -version = "3.5.3" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "py4j" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/9a/90/cb80c8cf194958ab9a3242851c62fa5aef1a0b42f2d9642f1e2eca098005/pyspark-3.5.3.tar.gz", hash = "sha256:68b7cc0c0c570a7d8644f49f40d2da8709b01d30c9126cc8cf93b4f84f3d9747", size = 317304325 } - [[package]] name = "pytest" version = "8.3.3" From 3d3368c629597915f3e72536682870cd1208b4f7 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Thu, 31 Oct 2024 09:31:32 +0200 Subject: [PATCH 03/11] build(*.whl): update power_consumption wheel package based on new build and dependancies --- 
.../power_consumption-0.0.1-py3-none-any.whl | Bin 9284 -> 9914 bytes notebooks/week3/01_feature_serving.py | 8 ++++ pyproject.toml | 2 + uv.lock | 35 +++++++++++++++++- 4 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 notebooks/week3/01_feature_serving.py diff --git a/notebooks/power_consumption-0.0.1-py3-none-any.whl b/notebooks/power_consumption-0.0.1-py3-none-any.whl index 1c313c2c70c2c595f87545e22cf86e2b0833bca7..387c6c3b5fbe1dfa68231b0ba461faf3c0ac564b 100644 GIT binary patch delta 3717 zcmZ8kbyQUU_8q!or~xE~7`hdZ_@a_hI;7IlA)P8QAPgWOF?4s&0MaR-h;&N`3^+<7 z-NO8kx7Pc;_wHKfv+q6U?0fDX_pG(|=f|zu6i`h(d^!LCKn%!|vq+$j#k+x8jbAvx z8#LNnT82d80RX$C001)rNJfw7029}l;U?nUmU*+k=gHh~)oz08wj4)Dh`2-eso1jaVb(>?n_~)E6VmO_9W_Q3`=kmX>RK|jJ*8N3 zde=PMGF@p~;zb7VrAg%j_sAPxCu*fS;vnwgFg&lT0yW4uwUZ zIcFNXCwp$eiS{=reTHNATQBX|rd_s5^fnAI%dDIWm6YLol#*+xul0jJKVKeyLTx$H zA{_*p1!E(_GSneq2~LFN<^i>MEjh_0NA;;~%>71|&Ae1^{JWl)n#!cVy5H%Xpbat~ z-gp7p?@4)L2etszv2L<8NgzSCK$sOi4)-tJ66)sHp5aQ0LLc++CG+BYwvTIIYEQ0{@RmgfMlgJ${Yf{PrmyD`e>#QaJgKCPb{>}aOeyms-SAbifTdn% zV-?I}ZcF8AcR$r^QT_{c^F;ZJv91NvDJv~wyS0VLByD_$0&VVmxw3H3H8lLl4d0Sz zGtxfgQR_F$GHY=K0!Nc^BA88)-J90HH}3>i`E5vW%V(EkhWpP0jq_IR_=hr(%W~IZ zf-H1m@99Rl`2rH((Ymb~r)sUL@OVBSEg~bvAT8JH6|fvy;r}?<4XRTz$i7t4v4Xz9 z-KcCv|Zd-2K5n1pC$~zJ4F7Q@N_v8 zsL5}*EgAzNAbqwxIW3$_CZq#SCFKx6C_k4Iu5;dTS*oA<)*8x%QoOs|GqW98Ad+)` z#bY8E-KUP1CP(YOz>-^|zVQ{juMd3y#$OrK7jVoM7-Vc>aMTW-=ZRA+Xhhh%zefPq z{PrNZ#Gy4h%n!n#=P+X8h9p&g?QD}C-ET}VMZzTLYz2Xn=JqfK@6n3Q6M3(9e8IyX z#N3!-=hD1iA9w40=kv#M*gf+Kxy~CD7?R~bq~muq-&&x($K$LXz4NcZCh>- zZz?+wYCLpp*~S%*0Mdla0JdErzmk3ASZ(wXGNkz~EnAaC956cvBRzHTke8|1Bd_Ee zy%KD*J^NGQt#p9$Tjzc6;ePM-a@%OjTc;RPpelc+EM9JLg&G=!O>_Io_GJ?9a-l1P zJSO|SO?jg@!y-CFp}??NWCP?1YmiAG-Kx*ew&XY_o8xEUcQ`$}+rBw+Sshq)xwaTo zEw`SZQkcS%3%lnc1`V+te?4s8o=0om{?N`kDIX0EA!DC)U}}^QXc11OM%I>JdM(>2 z`BEFA5H-Q9G3EC6Y9#(RKiWbYG=FL9GNggwE@<@NY2}jG%tF;oasdineaR6DT~|x9 znB|qzhwa#hkc46_M>UnSf=a$8*?8dj1|Lp)$@}&4FkR3wAR?@L&g4Crv(+k1t#jRs z4hI;H{yJ}XT#&^(ubP!<6Xe0bk5tCcrC+j`xE*fFObH6XBdHIV9e_&!e6FeiJ6s&w zve{R>lYRMA=Q+W{QV`(`X1Z{K;?-hlu+@W5x}`Al)F@nxewhu0ebtDD#hjpQQGNZc zff_pu=QLTrG_20Q$2a;3b?N|sNCE4?)httQ1|v^cLFK-tf(&vJYr>m4M)zAubc1q zVi^+=tua>>XJ5)KNnVOR8{HahVxqaH%q?~VJ4Nbjb@oO&wE0WIGt3USnUtPXVHpi~xK6@hOGQd~Z zxXvswfSMiCE)M7~v*>DYRa7FpyW025sI-@kr5PD7oJ!Fz9i7)bno4}wE8SaWdfhA! zTb$O>9v3^7ni|93M+$iN$i{T`1%6Aa+1H12!51sXQ3f9>e+1o~ZeR2NiEZ21T9fe4 z+!}n>j%Ln<=JO?Z%*NRL7@a>X&O*QZ(Ny%Rlo{87aK>`S`B`AcFafE{=PBDFjSq=Y z{nZ%-4UW1Ud&McUpJf=-RfEL@v0PTZu;_QPr+2LdD29H_;#-oazpy6h{vz(|F=gZnqbM+=lG_r<)+QTXp zlXgXR2O?ZC{O6HPhcNs2Qg3PDn-ycE;r`oEyYxsP*43wVXXF6hrD8qjb6vyN4YM2{ z0m<~$x^-BX8WJLHZQaM6#vT|%Uk@LG2r`8)T3Ji!IdPI42za@dEW|V391)L9RO^?g zb_tW-!9(t)iQM~B#n-%V!wysoU8PQl3Qwg(&y$|a&S>#ubY7?1v3<}R5AADeHP+#C zt9IKYxo&4(kcaeG`}GGsIP&j_gt-buq;Bv1UcKJg`SfW=bXRpode_wrMfG&nk`egBjFK*8G!2)~_OtIZVB>f=tM3`d^t> zv+qt#{?63#5wDBb?`9vid&^>Y4C%7yP-K{C=OcvRo-ItLlA6*31iK@HU<*l$bTg7) z?JrZZ{ZaEyMwe}nbs#MggmeAPEZoY5Eaw#>UMa}HXrlEy*ru{!RZwkex1?BOQN?ex?3>Q)qxb<#$D=WQt&b5;J#P>`rC3=G3 zTPS^f!A1GfOtNGGC?nj?S2c zp&r1(L2XX5AK=vd9$1a1RM(CXS8>`DeWP)?IS-Ej4rn$J_idl&1BGlg>4tG)4kRZ@b4wSali6_snIgUUc< z*{iaFcK*PVXXNxVL4TU1+jHMO-RdA!dJ^O`66W$Lr3JedG6j~Nn(mw4o2a>9|9@5b zR}A8iACL>0-iSq$o8U!QvO*AHEOZY1`S<}LW5}e5A(4Q@;DonAR4svM6TiFx&=Te3 z(CNJiTzIP3=mPZ>dENY()PsqnFCj~I9HTtdzsveWyM%n1{O5vwFjy)NS+o$Bei@G>6Ifx!q|Kwc1u z_oE~_KSY&>sn>!X5k+6gR^G1d+w{g`?vAC=m=S=cm1K5GooolGE+wm$qNyO|U4QPlCBu0*? 
zbl^E*@Qacm#?5##c1n#^}y*&5xc}fuxFnljnjwuwaS9jxD2q}l{mpzGwW`e=Q=6T{PWOCX^q1; zxjr_M|DQV^@l#0H`JoE*;*#&6&~7zxZ$4#QOIEfFJ`)-dZdru<9+wfK_gB3ViFq z5D<=kp)ZHwZGM!);ugg@p|=>uY52c(Yn@XAe7ki2xpTME{&f_Ew6h^_Sh*3tTtK{= xMnn|X9b7Z^o0K?WmW%wBd%GYv4lf=b0O0!%Zhc7S-$rf(A2$P`1?S(&{{f!b*sK5m delta 3034 zcmZ8jXH*l)5)KK1lwd$Ws+5E-y-NUTp@t?%4Ty9?F98t<9qB?S5~N+4G^Gnjm0koX zDovUckzQ1Y;CsNi@4k25oU{AQnVD~Qe(aq6T97>&V0~>6F)aW9xDKG@H%kHE1zkXm z)*2D@1)6BO7Y~(<;!01)GrqFGKEVfA;B?|=w2RB z=So6DeCgoA$gx)e5JfBL74M=P0KD8TjO&Uw4ZE2lM|o96dJ|7XMR##AXH_x5o)iEW zqQFk60IV^jhXIaLD1ont=M}5^Rm`gatYzvsrIpP?5>LSbGr@GN>h_ zS2upLdP--77^K+rt_aUMo^=v#(DQi9yf#JlOnI}4+vXXzJ*sR-61W*=lO9ifO z!6xP$s9`Vz`V^?^Zw$h)>XbRv8Hou(rew-@*9SLz8brkcF)ZNl_M=C>m=S$4>&(7P zm&Vwi{k8(VQdygplw%;bb~NPpd!amEnt{j3)| zjZ?^g z#ZNJ(nH)qpr^Yy^!WM&LVmZBEdmE%q_}Cf@!Cx2>kVD2Z2C>_PMy!m9>QNc5AUTEO zy>>D>>WWL!sE*e6?4EAqnjh|k@JT-^Vred}uFcOPM()y>eG@c-#yw2aFSk%S32TQG zLp3~nO>gVN*4bJ42~Y7-cIz?NPs|b}yzIjeUgTGCbMO6M&SCh>s(RKcYPpaU+--7= zd`+!uanI8aVNEkx(MHCZwc)>tf>asw1Z}FLsX|5MFz5lvI`TWNXgvdWQBv5{ucJ_h z2j7BXRYOezR$loJG0&+LhA*^3Jwm}UoF0fbAMoPG&7CR$o*m$F-#!EzkzSRGCftf` zL2PNazHMK|gz+$~BA{x>T#*)2T(@y4`JpqV z|1FxZIUXJfG+Yj*_!(}%pwuH~VlMlVUz=vMppsGlO;w@k%eTGa3NiMpr{G6r4ilVi zmVJA+<;o;Q?&*?B_r0;*rsc@qylN1bcJ~-~Lo(3scW%Lc`J>!oW%|x_-d83oRrs1$ z`3i{djYA4SBi{n~MA>vEOJ`P*4OQqf>qpl(%!_k2^yylHH-mS$BZvY}+$=4vBlF%L z>Ot)5H8h6%=ArW6iose!cA`iXvR9N>w@05Fq$7+c{Anjf$E&wTyKUAF#N1Vt4gy35S z!$~$4+fo5&gn<08&oxB)R|~rYL`HOhSY#aSk-urjYY{`)PYm!ElhvooizX7 zy~5;HK9dpQ61?yV!Dj1qNa?jmyI9AhSNUafbGPa7bEtvJ z5YS|Kc(OD{Sg%J_y2<4J_KdDkjkn(k!7msWiPbwPr?o;?w&-QIfb~vzcBhb8J%-En z%=TF6x?C}|ky(&3Ysle~!&aU9f22N*RBPkwR|_`Y*wv%jO0z8yKIa$c{iua^*Lzo&+96pZnJrFleVnqNeOapdf>enk@6MTe39r#lW+?S zb}y^lE6w+pi*L?ovT^I2a0BF2{t&yOIMq8pHGjXj zwW6X0-^>I=cf|)7?PAW?AH>tP`#&Dy&B+8y-VD^+eEXX1Be6x4$_L*#C-;i%%ct!e&Kw`T1MM-(N@J#@QsX+Bp0@ zTEwp-iw{I=%>#d_troC{hc|61J5{HDz6V-)JqDu_cmOW5!+oiGE3=`{UcPMrF06H$ zZRKpbv%t}FjT@p{5#}Paoba`y=AGe;v5A32(=zeykNG^$=!98TZ+BV8tn}r)sncFJ z6VkYligtFzE#gw)PSwyQe$7w)Sk8I0{(NAAxdp}Cd#|#eD=o4`3w&S|#zn2T0=1CH z8JZW8!WySjKln{wIehS{pO1{{NW7VpA!$I8r6IoZa;BEMP_ls{;`QX!yJ3p)x4hx%LD)tqlCcqa zulI6<5eF^?PTR@S-WIvM^=0K_7azL#sURsfTNBCh?_$cxh(H zF)O|N(Q~|F|1_yg*jo1dWgO{`q1W6!?x-IQHMGNC(7ltt_9fM3;HOxNv4kd-iEvg- z#hot`ACiL$%m;ELIUi;|I1k`F0JdAkNj$& zs=4Tasw_Aq;7yzeH#KgXiyCy1OXX$;uKg#<&;HLyT(tI#7y#hCZ1Zoc4@0p2L(k== KC)s4X%={Z>vvt$} diff --git a/notebooks/week3/01_feature_serving.py b/notebooks/week3/01_feature_serving.py new file mode 100644 index 0000000..f1a1605 --- /dev/null +++ b/notebooks/week3/01_feature_serving.py @@ -0,0 +1,8 @@ +# Databricks notebook source +# MAGIC %pip install ../power_consumption-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- diff --git a/pyproject.toml b/pyproject.toml index 9cb1c76..b55074c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ dependencies = [ "pandera==0.20.4", "pydantic==2.9.2", "databricks-sdk==0.32.0", + "psutil==6.0.0", + "databricks-feature-lookup==1.2.0", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 24f72e5..1f112a1 100644 --- a/uv.lock +++ b/uv.lock @@ -380,6 +380,26 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ae/73/ab301d1b538014c91fc4481b7e13c374cc60dd2cb0c0ea0ebf36a3f789c6/databricks_feature_engineering-0.6.0-py3-none-any.whl", hash = "sha256:4774868e1b1d132897e06c940c19a447b4c9999fa1afb43193d372ac4716b6d1", size = 253465 }, ] +[[package]] +name = "databricks-feature-lookup" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ 
+ { name = "azure-cosmos" }, + { name = "boto3" }, + { name = "mlflow" }, + { name = "numpy" }, + { name = "pyarrow" }, + { name = "pymysql" }, + { name = "pyyaml" }, + { name = "requests" }, + { name = "sqlalchemy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/0c/ed94bce5fd98224a238bfd6bf7fab6a4dfc529c58bb8287f8d1c7612d3a3/databricks-feature-lookup-1.2.0.tar.gz", hash = "sha256:34213e9889367de80fecdd1d5ccf43791fc330ec4d2bfafd710dbb262157752c", size = 72520 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/16/d5464c1d6a11896d956dd9b9854c7072ca87333a8d788e3bd176b062bcf5/databricks_feature_lookup-1.2.0-py3-none-any.whl", hash = "sha256:d3f9336192ea6f293d587caf3a0805c29081754319fea43f22d6951c3a4af10d", size = 98113 }, +] + [[package]] name = "databricks-sdk" version = "0.32.0" @@ -1265,11 +1285,12 @@ wheels = [ [[package]] name = "power-consumption" version = "0.0.1" -source = { virtual = "." } +source = { editable = "." } dependencies = [ { name = "cffi" }, { name = "cloudpickle" }, { name = "databricks-feature-engineering" }, + { name = "databricks-feature-lookup" }, { name = "databricks-sdk" }, { name = "lightgbm" }, { name = "loguru" }, @@ -1278,6 +1299,7 @@ dependencies = [ { name = "numpy" }, { name = "pandas" }, { name = "pandera" }, + { name = "psutil" }, { name = "pyarrow" }, { name = "pydantic" }, { name = "scikit-learn" }, @@ -1301,6 +1323,7 @@ requires-dist = [ { name = "cloudpickle", specifier = "==3.1.0" }, { name = "databricks-connect", marker = "extra == 'dev'", specifier = ">=15.4.1" }, { name = "databricks-feature-engineering", specifier = "==0.6" }, + { name = "databricks-feature-lookup", specifier = "==1.2.0" }, { name = "databricks-sdk", specifier = "==0.32.0" }, { name = "ipykernel", marker = "extra == 'dev'", specifier = ">=6.29.5,<7" }, { name = "lightgbm", specifier = "==4.5.0" }, @@ -1311,6 +1334,7 @@ requires-dist = [ { name = "pandas", specifier = "==2.2.3" }, { name = "pandera", specifier = "==0.20.4" }, { name = "pip", marker = "extra == 'dev'", specifier = ">=24.2" }, + { name = "psutil", specifier = "==6.0.0" }, { name = "pyarrow", specifier = "==14.0.1" }, { name = "pydantic", specifier = "==2.9.2" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.3.3" }, @@ -1495,6 +1519,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f7/3f/01c8b82017c199075f8f788d0d906b9ffbbc5a47dc9918a945e13d5a2bda/pygments-2.18.0-py3-none-any.whl", hash = "sha256:b8e6aca0523f3ab76fee51799c488e38782ac06eafcf95e7ba832985c8e7b13a", size = 1205513 }, ] +[[package]] +name = "pymysql" +version = "1.1.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b3/8f/ce59b5e5ed4ce8512f879ff1fa5ab699d211ae2495f1adaa5fbba2a1eada/pymysql-1.1.1.tar.gz", hash = "sha256:e127611aaf2b417403c60bf4dc570124aeb4a57f5f37b8e95ae399a42f904cd0", size = 47678 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0c/94/e4181a1f6286f545507528c78016e00065ea913276888db2262507693ce5/PyMySQL-1.1.1-py3-none-any.whl", hash = "sha256:4de15da4c61dc132f4fb9ab763063e693d521a80fd0e87943b9a453dd4c19d6c", size = 44972 }, +] + [[package]] name = "pyparsing" version = "3.2.0" From f50f0e63ccd7429a5c44f5162010045f659f63e3 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:16:53 +0200 Subject: [PATCH 04/11] feat(01_prepare_dataset.py): Add ID column based on UNIX time Ran into an error with feature serving where a DateTime object is 
not json serializable - needed to find another method to get the DateTime as PK. --- .../power_consumption-0.0.1-py3-none-any.whl | Bin 9914 -> 9914 bytes notebooks/week2/01_prepare_dataset.py | 7 + notebooks/week3/01_feature_serving.py | 253 ++++++++++++++++++ 3 files changed, 260 insertions(+) diff --git a/notebooks/power_consumption-0.0.1-py3-none-any.whl b/notebooks/power_consumption-0.0.1-py3-none-any.whl index 387c6c3b5fbe1dfa68231b0ba461faf3c0ac564b..d035541f9f8e8da25f8b661271a1b587ff687a32 100644 GIT binary patch delta 201 zcmdnxyUUj+z?+#xgn@y9gMr;)B9Ao_yTeA$XN*85uO!$CY3l>`vAOeGLR{Zs)92C0UDc&k){LDYZMXb=^n76$;n1T-Q5 delta 201 zcmdnxyUUj+z?+#xgn@y9gJF*KL>_CVIo2CJpD_Zd&4x_-n1S@AayW?RqLKijmZ=1SsGlm~Ak%_W!$8z3)nE|yUo{#;#i+#r E05T;)$^ZZW diff --git a/notebooks/week2/01_prepare_dataset.py b/notebooks/week2/01_prepare_dataset.py index 5a9124a..3dff183 100644 --- a/notebooks/week2/01_prepare_dataset.py +++ b/notebooks/week2/01_prepare_dataset.py @@ -27,6 +27,13 @@ # COMMAND ---------- train_set.reset_index(inplace=True) test_set.reset_index(inplace=True) +# Convert DateTime to Unix timestamp (milliseconds) and create ID column +train_set['id'] = train_set['DateTime'].astype('int64') // 10**9 +test_set['id'] = test_set['DateTime'].astype('int64') // 10**9 + +# Convert to string +train_set['id'] = train_set['id'].astype(str) +test_set['id'] = test_set['id'].astype(str) # COMMAND ---------- data_processor.save_to_catalog(train_set=train_set, test_set=test_set, spark=spark) # COMMAND ---------- diff --git a/notebooks/week3/01_feature_serving.py b/notebooks/week3/01_feature_serving.py index f1a1605..0401e98 100644 --- a/notebooks/week3/01_feature_serving.py +++ b/notebooks/week3/01_feature_serving.py @@ -1,4 +1,9 @@ # Databricks notebook source +# MAGIC %md +# MAGIC # Setup + +# COMMAND ---------- + # MAGIC %pip install ../power_consumption-0.0.1-py3-none-any.whl # COMMAND ---------- @@ -6,3 +11,251 @@ dbutils.library.restartPython() # COMMAND ---------- + +# MAGIC %md +# MAGIC * Create feature table in unity catalog, it will be a delta table +# MAGIC * Create online table which uses the feature delta table created in the previous step +# MAGIC * Create a feature spec. When you create a feature spec, you specify the source Delta table. +# MAGIC * This allows the feature spec to be used in both offline and online scenarios. +# MAGIC * For online lookups, the serving endpoint automatically uses the online table to perform low-latency feature lookups. +# MAGIC * The source Delta table and the online table must use the same primary key. 
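For orientation only — this is not part of the committed notebook — the bullet points above condense to roughly the following sketch, reusing the same `databricks-feature-engineering` and `databricks-sdk` calls that the diff below adds. The catalog/schema, table, feature-spec and endpoint names, and the source table behind `preds_df`, are placeholders, not names from this patch:

```python
# Hedged sketch of the feature-serving flow outlined above (placeholder names throughout).
from databricks import feature_engineering
from databricks.feature_engineering import FeatureLookup
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    OnlineTableSpec,
    OnlineTableSpecTriggeredSchedulingPolicy,
)
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
fe = feature_engineering.FeatureEngineeringClient()
workspace = WorkspaceClient()

feature_table = "catalog.schema.demo_features"        # offline Delta feature table (placeholder)
online_table = "catalog.schema.demo_features_online"  # online copy for low-latency lookups

# Assumed input: a Spark DataFrame of precomputed features keyed by "id" (placeholder source).
preds_df = spark.table("catalog.schema.demo_feature_source")

# 1. Offline feature table in Unity Catalog (a Delta table).
fe.create_table(
    name=feature_table,
    primary_keys=["id"],
    df=preds_df,
    description="Demo feature table",
)

# 2. Online table built from the Delta table; both must share the same primary key.
spec = OnlineTableSpec(
    primary_key_columns=["id"],
    source_table_full_name=feature_table,
    run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
    perform_full_copy=False,
)
workspace.online_tables.create(name=online_table, spec=spec)

# 3. Feature spec over the source Delta table; online lookups are resolved against the online table.
fe.create_feature_spec(
    name="catalog.schema.demo_feature_spec",
    features=[
        FeatureLookup(
            table_name=feature_table,
            lookup_key="id",
            feature_names=["Temperature", "Humidity"],
        )
    ],
    exclude_columns=None,
)

# 4. Serving endpoint that exposes the feature spec for low-latency lookups.
workspace.serving_endpoints.create(
    name="demo-feature-serving",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name="catalog.schema.demo_feature_spec",
                scale_to_zero_enabled=True,
                workload_size="Small",
            )
        ]
    ),
)
```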
+ +# COMMAND ---------- + +# DBTITLE 1,Imports +import random +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import mlflow +import pandas as pd +import requests +from databricks import feature_engineering +from databricks.feature_engineering import FeatureLookup +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.catalog import ( + OnlineTableSpec, + OnlineTableSpecTriggeredSchedulingPolicy, +) +from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput +from pyspark.sql import SparkSession +from power_consumption.config import Config + +# COMMAND ---------- + +# DBTITLE 1,Initialise Databricks Clients +workspace = WorkspaceClient() +fe = feature_engineering.FeatureEngineeringClient() + +# COMMAND ---------- + +# DBTITLE 1,Set MLFlow registry URI +mlflow.set_registry_uri("databricks-uc") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Data Loading + +# COMMAND ---------- + +# DBTITLE 1,Load config and tables +config = Config.from_yaml("../../configs/project_configs.yml") + +num_features = config.processed_features.num_features +cat_features = config.processed_features.cat_features +target = config.target.target +parameters = config.hyperparameters.__dict__ + +catalog_name = config.catalog_name +schema_name = config.schema_name + +# COMMAND ---------- + +# DBTITLE 1,Define feature tables (offline and online) +feature_table_name = f"{catalog_name}.{schema_name}.power_consumption_preds" +online_table_name = f"{catalog_name}.{schema_name}.power_consumption_preds_online" + +# COMMAND ---------- + +# DBTITLE 1,Load training and test sets from UC +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() +test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas() + +df = pd.concat([train_set, test_set]) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Model Loading and Pipelines + +# COMMAND ---------- + +# DBTITLE 1,Load MLFlow model +pipeline = mlflow.sklearn.load_model(f"models:/{catalog_name}.{schema_name}.power_consumption_model/2") + +# COMMAND ---------- + +# DBTITLE 1,Prepare DF for feature table +preds_df = df[["DateTime", "Temperature", "Humidity", "Wind_Speed"]] + +# COMMAND ---------- + +predictions = pipeline.predict(df[cat_features + num_features]) + +# COMMAND ---------- + +preds_df.loc[:, "Predicted_PowerConsumption_Zone_1"] = predictions[:, 0] +preds_df.loc[:, "Predicted_PowerConsumption_Zone_2"] = predictions[:, 1] +preds_df.loc[:, "Predicted_PowerConsumption_Zone_3"] = predictions[:, 2] + +# COMMAND ---------- + +preds_df = spark.createDataFrame(preds_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Create Feature Tables + +# COMMAND ---------- + +# DBTITLE 1,Feature table +fe.create_table( + name=feature_table_name, + primary_keys=["DateTime"], + df=preds_df, + description="Power consumption feature table", + +) + +# COMMAND ---------- + +# Enable Change Data Feed +spark.sql(f""" + ALTER TABLE {feature_table_name} + SET TBLPROPERTIES (delta.enableChangeDataFeed = true) +""") + +# COMMAND ---------- + +# DBTITLE 1,Online table using feature table +spec = OnlineTableSpec( + primary_key_columns=["DateTime"], + source_table_full_name=feature_table_name, + run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered":"true"}), + perform_full_copy=False, +) + +# COMMAND ---------- + +# DBTITLE 1,Create online table in Databricks +online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec) + +# COMMAND ---------- 
+ +# MAGIC %md +# MAGIC # Create FeatureLookUp and FeatureSpecTable Feature Table + +# COMMAND ---------- + +# DBTITLE 1,Define features to look up from the feature table +features = [ + FeatureLookup( + table_name=feature_table_name, + lookup_key="DateTime", + feature_names=[ + "Temperature", + "Humidity", + "Wind_Speed", + "Predicted_PowerConsumption_Zone_1", + "Predicted_PowerConsumption_Zone_2", + "Predicted_PowerConsumption_Zone_3", + ], + ) +] + +# COMMAND ---------- + +# DBTITLE 1,Feature spec for serving +feature_spec_name = f"{catalog_name}.{schema_name}.return_predictions" +fe.create_feature_spec( + name=feature_spec_name, + features=features, + exclude_columns=None, +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Deploy Feature Serving Endpoint + +# COMMAND ---------- + +endpoint_name = "power-consumption-feature-serving" + +# COMMAND ---------- + +# DBTITLE 1,Create endpoint using feature spec +workspace.serving_endpoints.create( + name=endpoint_name, + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=feature_spec_name, + scale_to_zero_enabled=True, + workload_size="Small", + ) + ] + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Call The Endpoint + +# COMMAND ---------- + +# DBTITLE 1,Get token and host from Databricks Session +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +# Convert the Spark DataFrame column to a list +date_list = preds_df.select("DateTime").rdd.flatMap(lambda x: x).collect() + +# COMMAND ---------- + +random_date = random.choice(date_list) +random_date_str = random_date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+00:00" + +# COMMAND ---------- + +print(random_date) +print(random_date_str) + +# COMMAND ---------- + +# DBTITLE 1,Call endpoint [dataframe_records] +start_time = time.time() + +serving_endpoint = f"https://{host}/serving-endpoints/{endpoint_name}/invocations" + +response = requests.post( + f"{serving_endpoint}", + headers={"Authorization": f"Bearer {token}"}, + json={"dataframe_records": [{"DateTime":random_date_str}]}, +) + +end_time = time.time() +execution_time = end_time - start_time + +print("Response status:", response.status_code) +print("Response text:", response.text) +print("Execution time:", execution_time, "seconds") + +# COMMAND ---------- + +# DBTITLE 1,Call endpoint [dataframe_split] From bc5705c4bff7bfa28aecbee54be1c9e0f2f7ea39 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:51:03 +0200 Subject: [PATCH 05/11] feat(01_feature_serving.py): week3 notebook 1 create feature serving endpoint using feature tables and online table --- notebooks/week3/01_feature_serving.py | 90 ++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 14 deletions(-) diff --git a/notebooks/week3/01_feature_serving.py b/notebooks/week3/01_feature_serving.py index 0401e98..a0ff714 100644 --- a/notebooks/week3/01_feature_serving.py +++ b/notebooks/week3/01_feature_serving.py @@ -41,6 +41,9 @@ from pyspark.sql import SparkSession from power_consumption.config import Config +# COMMAND ---------- +spark = SparkSession.builder.getOrCreate() + # COMMAND ---------- # DBTITLE 1,Initialise Databricks Clients @@ -97,7 +100,7 @@ # COMMAND ---------- # DBTITLE 1,Prepare DF for feature table -preds_df = df[["DateTime", "Temperature", "Humidity", "Wind_Speed"]] +preds_df = df[["id", "Temperature", "Humidity", "Wind_Speed"]] # 
COMMAND ---------- @@ -123,7 +126,7 @@ # DBTITLE 1,Feature table fe.create_table( name=feature_table_name, - primary_keys=["DateTime"], + primary_keys=["id"], df=preds_df, description="Power consumption feature table", @@ -141,7 +144,7 @@ # DBTITLE 1,Online table using feature table spec = OnlineTableSpec( - primary_key_columns=["DateTime"], + primary_key_columns=["id"], source_table_full_name=feature_table_name, run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered":"true"}), perform_full_copy=False, @@ -163,7 +166,7 @@ features = [ FeatureLookup( table_name=feature_table_name, - lookup_key="DateTime", + lookup_key="id", feature_names=[ "Temperature", "Humidity", @@ -178,7 +181,7 @@ # COMMAND ---------- # DBTITLE 1,Feature spec for serving -feature_spec_name = f"{catalog_name}.{schema_name}.return_predictions" +feature_spec_name = f"{catalog_name}.{schema_name}.return_predictions_2" fe.create_feature_spec( name=feature_spec_name, features=features, @@ -224,17 +227,11 @@ # COMMAND ---------- # Convert the Spark DataFrame column to a list -date_list = preds_df.select("DateTime").rdd.flatMap(lambda x: x).collect() - -# COMMAND ---------- - -random_date = random.choice(date_list) -random_date_str = random_date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+00:00" +id_list = preds_df.select("id").rdd.flatMap(lambda x: x).collect() # COMMAND ---------- -print(random_date) -print(random_date_str) +random_id = random.choice(id_list) # COMMAND ---------- @@ -246,7 +243,7 @@ response = requests.post( f"{serving_endpoint}", headers={"Authorization": f"Bearer {token}"}, - json={"dataframe_records": [{"DateTime":random_date_str}]}, + json={"dataframe_records": [{"id":random_id}]}, ) end_time = time.time() @@ -259,3 +256,68 @@ # COMMAND ---------- # DBTITLE 1,Call endpoint [dataframe_split] +start_time = time.time() + +response = requests.post( + f"{serving_endpoint}", + headers={"Authorization": f"Bearer {token}"}, + json={"dataframe_split": {"columns": ["id"], "data": [[random_id]]}}, +) + +end_time = time.time() +execution_time = end_time - start_time + +print("Response status:", response.status_code) +print("Response text:", response.text) +print("Execution time:", execution_time, "seconds") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Load Test + +# COMMAND ---------- + +headers = {"Authorization": f"Bearer {token}"} +num_requests = 10 + +# COMMAND ---------- + +# DBTITLE 1,Function to make requests and record latency +def send_request(): + random_id = random.choice(id_list) + start_time = time.time() + response = requests.post( + serving_endpoint, + headers=headers, + json={"dataframe_records": [{"id": random_id}]}, + ) + end_time = time.time() + latency = end_time - start_time + return response.status_code, latency + +# COMMAND ---------- + +# DBTITLE 1,Send multiple requests and measure latency +# Measure total execution time +total_start_time = time.time() +latencies = [] + +# Send requests concurrently +with ThreadPoolExecutor(max_workers=100) as executor: + futures = [executor.submit(send_request) for _ in range(num_requests)] + + for future in as_completed(futures): + status_code, latency = future.result() + latencies.append(latency) + +total_end_time = time.time() +total_execution_time = total_end_time - total_start_time + +# Calculate the average latency +average_latency = sum(latencies) / len(latencies) + +print("\nTotal execution time:", total_execution_time, "seconds") +print("Average latency per request:", average_latency, "seconds") + +# COMMAND ---------- From 
9094c8e38ee8b8f140c0f8f2e0d686ed96c1e7a1 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:37:45 +0200 Subject: [PATCH 06/11] feat(week3/*): week3 notebook 1,2,3 --- notebooks/week3/01_feature_serving.py | 3 - notebooks/week3/02_model_serving.py | 190 ++++++++++++++++++ .../week3/03_model_serving_feature_lookup.py | 120 +++++++++++ 3 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 notebooks/week3/02_model_serving.py create mode 100644 notebooks/week3/03_model_serving_feature_lookup.py diff --git a/notebooks/week3/01_feature_serving.py b/notebooks/week3/01_feature_serving.py index a0ff714..c99f3c8 100644 --- a/notebooks/week3/01_feature_serving.py +++ b/notebooks/week3/01_feature_serving.py @@ -41,9 +41,6 @@ from pyspark.sql import SparkSession from power_consumption.config import Config -# COMMAND ---------- -spark = SparkSession.builder.getOrCreate() - # COMMAND ---------- # DBTITLE 1,Initialise Databricks Clients diff --git a/notebooks/week3/02_model_serving.py b/notebooks/week3/02_model_serving.py new file mode 100644 index 0000000..ef198f5 --- /dev/null +++ b/notebooks/week3/02_model_serving.py @@ -0,0 +1,190 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Setup + +# COMMAND ---------- + +# MAGIC %pip install ../power_consumption-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +# DBTITLE 1,imports +import time + +import requests +import random +from concurrent.futures import ThreadPoolExecutor, as_completed + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import ( + EndpointCoreConfigInput, + ServedEntityInput, + TrafficConfig, + Route, +) +from pyspark.sql import SparkSession +from power_consumption.config import Config + +# COMMAND ---------- + +# DBTITLE 1,initialisations +workspace = WorkspaceClient() +spark = SparkSession.builder.getOrCreate() + +config = Config.from_yaml("../../configs/project_configs.yml") + +catalog_name = config.catalog_name +schema_name = config.schema_name + +# COMMAND ---------- + +# DBTITLE 1,load data +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Create endpoint + +# COMMAND ---------- + +workspace.serving_endpoints.create( + name="power-consumption-model-serving", + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=f"{catalog_name}.{schema_name}.power_consumption_model", + scale_to_zero_enabled=True, + workload_size="Small", + entity_version=2, + ) + ], + traffic_config=TrafficConfig( + routes=[ + Route( + served_model_name="power_consumption_model-2", + traffic_percentage=100, + ) + ] + ), + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Call the Endpoint + +# COMMAND ---------- + +# DBTITLE 1,set token and host +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +# DBTITLE 1,create sample request body +required_columns = config.processed_features.num_features + config.processed_features.cat_features + +sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records") + +dataframe_records = [[record] for record in sampled_records] + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### example of body +# MAGIC +# MAGIC Each body should be a list of json with columns +# MAGIC +# MAGIC 
```python +# MAGIC [{'Temperature': 0.9732703198032969, +# MAGIC 'Humidity': -0.27133371777652626, +# MAGIC 'Wind_Speed': -0.8048607918423459, +# MAGIC 'Hour': 0, +# MAGIC 'Day': 0, +# MAGIC 'Month': 0, +# MAGIC 'general_diffuse_flows': 2.372568431682211, +# MAGIC 'diffuse_flows': 0.12697856844758687, +# MAGIC 'DayOfWeek_1': 1.0, +# MAGIC 'DayOfWeek_2': 0.0, +# MAGIC 'DayOfWeek_3': 0.0, +# MAGIC 'DayOfWeek_4': 0.0, +# MAGIC 'DayOfWeek_5': 0.0, +# MAGIC 'DayOfWeek_6': 0.0, +# MAGIC 'IsWeekend_1': 0.0}] +# MAGIC ``` + +# COMMAND ---------- + +# DBTITLE 1,call endpoint +start_time = time.time() + +model_serving_endpoint = ( + f"https://{host}/serving-endpoints/power-consumption-model-serving/invocations" +) + +headers = {"Authorization": f"Bearer {token}"} + +response = requests.post( + f"{model_serving_endpoint}", + headers=headers, + json={"dataframe_records": dataframe_records[0]}, +) + +end_time = time.time() +execution_time = end_time - start_time + +print("Response status:", response.status_code) +print("Reponse text:", response.text) +print("Execution time:", execution_time, "seconds") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Load Test + +# COMMAND ---------- + +# DBTITLE 1,initialise variables and request function +num_requests = 1000 + +def send_request(): + random_record = random.choice(dataframe_records) + start_time = time.time() + response = requests.post( + model_serving_endpoint, + headers=headers, + json={"dataframe_records": random_record}, + ) + end_time = time.time() + latency = end_time - start_time + return response.status_code, latency + +# COMMAND ---------- + +# DBTITLE 1,send concurrent requests +total_start_time = time.time() +latencies = [] + +with ThreadPoolExecutor(max_workers=100) as executor: + futures = [executor.submit(send_request) for _ in range(num_requests)] + + for future in as_completed(futures): + status_code, latency = future.result() + latencies.append(latency) + +total_end_time = time.time() +total_execution_time = total_end_time - total_start_time + +# Calculate the average latency +average_latency = sum(latencies) / len(latencies) + +print("\nTotal execution time:", total_execution_time, "seconds") +print("Average latency per request:", average_latency, "seconds") + +# COMMAND ---------- diff --git a/notebooks/week3/03_model_serving_feature_lookup.py b/notebooks/week3/03_model_serving_feature_lookup.py new file mode 100644 index 0000000..181f440 --- /dev/null +++ b/notebooks/week3/03_model_serving_feature_lookup.py @@ -0,0 +1,120 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Setup + +# COMMAND ---------- + +# MAGIC %pip install ../power_consumption-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +# DBTITLE 1,imports +import time + +import requests +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.catalog import ( + OnlineTableSpec, + OnlineTableSpecTriggeredSchedulingPolicy, +) +from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput +from pyspark.sql import SparkSession + +from power_consumption.config import Config + +# COMMAND ---------- + +# DBTITLE 1,initialisations +workspace = WorkspaceClient() +spark = SparkSession.builder.getOrCreate() + +config = Config.from_yaml("../../configs/project_configs.yml") + +catalog_name = config.catalog_name +schema_name = config.schema_name + +# COMMAND ---------- + +# DBTITLE 1,create online table +online_table_name = f"{catalog_name}.{schema_name}.power_consumption_online" 
+spec = OnlineTableSpec( + primary_key_columns=["id"], + source_table_full_name=f"{catalog_name}.{schema_name}.power_consumption_features", + run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}), + perform_full_copy=False, +) + +online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Create Endpoint + +# COMMAND ---------- + +# DBTITLE 1,create endpoint +workspace.serving_endpoints.create( + name="power-consumption-model-serving-fe", + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=f"{catalog_name}.{schema_name}.power_consumption_model_fe", + scale_to_zero_enabled=True, + workload_size="Small", + entity_version=3, + ) + ] + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Call the Endpoint + +# COMMAND ---------- + +# DBTITLE 1,set token and host +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +# DBTITLE 1,create request body +columns = config.processed_features.num_features + config.processed_features.cat_features +exclude_columns = ["general_diffuse_flows", "diffuse_flows"] + +required_columns = [col for col in columns if col not in exclude_columns] + + +train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas() + +sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records") + +dataframe_records = [[record] for record in sampled_records] + +# COMMAND ---------- + +start_time = time.time() + +model_serving_endpoint = f"https://{host}/serving-endpoints/power-consumption-model-serving-fe/invocations" + +response = requests.post( + f"{model_serving_endpoint}", + headers={"Authorization": f"Bearer {token}"}, + json={"dataframe_records": dataframe_records[0]}, +) + +end_time = time.time() +execution_time = end_time - start_time + +print("Response status:", response.status_code) +print("Reponse text:", response.text) +print("Execution time:", execution_time, "seconds") + +# COMMAND ---------- From 83ce0a66953044fa78339e3557b36c16df4ffbfb Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:39:28 +0200 Subject: [PATCH 07/11] refactor(config.py-project_configs.yml): update configs for ab test --- configs/project_configs.yml | 7 +++++++ .../power_consumption-0.0.1-py3-none-any.whl | Bin 9914 -> 9949 bytes power_consumption/config.py | 9 +++++++++ 3 files changed, 16 insertions(+) diff --git a/configs/project_configs.yml b/configs/project_configs.yml index d4eee99..ad3a923 100644 --- a/configs/project_configs.yml +++ b/configs/project_configs.yml @@ -6,6 +6,13 @@ hyperparameters: n_estimators: 1000 max_depth: 6 +ab_test_hyperparameters: + learning_rate_a: 0.02 + learning_rate_b: 0.02 + n_estimators: 1000 + max_depth_a: 6 + max_depth_b: 10 + processed_features: num_features: - Temperature diff --git a/notebooks/power_consumption-0.0.1-py3-none-any.whl b/notebooks/power_consumption-0.0.1-py3-none-any.whl index d035541f9f8e8da25f8b661271a1b587ff687a32..6ea843059cb4573032e8a08df260bf7f811b906b 100644 GIT binary patch delta 1666 zcmY*Z2{@E#9RKDp#!(u^=rPW@h;>BDQLY_~D}#{em<}2YA*Gfkn(2wG>vQD_qjGGs zawQ|9l0stGYACaE?51dRv8vV1%(kBOJP@Av!v{_pR5p7-~A+r7uV*HJxTa5Mk_ zNZ|Ia$eeYyD9IGOizYri0s#P&8~|WJl#+T5Lzx-z!hFN+H^wXtfN()>n4W2E6eN_F8>MV 
z&?2DAAl$S4GA(PvyHWQo88<|Z8DD3oHzy*^akL8zmnhqwk%p2t~y=c;R!tgwRlsSsyox@VVL>ye0W3i=~E-b|dS6%*TZ4w0ADQZgaW z#^z$tuS@3|$N6}+gUvJ8_@jZFd?#zY>)dLs&VU7&A?=wuM8x*8acbB`*Q|ZTzn^?E zaQ0rhQGQCSJZ+@Glzcv*(r(415ARG_~J#H}V(6PoY>N~oBce5oUW-?i%^OAP@ZtcA$?GbwM)iYu(iYWX|Trzjk z1DkX}{bS=?xb6PewUvDj%nJ5JQ)qFeh-rT7AkT7UGb821VzF2qB%W{y{MKKi5e*M1 zS=3V%>fG|bbgd%sl(uM@<8ExQ%-iWl3i9L!_zI5ob3$KkA8Tz*94r}%vL%PdZIsD+)q6y*!7yyVB0YFoF>P-RAVwlVj z24&AHY$Psmle~Co3CuzuKrCV>Ok$U&D1<6px&TuYnKA&T)FChHlh6UEbdWqmS)Y)g zmSd+ZW67%(LttH!{%SN%QMv$ob;@OPgzi2C$#<|%OL*HJP@rqYyp>zxh<1r*DjgLT zc0ru4LkI*%rGvSq1z~C>yXE%Bj2_7kuxi_$))b^&`XQPhk`Jq^+>h}t3}X0UVm6Rm zhMn7LTz^U1n?BQN`Bkwfz{s%w+{JsG;^)@&y)~9O2?L{L`{;`g?6h^;KPlil11@K0 z*?3iHR5Y0QZ0n)=Gf8b%T(`LpijwN5gdNj~wTQ~0;v;;kpA*t@@*WPF^S1UDYgB&~ zsECInyq)^};z7G|)#ftjUY{RL<4CSJL} zI(Q}i%tz3dalMzCM7EWKs2FW}PkE5|{?}<+7XQ3PoFm?p`TXcq@ZjLx^G&<@4v=5r z7 zYmoU1rkvVanRrM>_zE5Q;whV1-uKz1C32nc%BLx@3fspAJU+t32a&%-9*jxgd!dW^ zGh2UDJr}FRbU4ZC$?|=>^&98B&qwiqFTe~tg*ukhDB4@K)Yln0YDf9{S00;nT1Xu$ zas{);hAt863zkr>Igc`62AJ;DP`LZZq&uo0#@5=yc-C--x~Y(uXr0|-Qn1~OCl}_W zxI%pxf#4b(h?yREKPH}`V9YZ~Y%{|sxwG?N%!SC=vu=7WcjH3en|J2bg&NKWbMI7? zYHJsDYAUrpB<@~A2(P&HVU{!G9^b<`w~7HCP-){{od z>yr+BTmb;M8v(#*O)n2E5mE!qNhmOnh?eECo`{7`=&ZtXM3fZHC1If{y5CVF*{-s{ cN`5ID4giM#+qF4m@;@R9EGJP#6XP08v21 z#3NNge+S>VjxGGZF@Z}geT3_le-Gr3&+dvpjM;Od*yek^N@&hG0A;ZokpJd z*vjV!72yQ6uEF5@a+>32R$3tv)${N(wb8G=iSnr>BEGaBv7e8|^+kWAd(Cw<&gF1Z z95L6*D`KYhuSYr%mtq*UsVu|K`;LZ-Ef?k7i<_uK;~4Y5GY31o4AK6%ljH5_TzNgO z26?Hm!XB;e)!}Ks;C?!GrQ5(a8CRMe`xal+``9C{U$Z}2mmNsU|NF{WI7T++o7&le zbqi+ciNKOl^A1r`{E8{vhY+0T?j+}PchG_@{#HVV#&iN9Gr!IR?P=>{4SyP1TB`VqLzTu01+y$Ie$hb|5Gr>1L{o!Ib|d86>z$I(zbj4)U)i~j|CHMA4f z?D57P)q_ON_pYhsQ8VbpbG&vHFCWcxX7kOP%o;2mvh77{orT(scQ|Bf>tI%&cCb$HgAs?B43p!q$kQaHFD*x0255~)sDVMZb zUun1=$lhGP;r+S*v3|(C!Sx-7_$>l8(-;}xu)|ELuQrM+A^wu)2IUV{dD?b)d1mYIVCQdr$lMM=^@X@FSiGRfYGujm0v3 z)hWN-#zsq>afYcX_0ChmIWec)>XaVrGGKPEc0G$4a*;gtMe5Te=6zXRCn<{ap(Cw~ zj`~|+^f@G|(Qz;qOwEa4yUq<{i@iED)SMF3wy2S4Ao8HcML6Pg@e8?@aywTUqtc7n z<^{v*dz%&^;G~ZFr*Ngops}g)TD@}#P3rniQ&4n*3N1WryCILu^cKZD) z8CUm?)nW$k40)&Mi<^3q6Tx{FP5s%07N3*ctb&klW=GhEt8GO|A?s%T@#x}*p)6LD z?w*848LbV}P)a@ZatjIo^9TS?5O{xI3L`1t0g{@avVBQ7 zp(!|cPgnso78VC9Nk{>1=VK3HKBlCRzY|F^4ssb(C+iEOon(y-eMJBO%h%nvO?sK{ zLjqmIF<{HTkv(LbP(2#dmsSL!6s$mAiGmYa=Oc<9VJlIqC1sxxF#xEL1AtxI)Y36q zpA2lEU<7e&eq5`v01@zT^&hVV?;p$Ax+FAz?%Af}1{zb11etID!v8zfMYi*OC}0;= KR@j5M#s2|441|>c diff --git a/power_consumption/config.py b/power_consumption/config.py index a800d63..48be8f4 100644 --- a/power_consumption/config.py +++ b/power_consumption/config.py @@ -10,6 +10,14 @@ class Hyperparameters(BaseModel): max_depth: int +class ABTestHyperparameters(BaseModel): + learning_rate_a: float + learning_rate_b: float + n_estimators: int + max_depth_a: int + max_depth_b: int + + class ProcessedFeatures(BaseModel): num_features: List[str] cat_features: List[str] @@ -29,6 +37,7 @@ class Config(BaseModel): catalog_name: str schema_name: str hyperparameters: Hyperparameters + ab_test_hyperparameters: ABTestHyperparameters processed_features: ProcessedFeatures target: Target dataset: Dataset From 1e4b71961f05aeb425d26adc85764b11c05afbc6 Mon Sep 17 00:00:00 2001 From: Garett Sidwell <63107655+Garett601@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:36:32 +0200 Subject: [PATCH 08/11] feat(04_AB_test_model_serving.py): week3 notebook 4 --- notebooks/week3/04_AB_test_model_serving.py | 364 ++++++++++++++++++++ 1 file changed, 364 insertions(+) create mode 100644 notebooks/week3/04_AB_test_model_serving.py diff --git a/notebooks/week3/04_AB_test_model_serving.py 
b/notebooks/week3/04_AB_test_model_serving.py new file mode 100644 index 0000000..df850c0 --- /dev/null +++ b/notebooks/week3/04_AB_test_model_serving.py @@ -0,0 +1,364 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Setup + +# COMMAND ---------- + +# MAGIC %pip install ../power_consumption-0.0.1-py3-none-any.whl + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +# DBTITLE 1,imports +import time + +import mlflow +import pandas as pd +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput +from lightgbm import LGBMRegressor +from sklearn.multioutput import MultiOutputRegressor +from mlflow import MlflowClient +from mlflow.models import infer_signature +from pyspark.sql import SparkSession +from sklearn.compose import ColumnTransformer +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder +import hashlib +import requests + +from power_consumption.config import Config + +# COMMAND ---------- + +# DBTITLE 1,initialisations +mlflow.set_tracking_uri("databricks") +mlflow.set_registry_uri("databricks-uc") + +client = MlflowClient() + +spark = SparkSession.builder.getOrCreate() + +config = Config.from_yaml("../../configs/project_configs.yml") + +catalog_name = config.catalog_name +schema_name = config.schema_name + +num_features = config.processed_features.num_features +cat_features = config.processed_features.cat_features +target = config.target.target +ab_test_params = config.ab_test_hyperparameters.__dict__ + +# COMMAND ---------- + +# DBTITLE 1,get a/b parameters +parameters_a = { + "learning_rate": ab_test_params["learning_rate_a"], + "n_estimators": ab_test_params["n_estimators"], + "max_depth": ab_test_params["max_depth_a"], +} + +parameters_b = { + "learning_rate": ab_test_params["learning_rate_b"], + "n_estimators": ab_test_params["n_estimators"], + "max_depth": ab_test_params["max_depth_b"], +} + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Data Loading + +# COMMAND ---------- + +train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set") +train_set = train_set_spark.toPandas() +test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas() + +# COMMAND ---------- + +# DBTITLE 1,define features and target +X_train = train_set[num_features + cat_features] +y_train = train_set[target] +X_test = test_set[num_features + cat_features] +y_test = test_set[target] + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Train and Log Models + +# COMMAND ---------- + +# DBTITLE 1,create pipelines +# Define the preprocessor for categorical features +preprocessor = ColumnTransformer( + transformers=[('cat', OneHotEncoder(handle_unknown='ignore'), cat_features)], + remainder='passthrough' +) + +# Create the pipeline with preprocessing and the multi-output LightGBM regressor +pipeline_a = Pipeline(steps=[ + ('preprocessor', preprocessor), + ('regressor', MultiOutputRegressor(LGBMRegressor(**parameters_a))) +]) + +pipeline_b = Pipeline(steps=[ + ('preprocessor', preprocessor), + ('regressor', MultiOutputRegressor(LGBMRegressor(**parameters_b))) +]) + +# Set the MLflow experiment to track this A/B testing project +mlflow.set_experiment(experiment_name="/Shared/power-consumption-ab") +model_name = f"{catalog_name}.{schema_name}.power_consumption_model_ab" + +git_sha = "30d57afb2efca70cede3061d00f2a553c2b4779b" + +# COMMAND ---------- + +# DBTITLE 1,Train 
Model A and Log with MLflow +# Start MLflow run to track training of Model A +with mlflow.start_run(tags={"model_class": "A", "git_sha": git_sha}) as run: + run_id = run.info.run_id + + # Train the model + pipeline_a.fit(X_train, y_train) + y_pred = pipeline_a.predict(X_test) + + # Calculate performance metrics + mse = mean_squared_error(y_test, y_pred) + mae = mean_absolute_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + # Log model parameters, metrics, and other artifacts in MLflow + mlflow.log_param("model_type", "LightGBM with preprocessing") + mlflow.log_params(parameters_a) + mlflow.log_metric("mse", mse) + mlflow.log_metric("mae", mae) + mlflow.log_metric("r2_score", r2) + signature = infer_signature(model_input=X_train, model_output=y_pred) + + # Log the input dataset for tracking reproducibility + dataset = mlflow.data.from_spark(train_set_spark, + table_name=f"{catalog_name}.{schema_name}.train_set", + version="0") + mlflow.log_input(dataset, context="training") + + # Log the pipeline model in MLflow with a unique artifact path + mlflow.sklearn.log_model(sk_model=pipeline_a, artifact_path="lightgbm-pipeline-model", signature=signature) + +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/lightgbm-pipeline-model", name=model_name, tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +# DBTITLE 1,Register Model A and Assign Alias +model_version_alias = "model_A" + +client.set_registered_model_alias(model_name, model_version_alias, f"{model_version.version}") +model_uri = f"models:/{model_name}@{model_version_alias}" +model_A = mlflow.sklearn.load_model(model_uri) + +# COMMAND ---------- + +# DBTITLE 1,Train Model B and Log with MLflow +# Start MLflow run to track training of Model B +with mlflow.start_run(tags={"model_class": "B", "git_sha": git_sha}) as run: + run_id = run.info.run_id + + # Train the model + pipeline_b.fit(X_train, y_train) + y_pred = pipeline_b.predict(X_test) + + # Calculate performance metrics + mse = mean_squared_error(y_test, y_pred) + mae = mean_absolute_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + # Log model parameters, metrics, and other artifacts in MLflow + mlflow.log_param("model_type", "LightGBM with preprocessing") + mlflow.log_params(parameters_b) + mlflow.log_metric("mse", mse) + mlflow.log_metric("mae", mae) + mlflow.log_metric("r2_score", r2) + signature = infer_signature(model_input=X_train, model_output=y_pred) + + # Log the input dataset for tracking reproducibility + dataset = mlflow.data.from_spark(train_set_spark, + table_name=f"{catalog_name}.{schema_name}.train_set", + version="0") + mlflow.log_input(dataset, context="training") + + # Log the pipeline model in MLflow with a unique artifact path + mlflow.sklearn.log_model(sk_model=pipeline_b, artifact_path="lightgbm-pipeline-model", signature=signature) + +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/lightgbm-pipeline-model", name=model_name, tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +# DBTITLE 1,Register Model B and Assign Alias +model_version_alias = "model_B" + +client.set_registered_model_alias(model_name, model_version_alias, f"{model_version.version}") +model_uri = f"models:/{model_name}@{model_version_alias}" +model_B = mlflow.sklearn.load_model(model_uri) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Define Custom A/B Test Model + +# COMMAND ---------- + +class PowerConsumptionModelWrapper(mlflow.pyfunc.PythonModel): + def __init__(self, models): + self.models = models + self.model_a = 
models[0] + self.model_b = models[1] + + def predict(self, context, model_input): + if isinstance(model_input, pd.DataFrame): + time_id = str(model_input["id"].values[0]) + hashed_id = hashlib.md5(time_id.encode(encoding="UTF-8")).hexdigest() + # convert a hexadecimal (base-16) string into an integer + if int(hashed_id, 16) % 2: + predictions = self.model_a.predict(model_input.drop(["id"], axis=1)) + return {"Prediction": predictions[0], "model": "Model A"} + else: + predictions = self.model_b.predict(model_input.drop(["id"], axis=1)) + return {"Prediction": predictions[0], "model": "Model B"} + else: + raise ValueError("Input must be a pandas DataFrame.") + +# COMMAND ---------- + +X_train = train_set[num_features + cat_features + ["id"]] +X_test = test_set[num_features + cat_features + ["id"]] + +# COMMAND ---------- + +models = [model_A, model_B] +wrapped_model = PowerConsumptionModelWrapper(models) +example_input_1 = X_test.iloc[0:1] +example_prediction_1 = wrapped_model.predict( + context=None, + model_input=example_input_1) +example_input_2 = X_test.iloc[1:2] +example_prediction_2 = wrapped_model.predict( + context=None, + model_input=example_input_2) +print("Example Prediction:", example_prediction_1) +print("Example Prediction:", example_prediction_2) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Log, Register and Use Models + +# COMMAND ---------- + +# DBTITLE 1,log and register model +mlflow.set_experiment(experiment_name="/Shared/power-consumption-ab-testing") +model_name = f"{catalog_name}.{schema_name}.power_consumption_model_pyfunc_ab_test" + +with mlflow.start_run() as run: + run_id = run.info.run_id + signature = infer_signature(model_input=X_train, + model_output={"Prediction": 1234.5, + "model": "Model B"}) + dataset = mlflow.data.from_spark(train_set_spark, + table_name=f"{catalog_name}.{schema_name}.train_set", + version="0") + mlflow.log_input(dataset, context="training") + mlflow.pyfunc.log_model( + python_model=wrapped_model, + artifact_path="pyfunc-power-consumption-model-ab", + signature=signature + ) +model_version = mlflow.register_model( + model_uri=f"runs:/{run_id}/pyfunc-power-consumption-model-ab", + name=model_name, + tags={"git_sha": f"{git_sha}"} +) + +# COMMAND ---------- + +# DBTITLE 1,load and predict +model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version.version}") + +predictions = model.predict(X_test.iloc[0:1]) + +predictions + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Create serving endpoint + +# COMMAND ---------- + +workspace = WorkspaceClient() + +workspace.serving_endpoints.create( + name="power-consumption-model-serving-ab-test", + config=EndpointCoreConfigInput( + served_entities=[ + ServedEntityInput( + entity_name=f"{catalog_name}.{schema_name}.power_consumption_model_pyfunc_ab_test", + scale_to_zero_enabled=True, + workload_size="Small", + entity_version=model_version.version, + ) + ] + ), +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # Call the endpoint + +# COMMAND ---------- + +# DBTITLE 1,set token and host +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() +host = spark.conf.get("spark.databricks.workspaceUrl") + +# COMMAND ---------- + +# DBTITLE 1,create sample request body +required_columns = config.processed_features.num_features + config.processed_features.cat_features + ["id"] + +sampled_records = train_set[required_columns].sample(n=1000, replace=True).to_dict(orient="records") + +dataframe_records = [[record] for record in sampled_records] + +# 
COMMAND ----------
+
+start_time = time.time()
+
+model_serving_endpoint = (
+    f"https://{host}/serving-endpoints/power-consumption-model-serving-ab-test/invocations"
+)
+
+response = requests.post(
+    f"{model_serving_endpoint}",
+    headers={"Authorization": f"Bearer {token}"},
+    json={"dataframe_records": dataframe_records[0]},
+)
+
+end_time = time.time()
+execution_time = end_time - start_time
+
+print("Response status:", response.status_code)
+print("Response text:", response.text)
+print("Execution time:", execution_time, "seconds")
+
+# COMMAND ----------

From 243b0f27142fb35bb48d4c3c82ceee4be3930dc9 Mon Sep 17 00:00:00 2001
From: Garett Sidwell <63107655+Garett601@users.noreply.github.com>
Date: Mon, 11 Nov 2024 14:37:20 +0200
Subject: [PATCH 09/11] test(conftest.yml): update configs for tests

---
 tests/conftest.yml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/tests/conftest.yml b/tests/conftest.yml
index d4eee99..ad3a923 100644
--- a/tests/conftest.yml
+++ b/tests/conftest.yml
@@ -6,6 +6,13 @@ hyperparameters:
   n_estimators: 1000
   max_depth: 6

+ab_test_hyperparameters:
+  learning_rate_a: 0.02
+  learning_rate_b: 0.02
+  n_estimators: 1000
+  max_depth_a: 6
+  max_depth_b: 10
+
 processed_features:
   num_features:
     - Temperature

From a7e3ec9760ab58a1b949fbd9fae5206f10ce04cb Mon Sep 17 00:00:00 2001
From: Garett Sidwell <63107655+Garett601@users.noreply.github.com>
Date: Mon, 11 Nov 2024 14:42:21 +0200
Subject: [PATCH 10/11] docs(README.md): Add link to feature engineering package limitations

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index d3f4e63..c6aa005 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ week 2
 - In example code, the features generated at runtime were not used in the fe model
 > **Workaround**
 - Ran the feature engineering notebook from Databricks workspace, this resolved permissions issues
+ - Feature Engineering package limitations [documentation](https://docs.databricks.com/en/machine-learning/feature-store/python-api.html#limitations)
 - Ran the feature engineering feature function on the training and testing set and included the new features in the fe model
 - ```python
   testing_set = fe.create_training_set(

From c529d80281cf1faeea84d03936ee2a174bcbdc1c Mon Sep 17 00:00:00 2001
From: Garett Sidwell <63107655+Garett601@users.noreply.github.com>
Date: Mon, 11 Nov 2024 14:49:36 +0200
Subject: [PATCH 11/11] ci(.pre-commit-config.yaml): update stage to pre-commit to remove deprecation warning

---
 .pre-commit-config.yaml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fe623e0..bd6dae1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,6 @@
+default_stages:
+- pre-commit
+
 exclude: ^tests/resources/
 repos:
 - repo: https://github.com/pre-commit/pre-commit-hooks
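
**Note on the A/B traffic split (PATCH 08/11):** the pyfunc wrapper routes each request by MD5-hashing the record `id` and checking the parity of the resulting integer, so a given `id` always lands on the same model version. A minimal standalone sketch of that routing rule follows; the `route` helper and the example ids are illustrative only and are not part of the patch:

```python
import hashlib


def route(record_id: str) -> str:
    """Assign a record to Model A or Model B by MD5-hash parity (same rule as the wrapper's predict())."""
    hashed_id = hashlib.md5(record_id.encode("UTF-8")).hexdigest()
    # An odd integer value of the hex digest routes to Model A, an even value to Model B.
    return "Model A" if int(hashed_id, 16) % 2 else "Model B"


# The split is deterministic: repeated requests with the same id always hit the same model.
print(route("2017-01-01 00:10:00"))
print(route("2017-01-01 00:20:00"))
```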