Tensorflow1.4 高級接口使用(estimator, data, keras, layers)

TensorFlow 高級接口使用簡介(estimator, keras, data, experiment)

TensorFlow 1.4正式添加了kerasdata做爲其核心代碼(從contrib中畢業),加上以前的estimator API,如今已經能夠利用Tensorflow像keras同樣方便的搭建網絡進行訓練。data能夠方便從多種來源的數據輸入到搭建的網絡中(利用tf.features能夠方便的對結構化的數據進行讀取和處理,好比存在csv中的數據,具體操做能夠參考這篇文檔);keras在搭建網絡的時候,能夠很大程度上避免因爲Session和Graph的重用出現的一系列問題(不過如今tf.keras與原版的keras仍是有很多區別的地方,所以使用的時候也是會趕上一些意外的問題);而estimator提供了相似與sklearn中模型訓練的語法,很是好上手,並且默認定義了多種經常使用的任務,在自定義model_fn的時候也能讓人對訓練過程更加清晰,並且estimator提供了export_savedmodel函數很方便將訓練好的ckpt模型文件轉成pb文件並結合 dockertensorflow serving 進行靈活穩定的模型部署和更新。html

1. tf.data 進行數據流操做(TFRecords)

keras中有keras.preprocessing.image.ImageDataGenerator()類和.flow_from_directory()函數能夠很容易將保存在 文件夾 下面的數據進行讀取;也能夠用.flow()函數將數據直接從np.array中讀取後輸入網絡進行訓練(具體能夠查看官方文檔)。在使用圖片並以文件夾名做爲分類名的訓練任務時這個方案是十分簡單有效的,可是Tensorflow官方推薦的數據保存格式是 TFRecords,而keras官方不支持直接從tfrecords文件中讀取數據(tf.keras也不行,可是這個issue中提供了一些PR是能夠的,keras做者不太推薦就是了),因此這裏就能夠用data類來處理從TFRecords中的數據(也能夠用以前經常使用的tf.train.batch()tf.train.shuffle_batch()來處理訓練數據)。python

Tensorflow官方提供了詳細的文檔來介紹data的機制和使用方法(看這裏),而對於TFRecords類型數據,主要利用 tf.data.Iterator()來抽取數據,這裏簡單說下從TFRecords中提取數據的方法:

如下代碼爲官方代碼git

def dataset_input_fn():
  filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
  dataset = tf.data.TFRecordDataset(filenames) # 制定提取數據的tfrecords文件

  # Use `tf.parse_single_example()` to extract data from a `tf.Example`
  # protocol buffer, and perform any additional per-record preprocessing.
  def parser(record):   # 對tfrecords中的數據進行解析的操做
    keys_to_features = {
        "image_data": tf.FixedLenFeature((), tf.string, default_value=""),
        "date_time": tf.FixedLenFeature((), tf.int64, default_value=""),
        "label": tf.FixedLenFeature((), tf.int64,
                                    default_value=tf.zeros([], dtype=tf.int64)),
    }
    parsed = tf.parse_single_example(record, keys_to_features)

    # Perform additional preprocessing on the parsed data.
    image = tf.image.decode_jpeg(parsed["image_data"])
    image = tf.reshape(image, [299, 299, 1])
    label = tf.cast(parsed["label"], tf.int32)

    return {"image_data": image, "date_time": parsed["date_time"]}, label

  # Use `Dataset.map()` to build a pair of a feature dictionary and a label
  # tensor for each example.
  dataset = dataset.map(parser)  # 通常就用map函數對輸入圖像進行預處理,而預處理函數能夠包含在上面用於解析的parser函數
  dataset = dataset.shuffle(buffer_size=10000)  # 在訓練的時候通常須要將輸入數據進行順序打亂提升訓練的泛化性
  dataset = dataset.batch(32)  # 單次讀取的batch大小
  dataset = dataset.repeat(num_epochs)   # 數據集的重複使用次數,爲空的話則無線循環
  iterator = dataset.make_one_shot_iterator()  

  # `features` is a dictionary in which each value is a batch of values for
  # that feature; `labels` is a batch of labels.
  features, labels = iterator.get_next()  
  return features, labels  
  # return {"input": features, labels}  # 對於estimator的輸入前者爲dict類型,後者爲tensor

