tensorflow踩坑合集2. TF Serving & gRPC 踩坑

這一章咱們藉着以前的NER的模型聊聊tensorflow serving,以及gRPC調用要注意的點。如下代碼爲了方便理解作了簡化,完整代碼詳見Github-ChineseNER ,裏面提供了訓練好的包括bert_bilstm_crf, bilstm_crf_softlexcion,和CWS+NER多任務在內的4個模型,能夠開箱即用。這裏tensorflow模型用的是estimator框架,整個推理環節主要分紅:模型export,warmup,serving, client request四步html

Model Export

要把estimator保存成線上推理的格式,須要額外定義兩個字段,serving的輸出和輸入格式。python

輸出定義

serving的輸出在tf.estimator.EstimatorSpec中定義,比較容易混淆的是EstimatorSpec中有兩個和推理相關的字段predictions和export_outputs,默認predictions是必須傳入,export_outputs是可選傳入。git

差別在於predictions是estimator.predict的返回,而且容許predictions中的字段和features&labels的字段存在重合,例如我常常會把一些用於debug的字段像中文的tokens放在predictions,這些字段既是模型輸入也是predict輸出。github

若是export_outputs=None,estimator會默認用以下方式生成export_output,signature_name='serving_default',字段和predictions徹底相同。docker

export_output = {
    tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
        tf.estimator.export.PredictOutput(predictions)
}

可是對後面raw tensor輸入的serving input,是不容許export_output和input顯式出現相同字段。因此我習慣單獨定義export_output,只保留線上serving須要返回的預測字段bash

def model_fn(features, labels, mode, params):
    ... build tf graph 
    if mode == tf.estimator.ModeKeys.PREDICT:
        output = {'serving_default':
        tf.estimator.export.PredictOutput({'pred_ids': pred_ids})
        }
        spec = tf.estimator.EstimatorSpec(mode, 
                                          predictions= {'pred_ids': pred_ids,
                                               'label_ids': features['label_ids'],
                                               'tokens': features['tokens']
                                              },
                                        export_outputs=output)
        return spec

輸入定義

serving的輸入在tf.estimator.export.ServingInputReceiver中定義,其中features是傳入模型的特徵格式,receiver_tensors是推理服務的請求格式,這倆啥差異呢?這個要說到serving input有兩種常見的定義方式,一種是傳入序列化後的tf.Example(receiver_tensor),而後按照tf_proto的特徵定義對example進行解析(feature)再輸入模型。這種方式的好處是請求接口一致,無論模型和特徵咋變服務請求字段永遠是example。哈哈還有一個好處就是tf_proto的定義能夠複用dataset裏面的定義好的框架

def serving_input_receiver_fn():
    tf_proto = {
      'token_ids': tf.io.FixedLenFeature([150], dtype=tf.int64),
      'segment_ids': tf.io.FixedLenFeature([150], dtype=tf.int64)
    }

    serialized_tf_example = tf.placeholder(
        dtype=tf.dtypes.string,
        shape=[None],
        name='input_tensor')
    receiver_tensors = {'example': serialized_tf_example}
    features = tf.parse_example(serialized_tf_example, tf_proto)
    ## 可能還會有feature preprocess邏輯在這裏
    return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

另外一種就是直接用原始特徵請求,這時features和receiver_tensors是同樣滴。這種方式的好處是用saved_model_cli能夠直接檢查serving的input格式,以及在請求特徵size很是大的時候,這種請求能多少節省一點以上序列化所需的時間。ide

def serving_input_receiver_fn():
    token_ids = tf.placeholder(dtype=tf.int64, shape=[None, 150], name='token_ids')
    segment_ids = tf.placeholder(dtype=tf.int64, shape=[None,150], name='segment_ids')
    receiver_tensors  = {'token_ids': token_ids,
                        'segment_ids': segment_ids}
    
    return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)

Export

定義好serving的輸入輸出後,直接export model便可,這裏能夠是訓練完後export。也能夠用已經訓練好的checkpoint來build estimator而後直接export,這裏會默認使用model_dir裏面latest ckpt來export。測試

estimator._export_to_tpu = False
estimator.export_saved_model('serving_model/bilstm_crf', serving_input_receiver_fn)

輸出的模型默認用當前timestamp做爲folder_name, 按須要rename成version=1/2便可ui

而後咱們能夠經過saved_model_cli來檢查模型輸入輸出。圖一是tf.Example類型的輸入,圖二是raw tensor輸入,raw tensor類型的輸入debug更方便一點。

saved_model_cli show --all --dir ./serving_model/bilstm_crf/1

Warm up

在獲得上面的servable model後,在serving前還有一步可選操做,就是加入warm up文件。這主要是由於tensorflow模型啓動存在懶加載的邏輯,部分組件只在請求後才被觸發運行,因此咱們會觀察到第一次(前幾回)請求的latency會顯著的高。warm up簡單說就是在模型文件裏帶上幾條請求的測試數據,在模型啓動後用測試數據先去trigger懶加載的邏輯。具體操做就是在serving model的assets.extra目錄裏寫入請求數據

NUM_RECORDS=5
with tf.io.TFRecordWriter("./serving_model/{}/{}/assets.extra/tf_serving_warmup_requests".format(MODEL, VERSION)) as writer:
    # 生成request的邏輯
    log = prediction_log_pb2.PredictionLog(
        predict_log=prediction_log_pb2.PredictLog(request=req))
    for r in range(NUM_RECORDS):
        writer.write(log.SerializeToString())

Server

