-
Notifications
You must be signed in to change notification settings - Fork 235
Expand file tree
/
Copy pathpipeline.py
More file actions
163 lines (124 loc) · 5.34 KB
/
pipeline.py
File metadata and controls
163 lines (124 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# Copyright 2020-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Pipeline module."""
import numpy as np
class Transformer:
    """Abstract base for a single data transformation step.

    Concrete transformers implement a ``forward`` pass (e.g. compress /
    serialize) and a matching ``backward`` pass that undoes it using the
    metadata produced by ``forward``. Subclass this to add new
    transformation types.
    """

    def forward(self, data, **kwargs):
        """Apply the transformation to ``data``.

        Must be overridden by subclasses.

        Args:
            data: The data to be transformed.
            **kwargs: Additional parameters to pass to the function

        Returns:
            transformed_data: The transformed data.
            metadata: The metadata for the transformation.
        """
        raise NotImplementedError

    def backward(self, data, metadata, **kwargs):
        """Invert the transformation performed by :meth:`forward`.

        Must be overridden by subclasses.

        Args:
            data: The transformed data.
            metadata: The metadata for the transformation.
            **kwargs: Additional keyword arguments for the transformation.

        Returns:
            transformed_data: The original data before the transformation.
        """
        raise NotImplementedError
class NumpyArrayToBytes(Transformer):
"""Transformer for converting generic Numpy arrays to bytes."""
def __init__(self):
self.lossy = False
def forward(self, data: np.ndarray, **kwargs):
"""Convert a Numpy array to bytes.
Args:
data: The Numpy array to be converted.
**kwargs: Additional keyword arguments for the conversion.
Returns:
data_bytes: The data converted to bytes.
metadata: The metadata for the conversion.
"""
array_shape = data.shape
metadata = {"int_list": list(array_shape), "dtype": str(data.dtype)}
data_bytes = data.tobytes(order="C")
return data_bytes, metadata
def backward(self, data, metadata, **kwargs):
"""Convert bytes back to a Numpy array.
Args:
data: The data in bytes.
metadata: The metadata for the conversion.
Returns:
The data converted back to a Numpy array.
"""
array_shape = tuple(metadata["int_list"])
dtype = np.dtype(metadata["dtype"])
flat_array = np.frombuffer(data, dtype=dtype)
return np.reshape(flat_array, newshape=array_shape, order="C")
class TransformationPipeline:
    """Data Transformer Pipeline Class.

    A sequential pipeline of :class:`Transformer` objects that transform
    data (e.g. compress a layer of model weights) in order, collecting
    per-stage metadata so the backward method can reconstruct the
    original data by applying each transformer's inverse in reverse
    order.

    Attributes:
        transformers (list): The list of transformers in the pipeline.
    """

    def __init__(self, transformers, **kwargs):
        """Initialize TransformationPipeline.

        Args:
            transformers (list): The list of transformers in the pipeline.
            **kwargs: Additional keyword arguments (unused).
        """
        self.transformers = transformers

    def forward(self, data, **kwargs):
        """Forward pass of pipeline data transformer.

        Args:
            data: The data to be transformed.
            **kwargs: Additional keyword arguments forwarded to every
                transformer's ``forward``.

        Returns:
            data: The transformed data.
            transformer_metadata: List of each stage's metadata, in
                forward (application) order.
        """
        transformer_metadata = []
        for transformer in self.transformers:
            data, metadata = transformer.forward(data=data, **kwargs)
            transformer_metadata.append(metadata)
        return data, transformer_metadata

    def backward(self, data, transformer_metadata, **kwargs):
        """Backward pass of pipeline data transformer.

        Applies each transformer's ``backward`` in reverse order, pairing
        it with the metadata its ``forward`` produced. The caller's
        ``transformer_metadata`` list is NOT modified (previously this
        method destructively popped from it, emptying the caller's list).

        Args:
            data: The transformed data.
            transformer_metadata: The metadata list returned by
                :meth:`forward`; must have one entry per transformer.
            **kwargs: Additional keyword arguments forwarded to every
                transformer's ``backward``.

        Returns:
            The original data before the transformation.
        """
        for transformer, metadata in zip(
            reversed(self.transformers), reversed(transformer_metadata)
        ):
            data = transformer.backward(data=data, metadata=metadata, **kwargs)
        return data

    def is_lossy(self):
        """If any of the transformers are lossy, then the pipeline is lossy.

        Returns:
            True if any of the transformers in the pipeline are lossy,
            False otherwise.
        """
        return any(transformer.lossy for transformer in self.transformers)