Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a split utility to support data splitting -- enables key AIX36… #35

Merged
merged 1 commit into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions codeflare.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Metadata-Version: 1.2
Name: codeflare
Version: 0.1.2.dev0
Summary: Codeflare pipelines
Home-page: https://github.com/project-codeflare/codeflare
Author: CodeFlare team
Author-email: [email protected]
License: Apache v2.0
Project-URL: Bug Reports, https://github.com/project-codeflare/codeflare/issues
Project-URL: Source, https://github.com/project-codeflare/codeflare
Description: UNKNOWN
Keywords: ray pipelines
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Operating System :: OS Independent
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.7
26 changes: 26 additions & 0 deletions codeflare.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
README.md
setup.cfg
setup.py
codeflare/__init__.py
codeflare/_version.py
codeflare.egg-info/PKG-INFO
codeflare.egg-info/SOURCES.txt
codeflare.egg-info/dependency_links.txt
codeflare.egg-info/requires.txt
codeflare.egg-info/top_level.txt
codeflare/pipelines/Datamodel.py
codeflare/pipelines/Exceptions.py
codeflare/pipelines/Runtime.py
codeflare/pipelines/__init__.py
codeflare/pipelines/utils.py
codeflare/pipelines/tests/__init__.py
codeflare/pipelines/tests/test_Datamodel.py
codeflare/pipelines/tests/test_and.py
codeflare/pipelines/tests/test_helper.py
codeflare/pipelines/tests/test_multibranch.py
codeflare/pipelines/tests/test_or.py
codeflare/pipelines/tests/test_pipeline_predict.py
codeflare/pipelines/tests/test_runtime.py
codeflare/pipelines/tests/test_save_load.py
codeflare/pipelines/tests/test_singleton.py
codeflare/pipelines/tests/test_utils.py
1 change: 1 addition & 0 deletions codeflare.egg-info/dependency_links.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

8 changes: 8 additions & 0 deletions codeflare.egg-info/requires.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ray[default,k8s,serve]>=1.3.0
setuptools>=52.0.0
sklearn>=0.0
scikit-learn>=0.24.1
pandas>=1.2.4
numpy>=1.18.5
pickle5>=0.0.11
graphviz>=0.16
1 change: 1 addition & 0 deletions codeflare.egg-info/top_level.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
codeflare
1 change: 1 addition & 0 deletions codeflare/pipelines/Runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
return result


def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType, is_outputNode):
"""
Inner method that executes the estimator node parallelizing at the level of input objects. This defines the
Expand Down
6 changes: 6 additions & 0 deletions codeflare/pipelines/tests/test_Datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
from codeflare.pipelines.Datamodel import Xy
from codeflare.pipelines.Runtime import ExecutionType


class FeatureUnion(dm.AndEstimator):
def __init__(self):
pass

def get_estimator_type(self):
return 'transform'

def clone(self):
return base.clone(self)

def fit_transform(self, xy_list):
return self.transform(xy_list)

def transform(self, xy_list):
X_list = []
y_vec = None
Expand All @@ -31,6 +36,7 @@ def transform(self, xy_list):
X_concat = np.concatenate(X_list, axis=1)
return Xy(X_concat, y_vec)


class MultibranchTestCase(unittest.TestCase):

def test_multibranch(self):
Expand Down
22 changes: 22 additions & 0 deletions codeflare/pipelines/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import ray
import pandas as pd
import codeflare.pipelines.Datamodel as dm
import codeflare.pipelines.utils as cfutils


def test_utils_split():
    """
    Checks that ``cfutils.split`` cuts an 8-row dataframe into 4 XYRef chunks
    and that the first chunk holds 2 rows.
    """
    # restart Ray so the test does not reuse an already running instance
    ray.shutdown()
    ray.init()

    frame = pd.DataFrame({
        'col1': [1, 2, 3, 4, 5, 6, 7, 8],
        'col2': [3, 4, 5, 6, 7, 8, 9, 10],
    })
    # y is stored as None: only the X side is exercised here
    source_ref = dm.XYRef(ray.put(frame), ray.put(None))

    chunk_refs = ray.get(cfutils.split.remote(source_ref, 4))
    assert len(chunk_refs) == 4

    # fetch the first chunk back from Ray and check its row count
    first_chunk = ray.get(chunk_refs[0].get_Xref())
    assert len(first_chunk) == 2
35 changes: 35 additions & 0 deletions codeflare/pipelines/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import graphviz
import codeflare.pipelines.Datamodel as dm
import ray
import numpy as np


def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph:
Expand All @@ -45,3 +47,36 @@ def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph:
graph.node(post_node.get_node_name())
graph.edge(pre_node.get_node_name(), post_node.get_node_name())
return graph


@ray.remote
def split(xy_ref: dm.XYRef, num_splits):
    """
    Breaks the X held by an XYRef into chunks and hands them back as a list of
    XYRefs, one per chunk. Handy for cutting a raw array into smaller pieces.
    The current implementation requires the X of the XYRef to be a pandas
    dataframe.

    :param xy_ref: XYRef whose X (and y, when it is not None) is split
    :param num_splits: Passed to ``np.array_split`` as the number of sections
    :return: List of XYRef objects, each pointing at one chunk put into Ray
    """
    features = ray.get(xy_ref.get_Xref())
    labels = ray.get(xy_ref.get_yref())
    has_labels = labels is not None

    # TODO: How do we split y if it is needed, lets revisit it later, will these be aligned?
    feature_chunks = np.array_split(features, num_splits)
    label_chunks = np.array_split(labels, num_splits) if has_labels else None

    # walk the chunks and insert each one into Plasma
    chunk_refs = []
    for idx, feature_chunk in enumerate(feature_chunks):
        # when there is no y, a None placeholder is still stored for the chunk
        label_chunk = label_chunks[idx] if has_labels else None

        feature_ref = ray.put(feature_chunk)
        label_ref = ray.put(label_chunk)
        chunk_refs.append(dm.XYRef(feature_ref, label_ref))

    return chunk_refs
616 changes: 616 additions & 0 deletions notebooks/AIX360 meets CodeFlare.ipynb

Large diffs are not rendered by default.