server部分比較簡單,比較推薦Docker部署,方便快捷。只須要三步

  1. 下載Docker https://docs.docker.com/get-docker/
  2. 下載和環境適配的Image,不指定版本默認是latest
docker pull tensorflow/serving:1.14.0
  1. 在本地運行運行服務,注意port 8500是給gRPC的,8501是給REST API的不要寫錯
docker run -t --rm -p 8500:8500  \
   -v "$(pwd)/serving_model/${MODEL_NAME}:/models/${MODEL_NAME}" \
   -e MODEL_NAME=${MODEL_NAME}  tensorflow/serving:1.14.0

gRPC client

Demo

這裏咱們以上面tf.Example的serving請求格式,看下如何用gRPC請求服務。請求主要分紅3步:創建通訊,生成request, 請求並解析response

第一步創建通訊

channel = grpc.insecure_channel(‘localhost:8500’)
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

第二步生成請求

# 獲得tf_feature dict 
example = tf.train.Example(
    features=tf.train.Features(feature=tf_feature)
).SerializeToString() # 獲得example並序列化成string
example = [example] # add batch_size dimension
# 生成request
request = predict_pb2.PredictRequest()
request.model_spec.signature_name = 'serving_default' # set in estimator output
request.model_spec.name = 'bilstm_crf'
request.model_spec.version.value = 1
request.inputs['example'].CopyFrom(
    tensor_util.make_tensor_proto(example, dtype=tf.string)
    )

第三步請求服務,解析response

resp = stub.Predict.future(request, timeout=5)
res = resp.result().outputs
pred_ids = np.squeeze(tf.make_ndarray(res['pred_ids']))

gRPC踩坑

在使用gPRC client的過程當中有幾個可能會踩坑的點,哈哈但不排除出坑的姿式不徹底正確,若是是的話求指正~

Not fork safe,使用多進程要注意!

官方文檔:grpc/fork_support

gRPC並非fork safe的,若是在fork以前建立channel,可能會碰到deadlock或者報錯,取決於你用的gRPC版本。。。我使用的1.36版本會檢查fork,若是channel在fork以前建立且未close,會raise‘ValueError: Cannot invoke RPC: Channel closed due to fork’,以前用的忘記是啥版本的會deadlock。想要在client側使用多進程,合理的方案是在fork以後,在每一個子進程中建立channel,若是主進程有channel須要先close掉。multiprocessing/client 給了一個多進程client的demo

channel重用大法好

官方文檔:Performance Guide

最開始用gRPC我習慣性的在單條請求之後會channel.close,或者用with管理,後來發現channel建立銷燬自己是比較耗時的。看了官方文檔才發現正確使用方式是在整個client生命週期裏複用同一個channel。至於stub,我的感受建立成本很低,複用和每次從channel從新建立差異不大。

channel保活

官方文檔:Keepalive User Guide

上面的channel複用會延伸到channel保活的問題。grpc客戶端默認是長連接,避免了連接創建和銷燬的開銷,但須要keep-alive機制來保證客戶端到服務端的連接持續有效。若是客戶端發送請求的間隔較長,在一段時間沒有請求後,須要知道究竟是server掉線了,仍是真的沒有數據傳輸,這個連接還需不須要保持。grpc經過發送keep-alive ping來保活。

在連接創建後,keep-alive計時器開始,經過如下參數控制是否發送ping,發送的時間,次數,間隔。

  • grpc.keepalive_permit_without_calls,set=1則無請求進行,也能夠發送keepalive ping
  • grpc.http2.max_pings_without_data,沒有數據傳輸的狀況下,最多容許send多少ping,set=0是無限發送
  • grpc.keepalive_time_ms,client發送ping的時間間隔
  • grpc.keepalive_timeout_ms,確認ping應答的超時時間
  • grpc.http2.min_ping_interval_without_data_ms,沒有數據傳輸的狀況下,server容許收到ping的最小時間間隔,小於這個間隔的ping會被認爲是ping strike。這個數值設置要>=以上keepalive_time_ms
  • grpc.http2.max_pring_strikes, server最多容許ping strike的次數,超出會發送GOAWAY自動斷開連接,set=0容許無限次

如下是參數的默認取值

statusCode.UNAVAILABLE,‘connection reset by peer’

針對偶發UNAVAILABLE的報錯,部分狀況多是server部署環境和保活參數的設置有一些衝突,詳見Docker Swarm 部署 gRPC 服務的坑,不過多數狀況下都能被retry解決。grpc issue裏提到一個interceptor 插件如今是experimental API。簡單拆出來就是下面exponential backoff的retry邏輯。果真解決bug兩大法器restart+retry。。。

RETRY_TIEMS = {
    StatusCode.INTERNAL: 1,
    StatusCode.ABORTED: 3,
    StatusCode.UNAVAILABLE: 3,
    StatusCode.DEADLINE_EXCEEDED: 5  # most-likely grpc channel close, need time to reopen
}

def grpc_retry(default_max_retry=3, sleep=0.01):
    def helper(func):
        @wraps(func)
        def handle_args(*args, **kwargs):
            counter = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except RpcError as e:
                    max_retry = RETRY_TIEMS.get(e.code(), default_max_retry)
                    if counter >= max_retry:
                        raise e
                    counter += 1
                    backoff = min(sleep * 2 ** counter, 1) # exponential backoff
                    time.sleep(backoff)  # wait for grpc to reopen channel
        return handle_args
    return helper

Reference

  1. http://www.javashuo.com/article/p-wddwcqfh-wt.html
  2. http://d0evi1.com/tensorflow/serving/estimator_saved_model/
  3. https://zhuanlan.zhihu.com/p/136619485
相關文章
相關標籤/搜索