ElasticDL Serving Solution Explore
Model serving is an essential part of an end-to-end machine learning lifecycle. Publishing the trained model as a service in production is what makes it valuable in the real world.
At the current stage, ElasticDL focuses on training. We neither have our own serving infrastructure nor can we reuse an existing one to serve our trained models. (Why?) Our goal is to work out a serving solution for ElasticDL.
Store the ElasticDL model in the SavedModel format.
SavedModel is the universal serialization format for TensorFlow models. It is language neutral and can be loaded by multiple frameworks (such as TFServing, TFLite, and TensorFlow.js). We choose to store the ElasticDL model in the SavedModel format. In this way, we can leverage various mature solutions to serve our model in different scenarios.
Model sizes vary from several kilobytes to several terabytes. We divide models into two categories by size: small or medium, and large. A small or medium model can be loaded by a single process, whereas a large model cannot fit in a single process. Training and serving strategies differ between these two cases, as the following table shows:
| | Master Central Storage | AllReduce | Parameter Server |
|---|---|---|---|
| Small or Medium Size Model | SavedModel | SavedModel | SavedModel |
| Large Size Model | N/A | N/A | Distributed Parameter Server for Serving |
Distributed Parameter Server for Serving
This applies to large models. We partition the model variables into multiple shards and store them in a distributed parameter server for serving. In the serving stage, the inference engine executes the serving graph, queries the variable values from the distributed parameter server as needed, and finishes the calculation.
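As a rough illustration of the lookup path described above, the sketch below partitions embedding rows across shards and gathers the vectors for a batch of ids. The modulo partitioning rule, the in-memory dictionaries standing in for remote parameter server instances, and the names `ShardedEmbeddingStore`, `put`, and `lookup` are all assumptions made for illustration, not part of the ElasticDL design.

```python
from collections import defaultdict


class ShardedEmbeddingStore:
    """Toy stand-in for a serving parameter server (hypothetical)."""

    def __init__(self, num_shards):
        self.num_shards = num_shards
        # One dict per shard; a real system would place each on its own host.
        self.shards = [dict() for _ in range(num_shards)]

    def _shard_of(self, feature_id):
        # Assumed partitioning rule: modulo on the integer id.
        return feature_id % self.num_shards

    def put(self, feature_id, vector):
        self.shards[self._shard_of(feature_id)][feature_id] = list(vector)

    def lookup(self, feature_ids):
        # Group ids by shard so that each shard is visited once per batch.
        by_shard = defaultdict(list)
        for fid in feature_ids:
            by_shard[self._shard_of(fid)].append(fid)
        fetched = {}
        for shard_idx, ids in by_shard.items():
            for fid in ids:
                fetched[fid] = self.shards[shard_idx][fid]
        # Return the vectors in the order the ids were requested.
        return [fetched[fid] for fid in feature_ids]


store = ShardedEmbeddingStore(num_shards=3)
for i in range(6):
    store.put(i, [float(i), float(i) * 2])

vectors = store.lookup([4, 1, 4])
print(vectors)
```

Grouping ids by shard before fetching mirrors the idea that the inference engine only queries the shards it needs for a given request.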
Parameter servers differ between training and serving:
- Serving has stricter latency and SLA requirements.
- The number of serving PS instances scales with the QPS of the inference traffic, whereas the training PS has a fixed instance count.
- The serving PS only provides static embedding table lookup, whereas the training PS needs to aggregate gradients and update the variables, so the serving PS is simpler.

We will consider the distributed serving parameter server in a separate design as the next step.
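The second point above, scaling the serving instance count with traffic, comes down to simple arithmetic. The sketch below assumes each instance can sustain a fixed QPS; the function name `serving_ps_instances` and the capacity figures are hypothetical.

```python
import math


def serving_ps_instances(peak_qps, qps_per_instance, min_instances=1):
    """Estimate how many serving PS instances a given traffic level needs."""
    if qps_per_instance <= 0:
        raise ValueError("qps_per_instance must be positive")
    # Round up so that the last partially loaded instance is still counted.
    return max(min_instances, math.ceil(peak_qps / qps_per_instance))


# Hypothetical numbers: 2500 QPS of inference traffic, 1000 QPS per instance.
print(serving_ps_instances(2500, 1000))
```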
How to save the model trained with parameter server as SavedModel?
For large models, we are designing a parameter server to store the variables and embeddings. Currently we use Redis as a temporary solution. In our model definition, we use ElasticDL.Embedding instead of tf.keras.layers.Embedding to interact with the parameter server. ElasticDL.Embedding uses tf.py_function to issue an RPC call to the parameter server.
However, when the model is saved, the customized ElasticDL.Embedding layer is not mapped to any native TensorFlow op and cannot be saved into a SavedModel. The embedding vectors stored in the parameter server are lost, so the embedding lookup cannot work in the serving process.
1. Save model with feature columns to serve.
import time
import numpy as np
import tensorflow as tf

def get_feature_columns():
    age = tf.feature_column.numeric_column("age", dtype=tf.int64)
    education = tf.feature_column.categorical_column_with_hash_bucket(
        'education', hash_bucket_size=4)
    education_one_hot = tf.feature_column.indicator_column(education)
    return [age, education_one_hot]

def get_input_layer():
    # One Keras Input per raw feature; the names must match the feature column keys.
    input_layers = {}
    input_layers['age'] = tf.keras.layers.Input(name='age', shape=(1,), dtype=tf.int64)
    input_layers['education'] = tf.keras.layers.Input(name='education', shape=(1,), dtype=tf.string)
    return input_layers

def custom_model(feature_columns):
    input_layers = get_input_layer()
    # Use the function argument rather than the global feat_cols.
    dense_feature = tf.keras.layers.DenseFeatures(feature_columns=feature_columns)(input_layers)
    dense = tf.keras.layers.Dense(10, activation='relu')(dense_feature)
    dense = tf.keras.layers.Dense(1, activation='sigmoid')(dense)
    return tf.keras.models.Model(inputs=input_layers, outputs=dense)

feat_cols = get_feature_columns()
model = custom_model(feat_cols)
output = model.call({'age': tf.constant([[10], [16]]),
                     'education': tf.constant([['Bachelors'], ['Master']])})
print(output)
export_dir = './saved_models/feature_columns/{}'.format(int(time.time()))
tf.saved_model.save(model, export_dir=export_dir)
The output of model.call is:
tf.Tensor(
[[0.6658912 ]
[0.73223007]], shape=(2, 1), dtype=float32)
Then we test the outputs of tf-serving with the model.
>> curl -d '{"instances": [{"age":[10],"education":["Bachelors"]}]}' -X POST http://localhost:8501/v1/models/model:predict
{
    "predictions": [[0.665891171]]
}
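The same request can also be built from Python. The sketch below only constructs the request against the REST endpoint used in the curl command above; the helper name `build_predict_request` and its default arguments are assumptions for illustration, and actually sending the request requires a running tf-serving instance.

```python
import json
import urllib.request


def build_predict_request(instances, model_name="model",
                          host="localhost", port=8501):
    """Build a POST request for the tf-serving REST predict endpoint."""
    url = "http://{}:{}/v1/models/{}:predict".format(host, port, model_name)
    body = json.dumps({"instances": instances}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST",
        headers={"Content-Type": "application/json"})


request = build_predict_request([{"age": [10], "education": ["Bachelors"]}])
print(request.full_url)
print(request.data.decode("utf-8"))

# To send it (needs a live server), uncomment:
# with urllib.request.urlopen(request) as response:
#     predictions = json.loads(response.read())["predictions"]
```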
2. Customize an embedding layer to train with ElasticDL.Embedding and export model using SavedModel format with Keras.Layers.Embedding.
To verify the feasibility, we define a custom layer like this:
import os
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Embedding, Dense, Flatten
from elasticdl.python.elasticdl.layers.embedding import Embedding as elasticDL_Embedding
from tensorflow.keras import layers
from tensorflow.python.keras.utils import tf_utils

class TestCustomEmbedding(layers.Layer):
    def __init__(self,
                 input_dim,
                 output_dim,
                 **kwargs
                 ):
        super(TestCustomEmbedding, self).__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim
        # Both implementations are created; call() picks one at run time.
        self.edl_embedding_layer = elasticDL_Embedding(self.output_dim)
        self.keras_embedding_layer = Embedding(self.input_dim, self.output_dim)

    def call(self, inputs):
        # Environment variables select which embedding implementation runs.
        is_exporting_saved_model = os.getenv('SAVED_MODEL') == 'True'
        is_elastic = os.getenv('FRAMEWORK') == 'ElasticDL'

        def _true_fn(inputs):
            # Export path: load the trained values into the Keras embedding.
            _replace_weights_with_edl()
            out = self.keras_embedding_layer(inputs)
            return out

        def _false_fn(inputs):
            if is_elastic:
                out = self.edl_embedding_layer(inputs)
            else:
                out = self.keras_embedding_layer(inputs)
            return out

        def _replace_weights_with_edl():
            # variable.csv mocks the values held by ElasticDL.Embedding.
            import pandas as pd
            var_values = pd.read_csv('variable.csv')
            custom_param = var_values.values
            for var in self.keras_embedding_layer.trainable_variables:
                var.assign(custom_param)

        return tf_utils.smart_cond(is_exporting_saved_model,
                                   lambda: _true_fn(inputs),
                                   lambda: _false_fn(inputs)
                                   )
In TestCustomEmbedding, the variables of the Keras.Embedding instance are replaced by the values in variable.csv. The variable.csv file mocks the variable values of the ElasticDL.Embedding instance, which can be fetched via gRPC.
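For the replacement to succeed, the values read from variable.csv must have the same shape as the Keras embedding matrix, that is `(input_dim, output_dim)`. The sketch below generates such a mock file with the standard library and reads it back to check the shape. The helper names, the column names, and the random values are assumptions for illustration; the header row is written because `pandas.read_csv` treats the first line as column names by default.

```python
import csv
import os
import random
import tempfile


def write_mock_embedding_csv(path, input_dim, output_dim, seed=0):
    """Write a header row plus input_dim rows of output_dim random floats."""
    rng = random.Random(seed)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["dim_{}".format(j) for j in range(output_dim)])
        for _ in range(input_dim):
            writer.writerow([rng.uniform(-1.0, 1.0) for _ in range(output_dim)])


def read_embedding_csv(path):
    """Read the table back as a list of float rows, skipping the header."""
    with open(path, newline="") as f:
        rows = list(csv.reader(f))
    return [[float(x) for x in row] for row in rows[1:]]


# Written to a temporary directory here; the layer above reads ./variable.csv.
path = os.path.join(tempfile.mkdtemp(), "variable.csv")
write_mock_embedding_csv(path, input_dim=10, output_dim=4)
table = read_embedding_csv(path)
print(len(table), len(table[0]))
```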
Then we define a Keras model with TestCustomEmbedding as below:
inputs = Input(shape=(10,))
embedding = TestCustomEmbedding(10,4)(inputs)
flatten = Flatten()(embedding)
output = Dense(1, activation='sigmoid')(flatten)
model = tf.keras.Model(inputs=[inputs], outputs=[output])
os.environ['SAVED_MODEL'] = 'False'
input_array = tf.constant([[1,2,3,4,1,1,1,1,1,0]])
output = model.call(input_array, training=True)
print('training output : ', output)
output = model.call(input_array)
print('predict output : ',output)
The output is:
training output : tf.Tensor([[0.48767245]], shape=(1, 1), dtype=float32)
predict output : tf.Tensor([[0.48767245]], shape=(1, 1), dtype=float32)
Then we set SAVED_MODEL to True and view the model output with the same input.
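# Enable the export path before saving, so that the saved graph uses
# Keras.Embedding with the replaced weights.
os.environ['SAVED_MODEL'] = 'True'
# save model in saved_model
tf.saved_model.save(model, "./tmp/custom_embedding/123")
output = model.call(input_array)
print('predict output in saved_model : ', output)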
The output is:
predict output in saved_model : tf.Tensor([[0.99985003]], shape=(1, 1), dtype=float32)
We publish a service with the SavedModel using tf-serving, and then send a request to the server with the same input values.
curl -d '{"instances": [[1,2,3,4,1,1,1,1,1,0]]}' -X POST http://localhost:8501/v1/models/model:predict
The response is:
{
"predictions": [[0.999850035]]
}
So we have verified that the custom layer can use ElasticDL.Embedding during training and then use Keras.Embedding, loaded with the variables from ElasticDL.Embedding, to save the model in the SavedModel format.
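The branch selection inside TestCustomEmbedding.call can be summarized as a small decision function. This is a plain-Python restatement of the environment variable checks used in the experiment; the function name `select_embedding_backend` and the returned labels are hypothetical.

```python
def select_embedding_backend(env):
    """Mirror the SAVED_MODEL / FRAMEWORK checks of TestCustomEmbedding.call."""
    if env.get("SAVED_MODEL") == "True":
        # Export path: Keras embedding loaded with the trained values.
        return "keras_with_edl_weights"
    if env.get("FRAMEWORK") == "ElasticDL":
        # Training in ElasticDL: embedding backed by the parameter server.
        return "edl"
    # Plain local run.
    return "keras"


cases = [
    {},
    {"FRAMEWORK": "ElasticDL"},
    {"FRAMEWORK": "ElasticDL", "SAVED_MODEL": "True"},
]
for env in cases:
    print(env, "->", select_embedding_backend(env))
```

Note that the export flag takes precedence over the framework flag, which is what lets the same layer train with one embedding and export with the other.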
3. For the Sequential model and the Model class used with the functional API, we can clone a new model with keras.models.clone_model and replace keras.layers.Embedding with ElasticDL.Embedding.
import tensorflow as tf
from tensorflow import keras
from elasticdl.python.elasticdl.layers.embedding import Embedding as edl_Embedding
from tensorflow.keras.layers import Input, Embedding, Flatten, Dense

def clone_function(layer):
    # Swap every Keras Embedding for an ElasticDL Embedding; keep other layers.
    if isinstance(layer, keras.layers.Embedding):
        print(layer.output_dim)
        output_dim = layer.output_dim
        edl_layer = edl_Embedding(output_dim)
        return edl_layer
    return layer

inputs = Input(shape=(10,))
embedding = Embedding(10,4)(inputs)
flatten = Flatten()(embedding)
output = Dense(1, activation='sigmoid')(flatten)
model = tf.keras.Model(inputs=[inputs], outputs=[output])
new_model = keras.models.clone_model(model, clone_function=clone_function)
Layers in the model:
for layer in model.layers:
    print(layer)
<tensorflow.python.keras.engine.input_layer.InputLayer object at 0x13ea36b70>
<tensorflow.python.keras.layers.embeddings.Embedding object at 0x13ea36ac8>
<tensorflow.python.keras.layers.core.Flatten object at 0x13ea36278>
<tensorflow.python.keras.layers.core.Dense object at 0x13ea36fd0>
Layers in the new model:
for layer in new_model.layers:
    print(layer)
<tensorflow.python.keras.engine.input_layer.InputLayer object at 0x13e74d2b0>
<elasticdl.python.elasticdl.layers.embedding.Embedding object at 0x13ea7e198>
<tensorflow.python.keras.layers.core.Flatten object at 0x13ea36278>
<tensorflow.python.keras.layers.core.Dense object at 0x13ea36fd0>
As shown, we succeed in replacing Keras.layers.Embedding with ElasticDL.Embedding in the new model. So we can train the new model in ElasticDL and use the original model to export a SavedModel with the embedding variables of ElasticDL.Embedding. However, this method does not work if the embedding is created inside a custom Layer class.
4. For a subclass model, we can replace the Keras.layers.Embedding attribute with ElasticDL.Embedding.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Embedding, Flatten, Dense
from elasticdl.python.elasticdl.layers.embedding import Embedding as edl_Embedding

class MyModel(tf.keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.embedding = Embedding(10,4)
        self.dense1 = tf.keras.layers.Dense(4, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(5, activation=tf.nn.softmax)

    def call(self, inputs):
        embedding = self.embedding(inputs)
        x = self.dense1(embedding)
        return self.dense2(x)

model = MyModel()
Layers in the original model:
for layer in model.layers:
    print(layer)
<tensorflow.python.keras.layers.embeddings.Embedding object at 0x13b1e6b70>
<tensorflow.python.keras.layers.core.Dense object at 0x13b1e6da0>
<tensorflow.python.keras.layers.core.Dense object at 0x13b1d76a0>
Now, we replace model.embedding with ElasticDL.Embedding.
# Iterate over a snapshot, because setattr may modify model.__dict__.
for attr_name, attr_value in list(model.__dict__.items()):
    if isinstance(attr_value, keras.layers.Embedding):
        setattr(model, attr_name, edl_Embedding(attr_value.output_dim))
for layer in model.layers:
    print(layer)
<tensorflow.python.keras.layers.core.Dense object at 0x13b1e6da0>
<tensorflow.python.keras.layers.core.Dense object at 0x13b1d76a0>
<elasticdl.python.elasticdl.layers.embedding.Embedding object at 0x13b1d2908>
Like the third solution, this one does not work when the embedding is created inside a custom layer class.
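The limitation can be reproduced without TensorFlow. In the sketch below, `FakeEmbedding` and `FakeEdlEmbedding` are hypothetical stand-ins for keras.layers.Embedding and ElasticDL.Embedding, and `CustomBlock` plays the role of a custom layer that creates its embedding internally. Scanning only the model's own attributes replaces the direct embedding but misses the nested one.

```python
class FakeEmbedding:
    """Stand-in for keras.layers.Embedding (hypothetical)."""

    def __init__(self, output_dim):
        self.output_dim = output_dim


class FakeEdlEmbedding:
    """Stand-in for ElasticDL.Embedding (hypothetical)."""

    def __init__(self, output_dim):
        self.output_dim = output_dim


class CustomBlock:
    """A custom layer that creates its embedding internally."""

    def __init__(self):
        self.inner_embedding = FakeEmbedding(4)


class ToyModel:
    def __init__(self):
        self.embedding = FakeEmbedding(4)  # direct attribute: found by the scan
        self.block = CustomBlock()         # nested attribute: missed by the scan


def replace_top_level_embeddings(model):
    # Snapshot the items so that setattr cannot disturb the iteration.
    for name, value in list(vars(model).items()):
        if isinstance(value, FakeEmbedding):
            setattr(model, name, FakeEdlEmbedding(value.output_dim))


model = ToyModel()
replace_top_level_embeddings(model)
print(type(model.embedding).__name__)
print(type(model.block.inner_embedding).__name__)
```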
The Keras sequential and subclass model instances do not have inputs and outputs when they are created. Keras builds the model with inputs and outputs when the model calls the fit API, and SavedModel then generates the input and output signatures for tf-serving based on those inputs and outputs. However, ElasticDL does not call the fit API; it directly calls the call API to execute the forward-pass computation. As a result, the model has no inputs and outputs after training completes, so we should add them to the model before saving it with SavedModel.
We run experiments on sequential and subclass models to verify that we can add inputs and outputs to the model from a dataset.
First, we define functions to get the dataset and the feature columns.
import numpy as np
import tensorflow as tf
import pandas as pd

def get_dataset():
    y_labels = np.array([1,1,0,0,1])
    x_data = pd.DataFrame({'age':[14,56,78,38,80],
                           'education':['Bachelors','Master','Some-college','Bachelors','Master']})
    dataset = tf.data.Dataset.from_tensor_slices((dict(x_data), y_labels))
    dataset = dataset.shuffle(len(x_data)).batch(4)
    return dataset

def get_feature_columns():
    age = tf.feature_column.numeric_column("age", dtype=tf.int64)
    education = tf.feature_column.categorical_column_with_hash_bucket(
        'education', hash_bucket_size=4)
    education_one_hot = tf.feature_column.indicator_column(education)
    return [age, education_one_hot]
Then, we verify the solution for a sequential model.
feat_cols = get_feature_columns()
dataset = get_dataset()
model = tf.keras.Sequential([
    tf.keras.layers.DenseFeatures(feature_columns=feat_cols),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
# Before the model is built, both lists are empty.
print("model inputs: ", model.inputs)
print("model outputs: ", model.outputs)
model inputs: []
model outputs: []
Then, we build the model with the dataset using _build_model_with_inputs.
model._build_model_with_inputs(inputs=dataset, targets=None)
print("model inputs: ", model.inputs)
print("model outputs: ", model.outputs)
model inputs: {'age': <tf.Tensor 'age_5:0' shape=(None, 1) dtype=int32>, 'education': <tf.Tensor 'education_3:0' shape=(None, 1) dtype=string>}
model outputs: [<tf.Tensor 'sequential_3/Identity:0' shape=(None, 1) dtype=float32>]
Finally, we verify the solution for a subclass model.
import tensorflow as tf

class MyModel(tf.keras.Model):
    def __init__(self, feature_columns):
        super(MyModel, self).__init__()
        self.dense_feature = tf.keras.layers.DenseFeatures(feature_columns=feature_columns)
        self.dense1 = tf.keras.layers.Dense(4, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(1, activation=tf.nn.softmax)

    def call(self, inputs):
        dense_feature = self.dense_feature(inputs)
        dense1 = self.dense1(dense_feature)
        dense2 = self.dense2(dense1)
        return dense2

feat_cols = get_feature_columns()
model = MyModel(feat_cols)
print("model inputs: ", model.inputs)
print("model outputs:", model.outputs)
model inputs: []
model outputs: []
model._build_model_with_inputs(inputs=dataset, targets=None)
print("model inputs: ", model.inputs)
print("model outputs:", model.outputs)
model inputs: {'age': <tf.Tensor 'age_6:0' shape=(None, 1) dtype=int32>, 'education': <tf.Tensor 'education_4:0' shape=(None, 1) dtype=string>}
model outputs: [<tf.Tensor 'my_model_4/Identity:0' shape=(None, 1) dtype=float32>]
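Conceptually, building the model from a dataset means deriving one input specification per feature from a sample batch, with an open batch dimension. The sketch below shows that idea in plain Python. It is not what Keras does internally: the function name `infer_input_spec`, the fixed per-feature width of 1, and the mapping from Python types to dtype names are all assumptions for illustration.

```python
# Assumed mapping from Python value types to dtype names.
_DTYPE_NAMES = {int: "int64", float: "float32", str: "string"}


def infer_input_spec(batch):
    """Return {feature_name: (shape, dtype_name)} for a dict of columns."""
    spec = {}
    for name, values in batch.items():
        kinds = {type(v) for v in values}
        if len(kinds) != 1:
            raise ValueError("feature {!r} mixes types: {}".format(name, kinds))
        # None marks the batch dimension, which is unknown until serving time.
        spec[name] = ((None, 1), _DTYPE_NAMES[kinds.pop()])
    return spec


batch = {
    "age": [14, 56, 78, 38],
    "education": ["Bachelors", "Master", "Some-college", "Bachelors"],
}
spec = infer_input_spec(batch)
for name, (shape, dtype_name) in spec.items():
    print(name, shape, dtype_name)
```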
- Is the following scenario possible? The user writes tf.keras.layers.Embedding in the model definition. When the model runs in ElasticDL with PS turned on, the native Keras Embedding layer is replaced with the ElasticDL.Embedding layer to interact with the parameter server. In this way, the user can write the model with the native TensorFlow API but execute it in a distributed way in ElasticDL, which is more user friendly.