TensorFlow 1.4正式添加了keras
和data
做爲其核心代碼(從contrib中畢業),加上以前的estimator
API,如今已經能夠利用Tensorflow像keras同樣方便的搭建網絡進行訓練。data
能夠方便從多種來源的數據輸入到搭建的網絡中(利用tf.features
能夠方便的對結構化的數據進行讀取和處理,好比存在csv
中的數據,具體操做能夠參考這篇文檔);keras
在搭建網絡的時候,能夠很大程度上避免因爲Session和Graph的重用出現的一系列問題(不過如今tf.keras
與原版的keras仍是有很多區別的地方,所以使用的時候也是會趕上一些意外的問題);而estimator
提供了相似與sklearn
中模型訓練的語法,很是好上手,並且默認定義了多種經常使用的任務,在自定義model_fn
的時候也能讓人對訓練過程更加清晰,並且estimator
提供了export_savedmodel
函數很方便將訓練好的ckpt模型文件轉成pb文件並結合 docker 和 tensorflow serving 進行靈活穩定的模型部署和更新。html
tf.data
進行數據流操做(TFRecords)在keras
中有keras.preprocessing.image.ImageDataGenerator()
類和.flow_from_directory()
函數能夠很容易將保存在 文件夾 下面的數據進行讀取;也能夠用.flow()
函數將數據直接從np.array中讀取後輸入網絡進行訓練(具體能夠查看官方文檔)。在使用圖片並以文件夾名做爲分類名的訓練任務時這個方案是十分簡單有效的,可是Tensorflow官方推薦的數據保存格式是 TFRecords,而keras官方不支持直接從tfrecords文件中讀取數據(tf.keras
也不行,可是這個issue中提供了一些PR是能夠的,keras做者不太推薦就是了),因此這裏就能夠用data
類來處理從TFRecords中的數據(也能夠用以前經常使用的tf.train.batch()
或tf.train.shuffle_batch()
來處理訓練數據)。python
Tensorflow官方提供了詳細的文檔來介紹data
的機制和使用方法(看這裏),而對於TFRecords類型數據,主要利用 tf.data.Iterator()
來抽取數據,這裏簡單說下從TFRecords中提取數據的方法:
如下代碼爲官方代碼git
def dataset_input_fn(): filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"] dataset = tf.data.TFRecordDataset(filenames) # 制定提取數據的tfrecords文件 # Use `tf.parse_single_example()` to extract data from a `tf.Example` # protocol buffer, and perform any additional per-record preprocessing. def parser(record): # 對tfrecords中的數據進行解析的操做 keys_to_features = { "image_data": tf.FixedLenFeature((), tf.string, default_value=""), "date_time": tf.FixedLenFeature((), tf.int64, default_value=""), "label": tf.FixedLenFeature((), tf.int64, default_value=tf.zeros([], dtype=tf.int64)), } parsed = tf.parse_single_example(record, keys_to_features) # Perform additional preprocessing on the parsed data. image = tf.image.decode_jpeg(parsed["image_data"]) image = tf.reshape(image, [299, 299, 1]) label = tf.cast(parsed["label"], tf.int32) return {"image_data": image, "date_time": parsed["date_time"]}, label # Use `Dataset.map()` to build a pair of a feature dictionary and a label # tensor for each example. dataset = dataset.map(parser) # 通常就用map函數對輸入圖像進行預處理,而預處理函數能夠包含在上面用於解析的parser函數 dataset = dataset.shuffle(buffer_size=10000) # 在訓練的時候通常須要將輸入數據進行順序打亂提升訓練的泛化性 dataset = dataset.batch(32) # 單次讀取的batch大小 dataset = dataset.repeat(num_epochs) # 數據集的重複使用次數,爲空的話則無線循環 iterator = dataset.make_one_shot_iterator() # `features` is a dictionary in which each value is a batch of values for # that feature; `labels` is a batch of labels. features, labels = iterator.get_next() return features, labels # return {"input": features, labels} # 對於estimator的輸入前者爲dict類型,後者爲tensor
注: dataset.make_one_shot_iterator()
是最簡單的Iterator類,不須要明確的initialization,而且目前這是惟一可以在estimator中容易使用的iterator。對於須要從新使用的Dataset類(好比結構相同的訓練和測試數據集),通常是須要用 reinitializable iterator ,不過在estimator中因爲上述問題,如今通常的作法是對訓練集和驗證集單獨寫兩個pipeline用make_one_shot_iterator
來處理數據流。github
經過data處理TFRecords數據流,咱們就可使用keras來進行訓練了。docker
def architecture(input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)): """ Model architecture Args: input_shape: input image shape (not include batch) Returns: an keras model instance """ base_model = Xception(include_top=True, weights=None, # no pre-trained weights used pooling="max", input_shape=input_shape, # modify first layer classes=_NUM_CLASSES) base_model.summary() return base_model def train(source_dir, model_save_path): """ Train patch based model Args: source_dir: a directory where training tfrecords file stored. All TF records start with train will be used! model_save_path: weights save path """ if tf.gfile.Exists(source_dir): train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord") val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord") if not len(train_data_paths): raise Exception("[Train Error]: unable to find train*.tfrecord file") if not len(val_data_paths): raise Exception("[Eval Error]: unable to find val*.tfrecord file") else: raise Exception("[Train Error]: unable to find input directory!") (images, labels) = dataset_input_fn(train_data_paths) model_input = keras.Input(tensor=images, shape=(_PATCH_SIZE, _PATCH_SIZE, 3), dtype=tf.float32, name="input") # keras model的輸入須要爲keras.Input類型,可是直接使用tensorflow tensor類型也是能夠的 base_model = architecture() model_output = base_model(model_input) model = keras.models.Model(inputs=model_input, outputs=model_output) optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9) model.compile(optimizer=optimizer, loss=focal_loss, metrics=['accuracy'], target_tensors=[labels]) # 1 tensorboard = tf.keras.callbacks.TensorBoard(log_dir=model_save_path) model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_save_path+"/saved_model.h5") model.fit(steps_per_epoch=8000, epochs=_EPOCHS, callbacks=[tensorboard, model_checkpoint]) def evaluate(source_dir, weights_path): """ Eval patch based model Args: source_dir: directory where val tf records file stored. All TF records start with val will be used! weights_path: model weights save path """ # load model base_model = architecture() base_model.load_weights(weights_path) # load test dataset if tf.gfile.Exists(source_dir): val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord") if not len(val_data_paths): raise Exception("[Eval Error]: unable to find val*.tfrecord file") else: raise Exception("[Train Error]: unable to find input directory!") (images, labels) = input_fn(source_dir) probs = base_model(images) predictions = tf.argmax(probs, axis=-1) accuracy_score = tf.reduce_mean(tf.equal(probs, predictions)) print("Accuracy of testing images: {}".format(accuracy_score))
注: 對於#1來講,根據keras model的compile函數的文檔:api
target_tensors: By default, Keras will create placeholders for the model's target, which will be fed with the target data during training. If instead you would like to use your own target tensors (in turn, Keras will not expect external Numpy data for these targets at training time), you can specify them via the target_tensors argument. It can be a single tensor (for a single-output model), a list of tensors, or a dict mapping output names to target tensors.bash
定義了target tensor 以後就不須要再從外部輸入數據了(fit的時候)。網絡
而使用這種方式訓練獲得的的模型文件爲h5,若是想要轉換成用於tensorflow service的模型,須要用如下方式進行:a). fchollet提供的keras式寫法
session
# @1 from keras import backend as K K.set_learning_phase(0) # all new operations will be in test mode from now on # serialize the model and get its weights, for quick re-building config = previous_model.get_config() weights = previous_model.get_weights() # re-build a model where the learning phase is now hard-coded to 0 from keras.models import model_from_config new_model = model_from_config(config) new_model.set_weights(weights) # @2 from tensorflow_serving.session_bundle import exporter export_path = ... # where to save the exported graph export_version = ... # version number (integer) saver = tf.train.Saver(sharded=True) model_exporter = exporter.Exporter(saver) signature = exporter.classification_signature(input_tensor=model.input, scores_tensor=model.output) # 分類 model_exporter.init(sess.graph.as_graph_def(), default_graph_signature=signature) model_exporter.export(export_path, tf.constant(export_version), sess)
或者tensorflow官方的寫法:app
import os import tensorflow as tf import keras.backend as K def save_model_for_production(model, version, path='prod_models'): K.set_learning_phase(0) if not os.path.exists(path): os.mkdir(path) export_path = os.path.join( tf.compat.as_bytes(path), tf.compat.as_bytes(str(get_new_version(path=path, current_version=int(version))))) builder = tf.saved_model.builder.SavedModelBuilder(export_path) model_input = tf.saved_model.utils.build_tensor_info(model.input) model_output = tf.saved_model.utils.build_tensor_info(model.output) prediction_signature = ( tf.saved_model.signature_def_utils.build_signature_def( inputs={'inputs': model_input}, outputs={'output': model_output}, method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)) with K.get_session() as sess: builder.add_meta_graph_and_variables( sess=sess, tags=[tf.saved_model.tag_constants.SERVING], signature_def_map={ 'predict': prediction_signature, }) builder.save()
能夠用tf.layers
函數來代替keras搭建網絡,並且能夠提供更豐富的layer。
def xception(): def tf_xception(features, classes=2, is_training=True): """ The Xception architecture written in tf.layers Args: features: input image tensor classes: number of classes to classify images into is_training: is training stage or not Returns: 2-D logits prediction output after pooling and activation """ x = tf.layers.conv2d(features, 32, (3, 3), strides=(2, 2), use_bias=False, name='block1_conv1') x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv1_bn') x = tf.nn.relu(x, name='block1_conv1_act') x = tf.layers.conv2d(x, 64, (3, 3), use_bias=False, name='block1_conv2') x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv2_bn') x = tf.nn.relu(x, name='block1_conv2_act') residual = tf.layers.conv2d(x, 128, (1, 1), strides=(2, 2), padding='same', use_bias=False) residual = tf.layers.batch_normalization(residual, training=is_training) x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv1_bn') x = tf.nn.relu(x, name='block2_sepconv2_act') x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv2_bn') x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block2_pool') x = tf.add(x, residual, name='block2_add') residual = tf.layers.conv2d(x, 256, (1, 1), strides=(2, 2), padding='same', use_bias=False) residual = tf.layers.batch_normalization(residual, training=is_training) x = tf.nn.relu(x, name='block3_sepconv1_act') x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv1_bn') x = tf.nn.relu(x, name='block3_sepconv2_act') x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv2_bn') x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block3_pool') x = tf.add(x, residual, name="block3_add") residual = tf.layers.conv2d(x, 728, (1, 1), strides=(2, 2), padding='same', use_bias=False) residual = tf.layers.batch_normalization(residual, training=is_training) x = tf.nn.relu(x, name='block4_sepconv1_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv1_bn') x = tf.nn.relu(x, name='block4_sepconv2_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv2_bn') x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block4_pool') x = tf.add(x, residual, name="block4_add") for i in range(8): residual = x prefix = 'block' + str(i + 5) x = tf.nn.relu(x, name=prefix + '_sepconv1_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv1_bn') x = tf.nn.relu(x, name=prefix + '_sepconv2_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv2_bn') x = tf.nn.relu(x, name=prefix + '_sepconv3_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv3') x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv3_bn') x = tf.add(x, residual, name=prefix+"_add") residual = tf.layers.conv2d(x, 1024, (1, 1), strides=(2, 2), padding='same', use_bias=False) residual = tf.layers.batch_normalization(residual, training=is_training) x = tf.nn.relu(x, name='block13_sepconv1_act') x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block13_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv1_bn') x = tf.nn.relu(x, name='block13_sepconv2_act') x = tf.layers.separable_conv2d(x, 1024, (3, 3), padding='same', use_bias=False, name='block13_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv2_bn') x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block13_pool') x = tf.add(x, residual, name="block13_add") x = tf.layers.separable_conv2d(x, 1536, (3, 3), padding='same', use_bias=False, name='block14_sepconv1') x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv1_bn') x = tf.nn.relu(x, name='block14_sepconv1_act') x = tf.layers.separable_conv2d(x, 2048, (3, 3), padding='same', use_bias=False, name='block14_sepconv2') x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv2_bn') x = tf.nn.relu(x, name='block14_sepconv2_act') # replace conv layer with fc x = tf.layers.average_pooling2d(x, (3, 3), (2, 2), name="global_average_pooling") x = tf.layers.conv2d(x, 2048, [1, 1], activation=None, name="block15_conv1") x = tf.layers.conv2d(x, classes, [1, 1], activation=None, name="block15_conv2") x = tf.squeeze(x, axis=[1, 2], name="logits") return x def model_fn(features, labels, mode, params): """ Model_fn for estimator model Args: features (Tensor): Input features to the model. labels (Tensor): Labels tensor for training and evaluation. mode (ModeKeys): Specifies if training, evaluation or prediction. params (HParams): hyper-parameters for estimator model Returns: (EstimatorSpec): Model to be run by Estimator. """ # check if training stage if mode == tf.estimator.ModeKeys.TRAIN: is_training = True else: is_training = False # is_training = False # 1 input_tensor = features["input"] logits = xception(input_tensor, classes=_NUM_CLASSES, is_training=is_training) probs = tf.nn.softmax(logits, name="output_score") predictions = tf.argmax(probs, axis=-1, name="output_label") onehot_labels = tf.one_hot(tf.cast(labels, tf.int32), _NUM_CLASSES) # provide a tf.estimator spec for PREDICT predictions_dict = {"score": probs, "label": predictions} if mode == tf.estimator.ModeKeys.PREDICT: predictions_output = tf.estimator.export.PredictOutput(predictions_dict) return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions_dict, export_outputs={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: predictions_output }) # calculate loss # loss = focal_loss(onehot_labels, logits, gamma=1.5) gamma = 1.5 weights = tf.reduce_sum(tf.multiply(onehot_labels, tf.pow(1. - probs, gamma)), axis=-1) loss = tf.losses.softmax_cross_entropy(onehot_labels, logits, weights=weights) accuracy = tf.metrics.accuracy(labels=labels, predictions=predictions) if mode == tf.estimator.ModeKeys.TRAIN: lr = params.learning_rate # train optimizer optimizer = tf.train.RMSPropOptimizer(learning_rate=lr, decay=0.9) update_ops = tf.get_collections(tf.GraphKeys.UPDATE_OPS) with tf.control_dependencies(update_ops): train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step()) tensors_to_log = {'batch_accuracy': accuracy[1], 'logits': logits, 'label': labels} logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=1000) return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op, training_hooks=[logging_hook]) else: eval_metric_ops = {"accuracy": accuracy} return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) def get_estimator_model(config=None, params=None): """ Get estimator model by definition of model_fn """ est_model = tf.estimator.Estimator(model_fn=model_fn, config=config, params=params) return est_model def train(source_dir, model_save_path): """ Train patch based model Args: source_dir: a directory where training tfrecords file stored. All TF records start with train will be used! model_save_path: weights save path """ if tf.gfile.Exists(source_dir): train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord") val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord") if not len(train_data_paths): raise Exception("[Train Error]: unable to find train*.tfrecord file") if not len(val_data_paths): raise Exception("[Eval Error]: unable to find val*.tfrecord file") else: raise Exception("[Train Error]: unable to find input directory!") train_config = tf.estimator.RunConfig() new_config = train_config.replace(model_dir=model_save_path, save_checkpoints_steps=1000, keep_checkpoint_max=5) params = tf.contrib.training.HParams( learning_rate=0.001, train_steps=5000, min_eval_frequency=1000 ) est_model = get_estimator_model(config=new_config, params=params) # define training config train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(data_path=train_data_paths, batch_size=_BATCH_SIZE, is_training=True), max_steps=_MAX_STEPS) eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(data_path=val_data_paths, batch_size=_BATCH_SIZE), steps=100, throttle_secs=900) # train and evaluate model tf.estimator.train_and_evaluate(estimator=est_model, train_spec=train_spec, eval_spec=eval_spec) def evaluate(source_dir, model_save_path): """ Eval patch based model Args: source_dir: directory where val tf records file stored. All TF records start with val will be used! model_save_path: model save path """ # load model run_config = tf.contrib.learn.RunConfig(model_dir=model_save_path) est_model = get_estimator_model(run_config) # load test dataset if tf.gfile.Exists(source_dir): val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord") if not len(val_data_paths): raise Exception("[Eval Error]: unable to find val*.tfrecord file") else: raise Exception("[Train Error]: unable to find input directory!") accuracy_score = est_model.evaluate(input_fn=lambda: input_fn(val_data_paths, batch_size=_BATCH_SIZE, is_training=False)) print("Accuracy of testing images: {}".format(accuracy_score))
注:
BN layer in estimator: : 對於問題#1,在包含BN層的網絡,正常狀況下都是須要明確指定是否處於training狀態(BN在訓練和其餘狀態的計算方式是不一樣的),可是在 1.4.1 版本下,使用estimator進行訓練的時候是正常( 這個錯誤是因爲Tensorflow is_training=True
),可是在evaluation的時候(is_training=False
)輸出的精度是初始化狀態,而且隨着train_and_evaluate()
的屢次調用,eval loss會變得很大。這應該是一個bug(具體問題可查看issue13895), 16455。而參考tensorflow官方給出的給予estimator的resnet example中的BN layer在train stage的時候並無設置is_training=True
,因此這裏把is_training關閉了,這樣操做訓練是正常的。tf.layers.batch_normalization()
函數調用方式不對致使的,根據這個函數的[官方文檔](https://www.tensorflow.org/api_docs/python/tf/layers/batch_normalization):
Note: when training, the moving_mean and moving_variance need to be updated. By default the update ops are placed in tf.GraphKeys.UPDATE_OPS, so they need to be added as a dependency to the train_op.
須要明確手動添加update_ops做爲dependency到update_ops:
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS) with tf.control_dependencies(update_ops): train_op = optimizer.minimize(loss)此外,輸入的圖片的channel須要爲3
tf.logging
的verbosity level: tf.logging.set_verbosity(tf.logging.INFO)
def model_fn(features, labels, mode, params): # Logic to do the following: # 1. Configure the model via TensorFlow operations # 2. Define the loss function for training/evaluation # 3. Define the training operation/optimizer # 4. Generate predictions # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)
model_fn有三種mode:tf.estimator.ModeKeys.TRAIN
,tf.estimator.ModeKeys.EVAL
, tf.estimator.ModeKeys.PREDICT
,其中第一個必需要返回的爲loss
和training_op
, 第二個是loss
,第三個是prediction
。
tf.metrics.accuracy: 通常狀況下EVAL的時候model_fn會返回eval_metric_ops
,這個是一個dict,其值爲tf.metrics
的返回值(以tf.metrics.accuracy()
爲例,返回值以下):
accuracy: A
Tensor
representing the accuracy, the value oftotal
divided bycount
.
update_op: An operation that increments thetotal
andcount
variables appropriately and whose value matchesaccuracy
.
可是這個函數在estimator模型中使用的時候有點反直覺:
tf.metrics.accuracy is not meant to compute the accuracy of a single batch. It returns both the accuracy and an update_op, and update_op is intended to be run every batch, which updates the accuracy.
因此在上面代碼中對每一個batch算accuracy的時候算的是update_op而非accuracy,更多關於tf.metrics.accuracy()
的討論能夠看這倆個issue:15115, 9498
Freeze model/export savedmodel for Tensorflow Serving
使用estimator進行訓練的一大好處就是進行分佈式部署的擴展很容易,而部署須要的模型結構爲(具體看這裏):
從訓練的ckpt模型文件freeze從pb(protobuf)模型文件,estimator提供了export_savedmodel
來幫助快速進行操做。
def serving_input_receiver_fn(): """ Build serving inputs """ inputs = tf.placeholder(dtype=tf.string, name="input_image") feature_config = {'image/encoded': tf.FixedLenFeature(shape=[], dtype=tf.string)} tf_example = tf.parse_example(inputs, feature_config) patch_images = tf.map_fn(_preprocess_image, tf_example["image/encoded"], dtype=tf.float32) patch_images = tf.squeeze(patch_images, axis=[0]) receive_tensors = {'example': inputs} features = {"input": patch_images} return tf.estimator.export.ServingInputReceiver(features, receive_tensors) def save_serving_model(): session_config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True)) config = tf.estimator.RunConfig(model_dir=MODEL_WEIGHTS_PATH, session_config=session_config) # session_config is used for configuration of session model = get_estimator_model(config=config) model.export_savedmodel(export_dir_base=SERVING_MODEL_SAVE_PATH, serving_input_receiver_fn=serving_input_receiver_fn)
若是須要在預測的時候調整sess的參數(ConfigProto),能夠經過tf.estimator.RunConfig
配置session_config
的參數而後再輸入到tf.estimator.Estimator()
的config參數(如上面所示)。
通常經過export_savedmodel方式輸出的模型是用tensorflow serving封裝服務進行預測:
# start tensorflow serving $ bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --port=9000 --model_name=your_model_name --model_base_path=/your/model/base/path
server = "localhost:9000" def serving_predict(face_img): """ Get prediction based on given face image Args: face_img: input image file Returns: anti-spoof detection result of given face """ host, port = server.split(":") channel = implementations.insecure_channel(host, int(port)) stub = prediction_service_pb2.beta_create_PredictionService_stub(channel) # creating request request = predict_pb2.PredictRequest() request.model_spec.name = "face_anti_spoof" request.model_spec.signature_name = "predict_images" # parse face img for serving with open(face_img, "rb") as f: serialized = f.read() example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(serialized)})) serialized = example.SerializeToString() request.inputs['inputs'].CopyFrom(tf.contrib.util.make_tensor_proto(serialized, shape=[1])) # predict results content = stub.Predict(request, timeout=5) labels = content.outputs["label"].int64_val prob = np.sum(labels) / len(labels) if prob > 0.5: label = LABELS[1] else: label = LABELS[0] prob = 1 - prob info = {"label": label, "prob": prob} return info
具體關於tensorlfow serving的用法會在以後的文章裏面展開。
而生成的pb文件也能夠直接加載不經過tensorflow serving來進行預測,不過因爲estimator的export_savedmodel
輸出的pb文件中模型的保存格式爲tf.saved_model
類,因此加載模型用tf.saved_model.loader
模塊進行操做會比較方便(sess.restore()
加載這個pb模型很容易報錯= =):
def load_graph(trained_model): session_config = tf.ConfigProto() session_config.gpu_options.allow_growth = True sess = tf.Session(config=session_config) meta_graph_def = tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], trained_model) # signature = meta_graph_def.signature_def # signature_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY # print(signature[signature_key]) return sess sess = load_graph(model_path) def predict_pb(image): """ Predict input face is real or not using model stored in save_model.pb generated by estimator export_savedmodel Args: image: input image file """ assert tf.gfile.Exists(image), "Input image file not found!" with open(image, "rb") as f: image_bytes = f.read() example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(image_bytes)})) serialized = example.SerializeToString() input_tensor = sess.graph.get_tensor_by_name("input_image:0") labels = sess.graph.get_tensor_by_name("output_label:0") scores = sess.graph.get_tensor_by_name("output_score:0") (labels, scores) = sess.run([labels, scores], feed_dict={input_tensor: [serialized]}) return labels, scores
大部分與上面相同,只是用keras.estimator.model_to_estimator()
函數直接將keras model直接轉換成estimator model而不用本身定義model_fn,這種方式對於模型訓練十分簡單友好,可是在使用Tensorflow estimator的export_savedmodel()
函數的時候會報錯,緣由是缺乏export_outputs
(在自定義model_fn的時候好像也並無須要明肯定義export_outputs變量,很是微妙的報錯),而用官方的作法轉化模型的時候又會報 BN層參數沒有初始化 的錯誤(Attempting to use uninitialized value bn_xx)(又是BN層, 跟以前同樣多是因爲graph重用引發的錯誤),所以用這種方式訓練的模型暫時尚未找到轉化成Tensorflow serving所需的模型的方法。
def model_fn(model_dir=".", input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)): """ Convert keras model to estimator model Args: model_dir: absolute model save directory input_shape: input image shape (not include batch) Returns: an estimator model instance """ with K.name_scope("model"): base_model = tf.keras.applications.Xception(include_top=True, weights=None, # no pre-trained weights used pooling="max", input_shape=input_shape, # modify first layer classes=_NUM_CLASSES) base_model.summary() model_input = keras.Input(shape=input_shape, dtype=tf.float32, name="input_1") model_output = base_model(model_input) model = keras.models.Model(inputs=model_input, outputs=model_output) optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9) model.compile(optimizer=optimizer, loss=focal_loss, metrics=['accuracy']) if model_dir == ".": model_dir = os.path.dirname(os.path.abspath(__file__)) elif not model_dir.startswith("/"): # relative path to abs path model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), model_dir) if not os.path.exists(model_dir): os.makedirs(model_dir) # define training config run_config = tf.estimator.RunConfig(save_checkpoints_steps=2000, keep_checkpoint_max=5) # convert keras model to estimator model est_model = keras.estimator.model_to_estimator(model, model_dir=model_dir, config=run_config) return est_model
在1.4中estimator model有train_and_evaluate()
函數進行開始模型的train和evaluate,也能夠直接用model.train()
和model.evaluate()
函數(相似keras model的fit和evaluate函數);不過能夠用tf.contrib.learn.Experiment()
來代替(須要定義experiment類,比較麻煩)。大部分與estimator model相同,須要定義experiment_fn
以下:
from tensorflow.contrib.learn import learn_runner def experiment_fn(run_config, params): """ Create an experiment to train and evaluate the model Args: run_config: configuration for estimator params: Hparams object returned by tf.contrib.training.HParams Returns: Experiment object """ if tf.gfile.Exists(FLAGS.train_data_dir) and tf.gfile.Exists(FLAGS.val_data_dir): train_data_paths = tf.gfile.Glob(FLAGS.train_data_dir+"/train*tfrecord") val_data_paths = tf.gfile.Glob(FLAGS.val_data_dir+"/val*tfrecord") if not len(train_data_paths): raise Exception("[Train Error]: unable to find train*.tfrecord file") if not len(val_data_paths): raise Exception("[Eval Error]: unable to find val*.tfrecord file") else: raise Exception("[Train Error]: unable to find input directory!") estimator = get_estimator_model(config=run_config, params=params) def train_input_fn(): return input_fn(data_path=train_data_paths, batch_size=_BATCH_SIZE, is_training=True) def eval_input_fn(): return input_fn(data_path=val_data_paths, batch_size=_BATCH_SIZE, is_training=False) experiment = tf.contrib.learn.Experiment( estimator=estimator, train_input_fn=train_input_fn, eval_input_fn=eval_input_fn, train_steps=params.train_steps, min_eval_frequency=params.min_eval_frequency, ) return experiment def run_experiment(): """ Run the training experiment """ params = tf.contrib.training.HParams( learning_rate=0.002, n_classes=_NUM_CLASSES, train_steps=5000, min_eval_frequency=100 ) run_config = tf.contrib.learn.RunConfig(model_dir=FLAGS.save_dir, save_checkpoints_steps=1000, keep_checkpoint_max=5) experiment = experiment_fn(run_config=run_config, params=params) experiment.train_and_evaluate() # learn_runner.run( # experiment_fn=experiment_fn, # first class function # run_config=run_config, # schedule='train_and_evaluate', # hparams=params # )