注: dataset.make_one_shot_iterator()是最簡單的Iterator類,不須要明確的initialization,而且目前這是惟一可以在estimator中容易使用的iterator。對於須要從新使用的Dataset類(好比結構相同的訓練和測試數據集),通常是須要用 reinitializable iterator ,不過在estimator中因爲上述問題,如今通常的作法是對訓練集和驗證集單獨寫兩個pipeline用make_one_shot_iterator來處理數據流。github

參考

2. Dataset + Keras

經過data處理TFRecords數據流,咱們就可使用keras來進行訓練了。docker

def architecture(input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Model architecture
    Args:
        input_shape: input image shape (not include batch)

    Returns: an keras model instance
    """
    base_model = Xception(include_top=True,
                          weights=None,   # no pre-trained weights used
                          pooling="max",
                          input_shape=input_shape,  # modify first layer
                          classes=_NUM_CLASSES)
    base_model.summary()
    return base_model


def train(source_dir, model_save_path):
    """
    Train patch based model
    Args:
        source_dir: a directory where training tfrecords file stored. All TF records start with train will be used!
        model_save_path: weights save path
    """
    if tf.gfile.Exists(source_dir):
        train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    (images, labels) = dataset_input_fn(train_data_paths)
    model_input = keras.Input(tensor=images, shape=(_PATCH_SIZE, _PATCH_SIZE, 3), dtype=tf.float32, name="input")  # keras model的輸入須要爲keras.Input類型,可是直接使用tensorflow tensor類型也是能夠的
    base_model = architecture()
    model_output = base_model(model_input)
    model = keras.models.Model(inputs=model_input, outputs=model_output)
    optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9)
    model.compile(optimizer=optimizer,
                  loss=focal_loss,
                  metrics=['accuracy'],
                  target_tensors=[labels])   # 1
    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=model_save_path)
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_save_path+"/saved_model.h5")
    model.fit(steps_per_epoch=8000, epochs=_EPOCHS, callbacks=[tensorboard, model_checkpoint])


def evaluate(source_dir, weights_path):
    """
    Eval patch based model
    Args:
        source_dir: directory where val tf records file stored. All TF records start with val will be used!
        weights_path: model weights save path
    """
    # load model
    base_model = architecture()
    base_model.load_weights(weights_path)
    # load test dataset
    if tf.gfile.Exists(source_dir):
        val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    (images, labels) = input_fn(source_dir)
    probs = base_model(images)
    predictions = tf.argmax(probs, axis=-1)
    accuracy_score = tf.reduce_mean(tf.equal(probs, predictions))
    print("Accuracy of testing images: {}".format(accuracy_score))

注: 對於#1來講,根據keras model的compile函數的文檔:api

target_tensors: By default, Keras will create placeholders for the model's target, which will be fed with the target data during training. If instead you would like to use your own target tensors (in turn, Keras will not expect external Numpy data for these targets at training time), you can specify them via the target_tensors argument. It can be a single tensor (for a single-output model), a list of tensors, or a dict mapping output names to target tensors.bash

定義了target tensor 以後就不須要再從外部輸入數據了(fit的時候)。網絡

而使用這種方式訓練獲得的的模型文件爲h5,若是想要轉換成用於tensorflow service的模型,須要用如下方式進行:a). fchollet提供的keras式寫法
session

# @1
from keras import backend as K

K.set_learning_phase(0)  # all new operations will be in test mode from now on

# serialize the model and get its weights, for quick re-building
config = previous_model.get_config()
weights = previous_model.get_weights()

# re-build a model where the learning phase is now hard-coded to 0
from keras.models import model_from_config
new_model = model_from_config(config)
new_model.set_weights(weights)
# @2
from tensorflow_serving.session_bundle import exporter

export_path = ... # where to save the exported graph
export_version = ... # version number (integer)

saver = tf.train.Saver(sharded=True)
model_exporter = exporter.Exporter(saver)
signature = exporter.classification_signature(input_tensor=model.input,
                                              scores_tensor=model.output)  # 分類
model_exporter.init(sess.graph.as_graph_def(),
                    default_graph_signature=signature)
model_exporter.export(export_path, tf.constant(export_version), sess)

或者tensorflow官方的寫法:app

import os
import tensorflow as tf
import keras.backend as K

def save_model_for_production(model, version, path='prod_models'):
    K.set_learning_phase(0)
    if not os.path.exists(path):
        os.mkdir(path)
    export_path = os.path.join(
        tf.compat.as_bytes(path),
        tf.compat.as_bytes(str(get_new_version(path=path, current_version=int(version)))))
    builder = tf.saved_model.builder.SavedModelBuilder(export_path)

    model_input = tf.saved_model.utils.build_tensor_info(model.input)
    model_output = tf.saved_model.utils.build_tensor_info(model.output)

    prediction_signature = (
        tf.saved_model.signature_def_utils.build_signature_def(
            inputs={'inputs': model_input},
            outputs={'output': model_output},
            method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

    with K.get_session() as sess:
        builder.add_meta_graph_and_variables(
            sess=sess, tags=[tf.saved_model.tag_constants.SERVING],
            signature_def_map={
                'predict':
                    prediction_signature,
            })

        builder.save()

參考

  • fchollet的mnist example
  • https://stackoverflow.com/questions/46135499/how-to-properly-combine-tensorflows-dataset-api-and-keras
  • https://blog.keras.io/keras-as-a-simplified-interface-to-tensorflow-tutorial.html

3. Dataset + estimator

能夠用tf.layers函數來代替keras搭建網絡,並且能夠提供更豐富的layer。

def xception():
    def tf_xception(features, classes=2, is_training=True):
    """
    The Xception architecture written in tf.layers
    Args:
        features: input image tensor
        classes: number of classes to classify images into
        is_training: is training stage or not

    Returns:
        2-D logits prediction output after pooling and activation
    """
    x = tf.layers.conv2d(features, 32, (3, 3), strides=(2, 2), use_bias=False, name='block1_conv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv1_bn')
    x = tf.nn.relu(x, name='block1_conv1_act')
    x = tf.layers.conv2d(x, 64, (3, 3), use_bias=False, name='block1_conv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv2_bn')
    x = tf.nn.relu(x, name='block1_conv2_act')

    residual = tf.layers.conv2d(x, 128, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv1_bn')
    x = tf.nn.relu(x, name='block2_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block2_pool')
    x = tf.add(x, residual, name='block2_add')

    residual = tf.layers.conv2d(x, 256, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block3_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv1_bn')
    x = tf.nn.relu(x, name='block3_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block3_pool')
    x = tf.add(x, residual, name="block3_add")

    residual = tf.layers.conv2d(x, 728, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block4_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv1_bn')
    x = tf.nn.relu(x, name='block4_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block4_pool')
    x = tf.add(x, residual, name="block4_add")

    for i in range(8):
        residual = x
        prefix = 'block' + str(i + 5)

        x = tf.nn.relu(x, name=prefix + '_sepconv1_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv1')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv1_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv2_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv2')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv2_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv3_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv3')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv3_bn')

        x = tf.add(x, residual, name=prefix+"_add")

    residual = tf.layers.conv2d(x, 1024, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block13_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block13_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv1_bn')
    x = tf.nn.relu(x, name='block13_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 1024, (3, 3), padding='same', use_bias=False, name='block13_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block13_pool')
    x = tf.add(x, residual, name="block13_add")

    x = tf.layers.separable_conv2d(x, 1536, (3, 3), padding='same', use_bias=False, name='block14_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv1_bn')
    x = tf.nn.relu(x, name='block14_sepconv1_act')

    x = tf.layers.separable_conv2d(x, 2048, (3, 3), padding='same', use_bias=False, name='block14_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv2_bn')
    x = tf.nn.relu(x, name='block14_sepconv2_act')
    # replace conv layer with fc
    x = tf.layers.average_pooling2d(x, (3, 3), (2, 2), name="global_average_pooling")
    x = tf.layers.conv2d(x, 2048, [1, 1], activation=None, name="block15_conv1")
    x = tf.layers.conv2d(x, classes, [1, 1], activation=None, name="block15_conv2")
    x = tf.squeeze(x, axis=[1, 2], name="logits")
    return x


def model_fn(features, labels, mode, params):
    """
    Model_fn for estimator model
    Args:
        features (Tensor): Input features to the model.
        labels (Tensor): Labels tensor for training and evaluation.
        mode (ModeKeys): Specifies if training, evaluation or prediction.
        params (HParams): hyper-parameters for estimator model
    Returns:
        (EstimatorSpec): Model to be run by Estimator.
    """
    # check if training stage
    if mode == tf.estimator.ModeKeys.TRAIN:
        is_training = True
    else:
        is_training = False
    # is_training = False   # 1
    input_tensor = features["input"]
    logits = xception(input_tensor, classes=_NUM_CLASSES, is_training=is_training)
    probs = tf.nn.softmax(logits, name="output_score")
    predictions = tf.argmax(probs, axis=-1, name="output_label")
    onehot_labels = tf.one_hot(tf.cast(labels, tf.int32), _NUM_CLASSES)
    # provide a tf.estimator spec for PREDICT
    predictions_dict = {"score": probs,
                        "label": predictions}
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions_output = tf.estimator.export.PredictOutput(predictions_dict)
        return tf.estimator.EstimatorSpec(mode=mode,
                                          predictions=predictions_dict,
                                          export_outputs={
                                              tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: predictions_output
                                          })
    # calculate loss
    # loss = focal_loss(onehot_labels, logits, gamma=1.5)
    gamma = 1.5
    weights = tf.reduce_sum(tf.multiply(onehot_labels, tf.pow(1. - probs, gamma)), axis=-1)
    loss = tf.losses.softmax_cross_entropy(onehot_labels, logits, weights=weights)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predictions)
    if mode == tf.estimator.ModeKeys.TRAIN:
        lr = params.learning_rate
        # train optimizer
        optimizer = tf.train.RMSPropOptimizer(learning_rate=lr, decay=0.9)
        update_ops = tf.get_collections(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(update_ops):
          train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        tensors_to_log = {'batch_accuracy': accuracy[1],
                          'logits': logits,
                          'label': labels}
        logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=1000)  
        return tf.estimator.EstimatorSpec(mode=mode,
                                          loss=loss,
                                          train_op=train_op,
                                          training_hooks=[logging_hook])
    else:
        eval_metric_ops = {"accuracy": accuracy}
        return tf.estimator.EstimatorSpec(mode=mode,
                                          loss=loss,
                                          eval_metric_ops=eval_metric_ops)


def get_estimator_model(config=None, params=None):
    """
    Get estimator model by definition of model_fn
    """
    est_model = tf.estimator.Estimator(model_fn=model_fn,
                                       config=config,
                                       params=params)
    return est_model


def train(source_dir, model_save_path):
    """
    Train patch based model
    Args:
        source_dir: a directory where training tfrecords file stored. All TF records start with train will be used!
        model_save_path: weights save path
    """
    if tf.gfile.Exists(source_dir):
        train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    train_config = tf.estimator.RunConfig()
    new_config = train_config.replace(model_dir=model_save_path,
                                      save_checkpoints_steps=1000,
                                      keep_checkpoint_max=5)
    params = tf.contrib.training.HParams(
        learning_rate=0.001,
        train_steps=5000,
        min_eval_frequency=1000
    )
    est_model = get_estimator_model(config=new_config,
                                    params=params)
    # define training config
    train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(data_path=train_data_paths,
                                                                  batch_size=_BATCH_SIZE,
                                                                  is_training=True),
                                        max_steps=_MAX_STEPS)

    eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(data_path=val_data_paths,
                                                                batch_size=_BATCH_SIZE),
                                      steps=100,
                                      throttle_secs=900)
    # train and evaluate model
    tf.estimator.train_and_evaluate(estimator=est_model,
                                    train_spec=train_spec,
                                    eval_spec=eval_spec)


def evaluate(source_dir, model_save_path):
    """
    Eval patch based model
    Args:
        source_dir: directory where val tf records file stored. All TF records start with val will be used!
        model_save_path: model save path
    """
    # load model
    run_config = tf.contrib.learn.RunConfig(model_dir=model_save_path)
    est_model = get_estimator_model(run_config)
    # load test dataset
    if tf.gfile.Exists(source_dir):
        val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    accuracy_score = est_model.evaluate(input_fn=lambda: input_fn(val_data_paths,
                                                                  batch_size=_BATCH_SIZE,
                                                                  is_training=False))
    print("Accuracy of testing images: {}".format(accuracy_score))

注:

  • BN layer in estimator: : 對於問題#1,在包含BN層的網絡,正常狀況下都是須要明確指定是否處於training狀態(BN在訓練和其餘狀態的計算方式是不一樣的),可是在 1.4.1 版本下,使用estimator進行訓練的時候是正常(is_training=True),可是在evaluation的時候(is_training=False)輸出的精度是初始化狀態,而且隨着train_and_evaluate()的屢次調用,eval loss會變得很大。這應該是一個bug(具體問題可查看issue13895), 16455。而參考tensorflow官方給出的給予estimator的resnet example中的BN layer在train stage的時候並無設置is_training=True,因此這裏把is_training關閉了,這樣操做訓練是正常的。 這個錯誤是因爲Tensorflow tf.layers.batch_normalization()函數調用方式不對致使的,根據這個函數的[官方文檔](https://www.tensorflow.org/api_docs/python/tf/layers/batch_normalization):

    Note: when training, the moving_mean and moving_variance need to be updated. By default the update ops are placed in tf.GraphKeys.UPDATE_OPS, so they need to be added as a dependency to the train_op.

    須要明確手動添加update_ops做爲dependency到update_ops:

    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
      train_op = optimizer.minimize(loss)
    此外,輸入的圖片的channel須要爲3
  • TensorHook for logging: 在使用 tensorhook 打印訓練日誌的時候須要明確指tf.logging的verbosity level: tf.logging.set_verbosity(tf.logging.INFO)
  • estimator自定義model_fn的時候須要根據model_fn的使用狀態返回不一樣的EstimatorSpec,model_fn的定義須要符合如下格式,具體能夠查看官方文檔
def model_fn(features, labels, mode, params):
   # Logic to do the following:
   # 1. Configure the model via TensorFlow operations
   # 2. Define the loss function for training/evaluation
   # 3. Define the training operation/optimizer
   # 4. Generate predictions
   # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object
   return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)

model_fn有三種mode:tf.estimator.ModeKeys.TRAINtf.estimator.ModeKeys.EVALtf.estimator.ModeKeys.PREDICT,其中第一個必需要返回的爲losstraining_op, 第二個是loss,第三個是prediction

  • tf.metrics.accuracy: 通常狀況下EVAL的時候model_fn會返回eval_metric_ops,這個是一個dict,其值爲tf.metrics的返回值(以tf.metrics.accuracy()爲例,返回值以下):

    accuracy: A Tensor representing the accuracy, the value of total divided by count.

    update_op: An operation that increments the total and count variables appropriately and whose value matches accuracy.

    可是這個函數在estimator模型中使用的時候有點反直覺:

    tf.metrics.accuracy is not meant to compute the accuracy of a single batch. It returns both the accuracy and an update_op, and update_op is intended to be run every batch, which updates the accuracy.

    因此在上面代碼中對每一個batch算accuracy的時候算的是update_op而非accuracy,更多關於tf.metrics.accuracy()的討論能夠看這倆個issue:15115, 9498

Export saved model

Freeze model/export savedmodel for Tensorflow Serving
使用estimator進行訓練的一大好處就是進行分佈式部署的擴展很容易,而部署須要的模型結構爲(具體看這裏):

image3
從訓練的ckpt模型文件freeze從pb(protobuf)模型文件,estimator提供了export_savedmodel來幫助快速進行操做。

def serving_input_receiver_fn():
    """
    Build serving inputs
    """
    inputs = tf.placeholder(dtype=tf.string, name="input_image")
    feature_config = {'image/encoded': tf.FixedLenFeature(shape=[], dtype=tf.string)}
    tf_example = tf.parse_example(inputs, feature_config)
    patch_images = tf.map_fn(_preprocess_image, tf_example["image/encoded"], dtype=tf.float32)
    patch_images = tf.squeeze(patch_images, axis=[0])
    receive_tensors = {'example': inputs}
    features = {"input": patch_images}
    return tf.estimator.export.ServingInputReceiver(features, receive_tensors)


def save_serving_model():
    session_config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
    config = tf.estimator.RunConfig(model_dir=MODEL_WEIGHTS_PATH, session_config=session_config)  # session_config is used for configuration of session
    model = get_estimator_model(config=config)
    model.export_savedmodel(export_dir_base=SERVING_MODEL_SAVE_PATH, serving_input_receiver_fn=serving_input_receiver_fn)

若是須要在預測的時候調整sess的參數(ConfigProto),能夠經過tf.estimator.RunConfig配置session_config的參數而後再輸入到tf.estimator.Estimator()的config參數(如上面所示)。

predict

通常經過export_savedmodel方式輸出的模型是用tensorflow serving封裝服務進行預測:

# start tensorflow serving
$ bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --port=9000 --model_name=your_model_name --model_base_path=/your/model/base/path
server = "localhost:9000"

def serving_predict(face_img):
    """
    Get prediction based on given face image
    Args:
        face_img: input image file
    Returns:
        anti-spoof detection result of given face
    """
    host, port = server.split(":")
    channel = implementations.insecure_channel(host, int(port))
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
    # creating request
    request = predict_pb2.PredictRequest()
    request.model_spec.name = "face_anti_spoof"
    request.model_spec.signature_name = "predict_images"
    # parse face img for serving
    with open(face_img, "rb") as f:
        serialized = f.read()
    example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(serialized)}))
    serialized = example.SerializeToString()
    request.inputs['inputs'].CopyFrom(tf.contrib.util.make_tensor_proto(serialized, shape=[1]))
    # predict results
    content = stub.Predict(request, timeout=5)
    labels = content.outputs["label"].int64_val
    prob = np.sum(labels) / len(labels)
    if prob > 0.5:
        label = LABELS[1]
    else:
        label = LABELS[0]
        prob = 1 - prob
    info = {"label": label, "prob": prob}
    return info

具體關於tensorlfow serving的用法會在以後的文章裏面展開。

而生成的pb文件也能夠直接加載不經過tensorflow serving來進行預測,不過因爲estimator的export_savedmodel輸出的pb文件中模型的保存格式爲tf.saved_model類,因此加載模型用tf.saved_model.loader模塊進行操做會比較方便(sess.restore()加載這個pb模型很容易報錯= =):

def load_graph(trained_model):
    session_config = tf.ConfigProto()
    session_config.gpu_options.allow_growth = True
    sess = tf.Session(config=session_config)
    meta_graph_def = tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], trained_model)
    # signature = meta_graph_def.signature_def
    # signature_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    # print(signature[signature_key])
    return sess
sess = load_graph(model_path)

def predict_pb(image):
    """
    Predict input face is real or not using model stored in save_model.pb generated by estimator export_savedmodel
    Args:
        image: input image file
    """
    assert tf.gfile.Exists(image), "Input image file not found!"
    with open(image, "rb") as f:
        image_bytes = f.read()
    example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(image_bytes)}))
    serialized = example.SerializeToString()
    input_tensor = sess.graph.get_tensor_by_name("input_image:0")
    labels = sess.graph.get_tensor_by_name("output_label:0")
    scores = sess.graph.get_tensor_by_name("output_score:0")
    (labels, scores) = sess.run([labels, scores], feed_dict={input_tensor: [serialized]})
    return labels, scores

參考

  1. estimator tutorial
  • https://www.tensorflow.org/versions/r1.5/get_started/custom_estimators
  • https://developers.googleblog.com/2017/12/creating-custom-estimators-in-tensorflow.html
  • https://www.tensorflow.org/extend/estimators
  • https://www.damienpontifex.com/2017/09/19/mnist-with-tensorflow-experiments-and-estimators/
  1. estimator example
  • https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py
  • https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/learn/resnet.py
  • https://github.com/tensorflow/models/blob/master/samples/outreach/blogs/blog_custom_estimators.py
  • https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/learn/iris_custom_model.py
  1. discussion
  • https://github.com/tensorflow/tensorflow/issues/1122
  • https://github.com/tensorflow/tensorflow/issues/15115
  • https://github.com/tensorflow/tensorflow/issues/9498
  1. tf.layers
  • https://www.tensorflow.org/tutorials/layers
  • https://www.tensorflow.org/api_docs/python/tf/layers
  1. BN layer
  • http://ruishu.io/2016/12/27/batchnorm/
  • https://github.com/tensorflow/tensorflow/issues/4361
  1. export & load saved model and predict
  • https://stackoverflow.com/questions/43667018/locally-load-saved-tensorflow-model-pb-from-google-cloud-machine-learning-engin
  • https://stackoverflow.com/questions/33759623/tensorflow-how-to-save-restore-a-model
  • https://stackoverflow.com/questions/46513923/tensorflow-how-and-why-to-use-savedmodel
  • https://stackoverflow.com/questions/46098863/how-to-import-an-saved-tensorflow-model-train-using-tf-estimator-and-predict-on
  • https://www.tensorflow.org/api_docs/python/tf/saved_model/loader
  • https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/saved_model/README.md
  • https://github.com/tensorflow/serving/blob/master/tensorflow_serving/example/inception_saved_model.py
  • https://github.com/tensorflow/serving/issues/478
  1. estimator gpu options setting
  • https://github.com/tensorflow/tensorflow/issues/9831
  • https://github.com/tensorflow/tensorflow/issues/13458

4. keras + estimator + dataset

大部分與上面相同,只是用keras.estimator.model_to_estimator()函數直接將keras model直接轉換成estimator model而不用本身定義model_fn,這種方式對於模型訓練十分簡單友好,可是在使用Tensorflow estimator的export_savedmodel()函數的時候會報錯,緣由是缺乏export_outputs(在自定義model_fn的時候好像也並無須要明肯定義export_outputs變量,很是微妙的報錯),而用官方的作法轉化模型的時候又會報 BN層參數沒有初始化 的錯誤(Attempting to use uninitialized value bn_xx)(又是BN層, 跟以前同樣多是因爲graph重用引發的錯誤),所以用這種方式訓練的模型暫時尚未找到轉化成Tensorflow serving所需的模型的方法。

def model_fn(model_dir=".", input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Convert keras model to estimator model
    Args:
        model_dir: absolute model save directory
        input_shape: input image shape (not include batch)

    Returns: an estimator model instance
    """
    with K.name_scope("model"):
        base_model = tf.keras.applications.Xception(include_top=True,
                              weights=None,   # no pre-trained weights used
                              pooling="max",
                              input_shape=input_shape,  # modify first layer
                              classes=_NUM_CLASSES)
        base_model.summary()
        model_input = keras.Input(shape=input_shape, dtype=tf.float32, name="input_1")
        model_output = base_model(model_input)
        model = keras.models.Model(inputs=model_input, outputs=model_output)
        optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9)
        model.compile(optimizer=optimizer,
                      loss=focal_loss,
                      metrics=['accuracy'])
        if model_dir == ".":
            model_dir = os.path.dirname(os.path.abspath(__file__))
        elif not model_dir.startswith("/"):  # relative path to abs path
            model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), model_dir)
        if not os.path.exists(model_dir):
            os.makedirs(model_dir)
        # define training config
        run_config = tf.estimator.RunConfig(save_checkpoints_steps=2000,
                                            keep_checkpoint_max=5)
        # convert keras model to estimator model
        est_model = keras.estimator.model_to_estimator(model, model_dir=model_dir, config=run_config)
        return est_model

