Skip to content

Commit a2b290a

Browse files
Merge pull request #35 from project-codeflare/splitter
Adding a split utility to support data splitting -- enables key AIX36…
2 parents 131a772 + f0c1f95 commit a2b290a

File tree

10 files changed

+738
-0
lines changed

10 files changed

+738
-0
lines changed

Diff for: codeflare.egg-info/PKG-INFO

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
Metadata-Version: 1.2
2+
Name: codeflare
3+
Version: 0.1.2.dev0
4+
Summary: Codeflare pipelines
5+
Home-page: https://github.com/project-codeflare/codeflare
6+
Author: CodeFlare team
7+
Author-email: [email protected]
8+
License: Apache v2.0
9+
Project-URL: Bug Reports, https://github.com/project-codeflare/codeflare/issues
10+
Project-URL: Source, https://github.com/project-codeflare/codeflare
11+
Description: UNKNOWN
12+
Keywords: ray pipelines
13+
Platform: UNKNOWN
14+
Classifier: Development Status :: 4 - Beta
15+
Classifier: Intended Audience :: Developers
16+
Classifier: License :: OSI Approved :: Apache Software License
17+
Classifier: Programming Language :: Python
18+
Classifier: Programming Language :: Python :: 3
19+
Classifier: Programming Language :: Python :: 3 :: Only
20+
Classifier: Operating System :: OS Independent
21+
Classifier: Topic :: System :: Distributed Computing
22+
Requires-Python: >=3.7

Diff for: codeflare.egg-info/SOURCES.txt

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
README.md
2+
setup.cfg
3+
setup.py
4+
codeflare/__init__.py
5+
codeflare/_version.py
6+
codeflare.egg-info/PKG-INFO
7+
codeflare.egg-info/SOURCES.txt
8+
codeflare.egg-info/dependency_links.txt
9+
codeflare.egg-info/requires.txt
10+
codeflare.egg-info/top_level.txt
11+
codeflare/pipelines/Datamodel.py
12+
codeflare/pipelines/Exceptions.py
13+
codeflare/pipelines/Runtime.py
14+
codeflare/pipelines/__init__.py
15+
codeflare/pipelines/utils.py
16+
codeflare/pipelines/tests/__init__.py
17+
codeflare/pipelines/tests/test_Datamodel.py
18+
codeflare/pipelines/tests/test_and.py
19+
codeflare/pipelines/tests/test_helper.py
20+
codeflare/pipelines/tests/test_multibranch.py
21+
codeflare/pipelines/tests/test_or.py
22+
codeflare/pipelines/tests/test_pipeline_predict.py
23+
codeflare/pipelines/tests/test_runtime.py
24+
codeflare/pipelines/tests/test_save_load.py
25+
codeflare/pipelines/tests/test_singleton.py
26+
codeflare/pipelines/tests/test_utils.py

Diff for: codeflare.egg-info/dependency_links.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

Diff for: codeflare.egg-info/requires.txt

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
ray[default,k8s,serve]>=1.3.0
2+
setuptools>=52.0.0
3+
sklearn>=0.0
4+
scikit-learn>=0.24.1
5+
pandas>=1.2.4
6+
numpy>=1.18.5
7+
pickle5>=0.0.11
8+
graphviz>=0.16

Diff for: codeflare.egg-info/top_level.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
codeflare

Diff for: codeflare/pipelines/Runtime.py

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
146146
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
147147
return result
148148

149+
149150
def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType, is_outputNode):
150151
"""
151152
Inner method that executes the estimator node parallelizing at the level of input objects. This defines the

Diff for: codeflare/pipelines/tests/test_Datamodel.py

+6
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@
1313
from codeflare.pipelines.Datamodel import Xy
1414
from codeflare.pipelines.Runtime import ExecutionType
1515

16+
1617
class FeatureUnion(dm.AndEstimator):
1718
def __init__(self):
1819
pass
20+
1921
def get_estimator_type(self):
2022
return 'transform'
23+
2124
def clone(self):
2225
return base.clone(self)
26+
2327
def fit_transform(self, xy_list):
2428
return self.transform(xy_list)
29+
2530
def transform(self, xy_list):
2631
X_list = []
2732
y_vec = None
@@ -31,6 +36,7 @@ def transform(self, xy_list):
3136
X_concat = np.concatenate(X_list, axis=1)
3237
return Xy(X_concat, y_vec)
3338

39+
3440
class MultibranchTestCase(unittest.TestCase):
3541

3642
def test_multibranch(self):

Diff for: codeflare/pipelines/tests/test_utils.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import ray
2+
import pandas as pd
3+
import codeflare.pipelines.Datamodel as dm
4+
import codeflare.pipelines.utils as cfutils
5+
6+
7+
def test_utils_split():
8+
ray.shutdown()
9+
ray.init()
10+
d = {'col1': [1, 2, 3, 4, 5, 6, 7, 8], 'col2': [3, 4, 5, 6, 7, 8, 9, 10]}
11+
df = pd.DataFrame(d)
12+
x_test_ref = ray.put(df)
13+
y_test_ref = ray.put(None)
14+
xy_ref_test = dm.XYRef(x_test_ref, y_test_ref)
15+
16+
split_ref = cfutils.split.remote(xy_ref_test, 4)
17+
xy_ref_splits = ray.get(split_ref)
18+
assert len(xy_ref_splits) == 4
19+
20+
# get the output and assert again
21+
X_in_ray = ray.get(xy_ref_splits[0].get_Xref())
22+
assert len(X_in_ray) == 2

Diff for: codeflare/pipelines/utils.py

+35
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import graphviz
2929
import codeflare.pipelines.Datamodel as dm
30+
import ray
31+
import numpy as np
3032

3133

3234
def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph:
@@ -45,3 +47,36 @@ def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph:
4547
graph.node(post_node.get_node_name())
4648
graph.edge(pre_node.get_node_name(), post_node.get_node_name())
4749
return graph
50+
51+
52+
@ray.remote
53+
def split(xy_ref: dm.XYRef, num_splits):
54+
"""
55+
Takes input as XYRef, splits the X and sends back the data as chunks. This is quite
56+
useful when we have to break a raw array into smaller pieces. This current implementation
57+
requires the input X of XYRef to be a pandas dataframe.
58+
"""
59+
x = ray.get(xy_ref.get_Xref())
60+
y = ray.get(xy_ref.get_yref())
61+
62+
xy_split_refs = []
63+
64+
# TODO: How do we split y if it is needed, lets revisit it later, will these be aligned?
65+
x_split = np.array_split(x, num_splits)
66+
if y is not None:
67+
y_split = np.array_split(y, num_splits)
68+
69+
# iterate over each and then insert into Plasma
70+
for i in range(0, len(x_split)):
71+
x_part = x_split[i]
72+
y_part = None
73+
74+
if y is not None:
75+
y_part = y_split[i]
76+
77+
x_part_ref = ray.put(x_part)
78+
y_part_ref = ray.put(y_part)
79+
xy_ref_part = dm.XYRef(x_part_ref, y_part_ref)
80+
xy_split_refs.append(xy_ref_part)
81+
82+
return xy_split_refs

Diff for: notebooks/AIX360 meets CodeFlare.ipynb

+616
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)