diff --git a/codeflare.egg-info/PKG-INFO b/codeflare.egg-info/PKG-INFO new file mode 100644 index 0000000..7cb5370 --- /dev/null +++ b/codeflare.egg-info/PKG-INFO @@ -0,0 +1,22 @@ +Metadata-Version: 1.2 +Name: codeflare +Version: 0.1.2.dev0 +Summary: Codeflare pipelines +Home-page: https://github.com/project-codeflare/codeflare +Author: CodeFlare team +Author-email: chcost@us.ibm.com +License: Apache v2.0 +Project-URL: Bug Reports, https://github.com/project-codeflare/codeflare/issues +Project-URL: Source, https://github.com/project-codeflare/codeflare +Description: UNKNOWN +Keywords: ray pipelines +Platform: UNKNOWN +Classifier: Development Status :: 4 - Beta +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: Apache Software License +Classifier: Programming Language :: Python +Classifier: Programming Language :: Python :: 3 +Classifier: Programming Language :: Python :: 3 :: Only +Classifier: Operating System :: OS Independent +Classifier: Topic :: System :: Distributed Computing +Requires-Python: >=3.7 diff --git a/codeflare.egg-info/SOURCES.txt b/codeflare.egg-info/SOURCES.txt new file mode 100644 index 0000000..5c509c6 --- /dev/null +++ b/codeflare.egg-info/SOURCES.txt @@ -0,0 +1,26 @@ +README.md +setup.cfg +setup.py +codeflare/__init__.py +codeflare/_version.py +codeflare.egg-info/PKG-INFO +codeflare.egg-info/SOURCES.txt +codeflare.egg-info/dependency_links.txt +codeflare.egg-info/requires.txt +codeflare.egg-info/top_level.txt +codeflare/pipelines/Datamodel.py +codeflare/pipelines/Exceptions.py +codeflare/pipelines/Runtime.py +codeflare/pipelines/__init__.py +codeflare/pipelines/utils.py +codeflare/pipelines/tests/__init__.py +codeflare/pipelines/tests/test_Datamodel.py +codeflare/pipelines/tests/test_and.py +codeflare/pipelines/tests/test_helper.py +codeflare/pipelines/tests/test_multibranch.py +codeflare/pipelines/tests/test_or.py +codeflare/pipelines/tests/test_pipeline_predict.py +codeflare/pipelines/tests/test_runtime.py +codeflare/pipelines/tests/test_save_load.py +codeflare/pipelines/tests/test_singleton.py +codeflare/pipelines/tests/test_utils.py \ No newline at end of file diff --git a/codeflare.egg-info/dependency_links.txt b/codeflare.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/codeflare.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/codeflare.egg-info/requires.txt b/codeflare.egg-info/requires.txt new file mode 100644 index 0000000..50ec0c3 --- /dev/null +++ b/codeflare.egg-info/requires.txt @@ -0,0 +1,8 @@ +ray[default,k8s,serve]>=1.3.0 +setuptools>=52.0.0 +sklearn>=0.0 +scikit-learn>=0.24.1 +pandas>=1.2.4 +numpy>=1.18.5 +pickle5>=0.0.11 +graphviz>=0.16 diff --git a/codeflare.egg-info/top_level.txt b/codeflare.egg-info/top_level.txt new file mode 100644 index 0000000..8bee6fa --- /dev/null +++ b/codeflare.egg-info/top_level.txt @@ -0,0 +1 @@ +codeflare diff --git a/codeflare/pipelines/Runtime.py b/codeflare/pipelines/Runtime.py index 837e920..0ca9559 100644 --- a/codeflare/pipelines/Runtime.py +++ b/codeflare/pipelines/Runtime.py @@ -146,6 +146,7 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref: result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref]) return result + def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType, is_outputNode): """ Inner method that executes the estimator node parallelizing at the level of input objects. This defines the diff --git a/codeflare/pipelines/tests/test_Datamodel.py b/codeflare/pipelines/tests/test_Datamodel.py index cd0242b..9220597 100644 --- a/codeflare/pipelines/tests/test_Datamodel.py +++ b/codeflare/pipelines/tests/test_Datamodel.py @@ -13,15 +13,20 @@ from codeflare.pipelines.Datamodel import Xy from codeflare.pipelines.Runtime import ExecutionType + class FeatureUnion(dm.AndEstimator): def __init__(self): pass + def get_estimator_type(self): return 'transform' + def clone(self): return base.clone(self) + def fit_transform(self, xy_list): return self.transform(xy_list) + def transform(self, xy_list): X_list = [] y_vec = None @@ -31,6 +36,7 @@ def transform(self, xy_list): X_concat = np.concatenate(X_list, axis=1) return Xy(X_concat, y_vec) + class MultibranchTestCase(unittest.TestCase): def test_multibranch(self): diff --git a/codeflare/pipelines/tests/test_utils.py b/codeflare/pipelines/tests/test_utils.py new file mode 100644 index 0000000..bb4b58d --- /dev/null +++ b/codeflare/pipelines/tests/test_utils.py @@ -0,0 +1,22 @@ +import ray +import pandas as pd +import codeflare.pipelines.Datamodel as dm +import codeflare.pipelines.utils as cfutils + + +def test_utils_split(): + ray.shutdown() + ray.init() + d = {'col1': [1, 2, 3, 4, 5, 6, 7, 8], 'col2': [3, 4, 5, 6, 7, 8, 9, 10]} + df = pd.DataFrame(d) + x_test_ref = ray.put(df) + y_test_ref = ray.put(None) + xy_ref_test = dm.XYRef(x_test_ref, y_test_ref) + + split_ref = cfutils.split.remote(xy_ref_test, 4) + xy_ref_splits = ray.get(split_ref) + assert len(xy_ref_splits) == 4 + + # get the output and assert again + X_in_ray = ray.get(xy_ref_splits[0].get_Xref()) + assert len(X_in_ray) == 2 \ No newline at end of file diff --git a/codeflare/pipelines/utils.py b/codeflare/pipelines/utils.py index 2c3f6cd..6f7ab6c 100644 --- a/codeflare/pipelines/utils.py +++ b/codeflare/pipelines/utils.py @@ -27,6 +27,8 @@ import graphviz import codeflare.pipelines.Datamodel as dm +import ray +import numpy as np def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph: @@ -45,3 +47,36 @@ def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph: graph.node(post_node.get_node_name()) graph.edge(pre_node.get_node_name(), post_node.get_node_name()) return graph + + +@ray.remote +def split(xy_ref: dm.XYRef, num_splits): + """ + Takes input as XYRef, splits the X and sends back the data as chunks. This is quite + useful when we have to break a raw array into smaller pieces. This current implementation + requires the input X of XYRef to be a pandas dataframe. + """ + x = ray.get(xy_ref.get_Xref()) + y = ray.get(xy_ref.get_yref()) + + xy_split_refs = [] + + # TODO: How do we split y if it is needed, lets revisit it later, will these be aligned? + x_split = np.array_split(x, num_splits) + if y is not None: + y_split = np.array_split(y, num_splits) + + # iterate over each and then insert into Plasma + for i in range(0, len(x_split)): + x_part = x_split[i] + y_part = None + + if y is not None: + y_part = y_split[i] + + x_part_ref = ray.put(x_part) + y_part_ref = ray.put(y_part) + xy_ref_part = dm.XYRef(x_part_ref, y_part_ref) + xy_split_refs.append(xy_ref_part) + + return xy_split_refs diff --git a/notebooks/AIX360 meets CodeFlare.ipynb b/notebooks/AIX360 meets CodeFlare.ipynb new file mode 100644 index 0000000..ce90030 --- /dev/null +++ b/notebooks/AIX360 meets CodeFlare.ipynb @@ -0,0 +1,616 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ec0bac3a", + "metadata": {}, + "source": [ + "### CodeFlare meets AIX360\n", + "\n", + "This notebooks demonstrates how CodeFlare can help accelerate and parallelize some key explainers in AIX360. The level of parallelism is per record and is a simple yet powerful way of using CodeFlare with AIX360.\n", + "\n", + "We show how the explainer base class can be encapsulated as a simple `Estimator` that can then be run in a simple `TRANSFORM` mode of CodeFlare pipelines. A key utility here is the ability to \"split\" the data into smaller chunks to enable compute level parallelism. A lot of compute heavy tasks such as explainability (e.g., SHAP, LIME, etc.) can benefit from scaling, even when data is small.\n", + "\n", + "We wil first show how the famous IRIS example is implemented using AIX360 and use a shap explainer to explain the model. We then demonstrate how CodeFlare pipelines can with minimal code enable the acceleration of explanations. In this particular example, CodeFlare pipelines achieves 4x parallelism as we split the data 4-way. In general, depending on the use-case, we can accelerate it by N where N is the number of cores, assuming N is less than the number of data items :)\n", + "\n", + "Note that this notebook requires CodeFlare and AIX360 to be installed." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "74c29dc1", + "metadata": {}, + "outputs": [], + "source": [ + "%config Completer.use_jedi = False" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "50055db6", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:516: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint8 = np.dtype([(\"qint8\", np.int8, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:517: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_quint8 = np.dtype([(\"quint8\", np.uint8, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:518: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint16 = np.dtype([(\"qint16\", np.int16, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:519: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_quint16 = np.dtype([(\"quint16\", np.uint16, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:520: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint32 = np.dtype([(\"qint32\", np.int32, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorflow/python/framework/dtypes.py:525: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " np_resource = np.dtype([(\"resource\", np.ubyte, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:541: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint8 = np.dtype([(\"qint8\", np.int8, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:542: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_quint8 = np.dtype([(\"quint8\", np.uint8, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:543: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint16 = np.dtype([(\"qint16\", np.int16, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:544: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_quint16 = np.dtype([(\"quint16\", np.uint16, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:545: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " _np_qint32 = np.dtype([(\"qint32\", np.int32, 1)])\n", + "/Users/rganti/miniconda3/envs/aix360/lib/python3.7/site-packages/tensorboard/compat/tensorflow_stub/dtypes.py:550: FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.\n", + " np_resource = np.dtype([(\"resource\", np.ubyte, 1)])\n" + ] + } + ], + "source": [ + "from __future__ import print_function\n", + "import sklearn\n", + "from sklearn.model_selection import train_test_split\n", + "import sklearn.datasets\n", + "import sklearn.ensemble\n", + "import numpy as np\n", + "import time\n", + "np.random.seed(1)\n", + "\n", + "# Importing shap KernelExplainer (aix360 style)\n", + "from aix360.algorithms.shap import KernelExplainer\n", + "\n", + "# the following import is required for access to shap plotting functions and datasets\n", + "import shap" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "2fcc36f0", + "metadata": {}, + "outputs": [], + "source": [ + "# Supress jupyter warnings if required for cleaner output\n", + "import warnings\n", + "warnings.simplefilter('ignore')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "7794c8e2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Accuracy = 96.66666666666667%\n" + ] + } + ], + "source": [ + "X_train,X_test,Y_train,Y_test = train_test_split(*shap.datasets.iris(), test_size=0.2, random_state=0)\n", + "\n", + "# rather than use the whole training set to estimate expected values, we could summarize with\n", + "# a set of weighted kmeans, each weighted by the number of points they represent. But this dataset\n", + "# is so small we don't worry about it\n", + "#X_train_summary = shap.kmeans(X_train, 50)\n", + "\n", + "def print_accuracy(f):\n", + " print(\"Accuracy = {0}%\".format(100*np.sum(f(X_test) == Y_test)/len(Y_test)))\n", + " time.sleep(0.5) # to let the print get out before any progress bars\n", + "\n", + "shap.initjs()\n", + "\n", + "knn = sklearn.neighbors.KNeighborsClassifier()\n", + "knn.fit(X_train, Y_train)\n", + "\n", + "print_accuracy(knn.predict)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "dbf3202f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Using 120 background data samples could cause slower run times. Consider using shap.sample(data, K) or shap.kmeans(data, K) to summarize the background as K samples.\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + "
\n", + "
\n", + " Visualization omitted, Javascript library not loaded!
\n", + " Have you run `initjs()` in this notebook? If this notebook was from another\n", + " user you must also trust this notebook (File -> Trust notebook). If you are viewing\n", + " this notebook on github the Javascript has been stripped for security. If you are using\n", + " JupyterLab this error is because a JupyterLab extension has not yet been written.\n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "shapexplainer = KernelExplainer(knn.predict_proba, X_train)\n", + "# aix360 style for explaining input instances\n", + "shap_values = shapexplainer.explain_instance(X_train.iloc[0,:])\n", + "shap.force_plot(shapexplainer.explainer.expected_value[0], shap_values[0], X_train.iloc[0,:])" + ] + }, + { + "cell_type": "markdown", + "id": "480d98d2", + "metadata": {}, + "source": [ + "### Simple timing of single threaded code\n", + "We use `X_train` as it has a bit more data to time the execution using simple %time from Jupyter. A wall-clock time of 3.55s is observed." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "46de8459", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "0c56a95e21ad4ca280427dbecd1d2578", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + " 0%| | 0/120 [00:00,\n", + " ,\n", + " ,\n", + " ]" + ] + }, + "execution_count": 37, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.get(split_ref_test)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}