參考

  • https://www.dlology.com/blog/an-easy-guide-to-build-new-tensorflow-datasets-and-estimator-with-keras-model/
  • https://cloud.google.com/blog/big-data/2017/12/new-in-tensorflow-14-converting-a-keras-model-to-a-tensorflow-estimator
  • https://github.com/tensorflow/tensorflow/issues/14198

5. estimator + experiment + datasets

在1.4中estimator model有train_and_evaluate()函數進行開始模型的train和evaluate,也能夠直接用model.train()model.evaluate()函數(相似keras model的fit和evaluate函數);不過能夠用tf.contrib.learn.Experiment()來代替(須要定義experiment類,比較麻煩)。大部分與estimator model相同,須要定義experiment_fn以下:

from tensorflow.contrib.learn import learn_runner

def experiment_fn(run_config, params):
    """
    Create an experiment to train and evaluate the model
    Args:
        run_config: configuration for estimator
        params: Hparams object returned by tf.contrib.training.HParams
    Returns:
        Experiment object
    """
    if tf.gfile.Exists(FLAGS.train_data_dir) and tf.gfile.Exists(FLAGS.val_data_dir):
        train_data_paths = tf.gfile.Glob(FLAGS.train_data_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(FLAGS.val_data_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    estimator = get_estimator_model(config=run_config, params=params)

    def train_input_fn():
        return input_fn(data_path=train_data_paths, batch_size=_BATCH_SIZE, is_training=True)

    def eval_input_fn():
        return input_fn(data_path=val_data_paths, batch_size=_BATCH_SIZE, is_training=False)

    experiment = tf.contrib.learn.Experiment(
        estimator=estimator,
        train_input_fn=train_input_fn,
        eval_input_fn=eval_input_fn,
        train_steps=params.train_steps,
        min_eval_frequency=params.min_eval_frequency,
    )
    return experiment


def run_experiment():
    """
    Run the training experiment
    """
    params = tf.contrib.training.HParams(
        learning_rate=0.002,
        n_classes=_NUM_CLASSES,
        train_steps=5000,
        min_eval_frequency=100
    )
    run_config = tf.contrib.learn.RunConfig(model_dir=FLAGS.save_dir,
                                            save_checkpoints_steps=1000,
                                            keep_checkpoint_max=5)
    experiment = experiment_fn(run_config=run_config, params=params)
    experiment.train_and_evaluate()
    # learn_runner.run(
    #     experiment_fn=experiment_fn,  # first class function
    #     run_config=run_config,
    #     schedule='train_and_evaluate',
    #     hparams=params
    # )

參考

  • https://medium.com/onfido-tech/higher-level-apis-in-tensorflow-67bfb602e6c0
相關文章
相關標籤/搜索