The previous post covered deploying the environment; this time we'll look at how to export a pb model from code, and how to call the resulting service.
With Docker in place, a CPU environment can pull tensorflow/serving directly; a GPU environment takes more work, which the previous post covers, so it won't be repeated here.
For the CPU version, just pull tensorflow/serving and Docker will fetch the latest tag automatically:
docker pull tensorflow/serving
To pin a specific TensorFlow version, browse the available tags here: https://hub.docker.com/r/tensorflow/serving/tags/
For example, if I need TensorFlow 1.12.0, I can pull that specific tag:
docker pull tensorflow/serving:1.12.0
Once the image is pulled, download the hello-world example code:
mkdir -p /tmp/tfserving
cd /tmp/tfserving
git clone https://github.com/tensorflow/serving
The tensorflow/serving repo on GitHub ships a test model that simply computes y = 0.5 * x + 2: give it a number, and it returns the corresponding y.
Run the following command to deploy the service in Docker:
docker run -p 8501:8501 --mount type=bind,source=/tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu,target=/models/half_plus_two -e MODEL_NAME=half_plus_two -t tensorflow/serving &
The command above bind-mounts the host path /tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu to /models/half_plus_two inside the container, so tensorflow_serving can load the model from under /models; it also maps the container's HTTP port 8501 to the host.
Run docker ps to check the running containers:
➜  ~ docker ps
CONTAINER ID   IMAGE                COMMAND                  CREATED         STATUS         PORTS                              NAMES
7decb4286057   tensorflow/serving   "/usr/bin/tf_serving…"   7 seconds ago   Up 6 seconds   8500/tcp, 0.0.0.0:8501->8501/tcp   eager_dewdney
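Besides docker ps, the serving container's REST API exposes a model-status endpoint on the same port. A minimal check from Python, assuming the requests package is installed:

import requests

# Query TensorFlow Serving's model status endpoint on the published port.
status = requests.get('http://localhost:8501/v1/models/half_plus_two')
print(status.json())  # lists the loaded version and its state, e.g. AVAILABLE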
Send an HTTP request to test it:
curl -d '{"instances": [1.0, 2.0, 5.0]}' -X POST http://localhost:8501/v1/models/half_plus_two:predict { "predictions": [2.5, 3.0, 4.5 ] }%
In the example above, the serving repo only ships the pb model; there is no training or export code, so you can't see how the pieces fit together. This section walks through the MNIST handwritten-digit example to show how to export a model from TensorFlow training code, and then how to call the model through the gRPC service.
Training and export:
#! /usr/bin/env python
"""Train and export a Softmax regression model, using SavedModel to export
the trained model with signatures attached."""

from __future__ import print_function

import os
import sys

# This is a placeholder for a Google-internal import.

import tensorflow as tf
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
import basic.mnist_input_data as mnist_input_data

# Model flags
tf.app.flags.DEFINE_integer('training_iteration', 10,
                            'number of training iterations.')
tf.app.flags.DEFINE_integer('model_version', 2, 'version number of the model.')
tf.app.flags.DEFINE_string('work_dir', './tmp', 'Working directory.')
FLAGS = tf.app.flags.FLAGS


def main(_):
  # Argument checks
  # if len(sys.argv) < 2 or sys.argv[-1].startswith('-'):
  #   print('Usage: mnist_saved_model.py [--training_iteration=x] '
  #         '[--model_version=y] export_dir')
  #   sys.exit(-1)
  # if FLAGS.training_iteration <= 0:
  #   print('Please specify a positive value for training iteration.')
  #   sys.exit(-1)
  # if FLAGS.model_version <= 0:
  #   print('Please specify a positive value for version number.')
  #   sys.exit(-1)

  # Train model
  print('Training model...')
  mnist = mnist_input_data.read_data_sets(FLAGS.work_dir, one_hot=True)
  sess = tf.InteractiveSession()
  serialized_tf_example = tf.placeholder(tf.string, name='tf_example')
  feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32)}
  tf_example = tf.parse_example(serialized_tf_example, feature_configs)
  x = tf.identity(tf_example['x'], name='x')  # use tf.identity() to assign name
  y_ = tf.placeholder('float', shape=[None, 10])
  w = tf.Variable(tf.zeros([784, 10]))
  b = tf.Variable(tf.zeros([10]))
  sess.run(tf.global_variables_initializer())
  y = tf.nn.softmax(tf.matmul(x, w) + b, name='y')
  cross_entropy = -tf.reduce_sum(y_ * tf.log(y))
  train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
  values, indices = tf.nn.top_k(y, 10)
  table = tf.contrib.lookup.index_to_string_table_from_tensor(
      tf.constant([str(i) for i in range(10)]))
  prediction_classes = table.lookup(tf.to_int64(indices))
  for _ in range(FLAGS.training_iteration):
    batch = mnist.train.next_batch(50)
    train_step.run(feed_dict={x: batch[0], y_: batch[1]})
  correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
  accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
  print('training accuracy %g' % sess.run(
      accuracy, feed_dict={
          x: mnist.test.images,
          y_: mnist.test.labels
      }))
  print('Done training!')

  # Export model
  # WARNING(break-tutorial-inline-code): The following code snippet is
  # in-lined in tutorials, please update tutorial documents accordingly
  # whenever code changes.
  # export_path_base = sys.argv[-1]
  export_path_base = "/Users/xingoo/PycharmProjects/ml-in-action/實踐-tensorflow/01-官方文檔-學習和使用ML/save_model"
  export_path = os.path.join(
      tf.compat.as_bytes(export_path_base),
      tf.compat.as_bytes(str(FLAGS.model_version)))
  print('Exporting trained model to', export_path)

  # Set up the export path and create the SavedModelBuilder
  builder = tf.saved_model.builder.SavedModelBuilder(export_path)

  # Build the signature_def_map.
  # Create TensorInfo protos containing type, shape and name
  classification_inputs = tf.saved_model.utils.build_tensor_info(
      serialized_tf_example)
  classification_outputs_classes = tf.saved_model.utils.build_tensor_info(
      prediction_classes)
  classification_outputs_scores = tf.saved_model.utils.build_tensor_info(values)

  # Classification signature: method name + inputs + outputs (classes, scores)
  classification_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={
              tf.saved_model.signature_constants.CLASSIFY_INPUTS:
                  classification_inputs
          },
          outputs={
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES:
                  classification_outputs_classes,
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_SCORES:
                  classification_outputs_scores
          },
          method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))

  tensor_info_x = tf.saved_model.utils.build_tensor_info(x)
  tensor_info_y = tf.saved_model.utils.build_tensor_info(y)

  # Prediction signature: input x and output y
  prediction_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={'images': tensor_info_x},
          outputs={'scores': tensor_info_y},
          method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

  # Attach the graph and variables to the builder:
  #   sess                 the session holding the trained variables
  #   tags                 tags for this graph; serving, train, eval, gpu, tpu
  #                        constants are provided
  #   signature_def_map    the signatures defined above
  #   main_op              op run when the model is loaded; here it
  #                        initializes the lookup table
  #   strip_default_attrs  strip default-valued attrs for forward compatibility
  # 'predict_images' is the signature name clients request;
  # 'serving_default' is used when a request specifies no signature
  builder.add_meta_graph_and_variables(
      sess, [tf.saved_model.tag_constants.SERVING],
      signature_def_map={
          'predict_images':
              prediction_signature,
          tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
              classification_signature,
      },
      main_op=tf.tables_initializer(),
      strip_default_attrs=True)

  # Save the model
  builder.save()
  print('Done exporting!')


if __name__ == '__main__':
  tf.app.run()
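Before deploying, it's worth confirming what actually got exported. A minimal sketch that loads the SavedModel back into a session and lists its signatures; export_dir is the versioned path the script printed, so adjust it to your machine:

import tensorflow as tf

# Path to one exported version (export_path_base + version number); adjust.
export_dir = './save_model/2'

with tf.Session(graph=tf.Graph()) as sess:
    # Load the graph tagged 'serve' and inspect its signature map.
    meta_graph = tf.saved_model.loader.load(
        sess, [tf.saved_model.tag_constants.SERVING], export_dir)
    for name, sig in meta_graph.signature_def.items():
        print(name, '->', sig.method_name)
# Expected: 'predict_images' and 'serving_default'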
After it runs, a save_model directory appears at the configured export path, with one numbered subdirectory of pb model files per version; tensorflow_serving watches that directory and serves the highest version number.
Then deploy the service over gRPC:
docker run -p 8500:8500 --mount type=bind,source=/Users/xingoo/PycharmProjects/ml-in-action/01-實踐-tensorflow/01-官方文檔-學習和使用ML/save_model,target=/models/mnist -e MODEL_NAME=mnist -t tensorflow/serving &
Once the service is deployed, check the container list:
➜  ~ docker ps
CONTAINER ID   IMAGE                COMMAND                  CREATED         STATUS         PORTS                              NAMES
39a06cc35961   tensorflow/serving   "/usr/bin/tf_serving…"   4 seconds ago   Up 3 seconds   0.0.0.0:8500->8500/tcp, 8501/tcp   hardcore_galileo
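Before writing a full client, you can confirm over gRPC that the model and its signatures were picked up; a minimal sketch using the GetModelMetadata RPC (same stub module as the client below):

import grpc
from tensorflow_serving.apis import get_model_metadata_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# Ask the server for the mnist model's signature definitions.
request = get_model_metadata_pb2.GetModelMetadataRequest()
request.model_spec.name = 'mnist'
request.metadata_field.append('signature_def')

response = stub.GetModelMetadata(request, 5.0)  # 5 second timeout
print('loaded version:', response.model_spec.version.value)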
Then write the corresponding client code:
import sys
import threading

import grpc
import numpy as np
import tensorflow as tf

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

import basic.mnist_input_data as mnist_input_data

tf.app.flags.DEFINE_integer('concurrency', 1,
                            'maximum number of concurrent inference requests')
tf.app.flags.DEFINE_integer('num_tests', 100, 'Number of test images')
tf.app.flags.DEFINE_string('server', 'localhost:8500',
                           'PredictionService host:port')
tf.app.flags.DEFINE_string('work_dir', './tmp', 'Working directory. ')
FLAGS = tf.app.flags.FLAGS

test_data_set = mnist_input_data.read_data_sets(FLAGS.work_dir).test

channel = grpc.insecure_channel(FLAGS.server)
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)


class _ResultCounter(object):
  """Counter for the prediction results."""

  def __init__(self, num_tests, concurrency):
    self._num_tests = num_tests
    self._concurrency = concurrency
    self._error = 0
    self._done = 0
    self._active = 0
    self._condition = threading.Condition()

  def inc_error(self):
    with self._condition:
      self._error += 1

  def inc_done(self):
    with self._condition:
      self._done += 1
      self._condition.notify()

  def dec_active(self):
    with self._condition:
      self._active -= 1
      self._condition.notify()

  def get_error_rate(self):
    with self._condition:
      while self._done != self._num_tests:
        self._condition.wait()
      return self._error / float(self._num_tests)

  def throttle(self):
    with self._condition:
      while self._active == self._concurrency:
        self._condition.wait()
      self._active += 1


def _create_rpc_callback(label, result_counter):
  def _callback(result_future):
    exception = result_future.exception()
    if exception:
      result_counter.inc_error()
      print(exception)
    else:
      response = np.array(result_future.result().outputs['scores'].float_val)
      prediction = np.argmax(response)
      sys.stdout.write('%s - %s\n' % (label, prediction))
      sys.stdout.flush()
    result_counter.inc_done()
    result_counter.dec_active()
  return _callback


result_counter = _ResultCounter(FLAGS.num_tests, FLAGS.concurrency)
for i in range(FLAGS.num_tests):
  request = predict_pb2.PredictRequest()
  request.model_spec.name = 'mnist'                     # MODEL_NAME from docker run
  request.model_spec.signature_name = 'predict_images'  # signature defined at export
  image, label = test_data_set.next_batch(1)
  request.inputs['images'].CopyFrom(
      tf.contrib.util.make_tensor_proto(image[0], shape=[1, image[0].size]))
  result_counter.throttle()
  result_future = stub.Predict.future(request, 5.0)  # 5 seconds
  result_future.add_done_callback(
      _create_rpc_callback(label[0], result_counter))
print(result_counter.get_error_rate())
This produces output like the following (one "label - prediction" pair per line, then the RPC error rate, which only counts failed requests, not misclassifications):
3 - 3
6 - 6
9 - 9
3 - 3
1 - 1
4 - 9
1 - 5
7 - 9
6 - 6
9 - 9
0.0
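All the threading in the client above only throttles concurrent in-flight requests; the call itself boils down to a few lines. A minimal synchronous sketch, assuming the same container is running and using a dummy 784-float image as a stand-in for a real MNIST sample:

import grpc
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# Dummy input: replace with a real flattened 28x28 MNIST image.
image = np.zeros(784, dtype=np.float32)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'mnist'
request.model_spec.signature_name = 'predict_images'
request.inputs['images'].CopyFrom(
    tf.contrib.util.make_tensor_proto(image, shape=[1, 784]))

response = stub.Predict(request, 5.0)  # blocking call, 5 second timeout
scores = np.array(response.outputs['scores'].float_val)
print('predicted digit:', np.argmax(scores))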