做者:陳戊超(仲卓),阿里巴巴技術專家java
深度學習技術在當代社會發揮的做用愈來愈大。目前深度學習被普遍應用於個性化推薦、商品搜索、人臉識別、機器翻譯、自動駕駛等多個領域,此外還在向社會各個領域迅速滲透。node
當前,深度學習的應用愈來愈多樣化,隨之涌現出諸多優秀的計算框架。其中 TensorFlow,PyTorch,MXNeT 做爲普遍使用的框架更是備受矚目。在將深度學習應用於實際業務的過程當中,每每須要結合數據處理相關的計算框架如:模型訓練以前須要對訓練數據進行加工生成訓練樣本,模型預測過程當中須要對處理數據的一些指標進行監控等。在這樣的狀況下,數據處理和模型訓練分別須要使用不一樣的計算引擎,增長了用戶使用的難度。python
本文將分享如何使用一套引擎搞定機器學習全流程的解決方案。先介紹一下典型的機器學習工做流程。如圖所示,整個流程包含特徵工程、模型訓練、離線或者是在線預測等環節。算法
在此過程當中,不管是特徵工程、模型訓練仍是模型預測,中間都會產生日誌。須要先用數據處理引擎好比 Flink 對這些日誌進行分析,而後進入特徵工程。再使用深度學習的計算引擎 TensorFlow 進行模型訓練和模型預測。當模型訓練好了之後再用 tensor serving 作在線的打分。框架
上述流程雖然能夠跑通,但也存在必定的問題,好比:機器學習
針對以上問題,咱們經過結合 Flink 和 TensorFlow,將 TensorFlow 的程序跑在 Flink 集羣上的這種方式來解決,總體流程以下:分佈式
特徵工程用 Flink 去執行,模型訓練和模型的準實時預測目標使 TensorFlow 計算引擎能夠跑在 Flink 集羣上。這樣就能夠用 Flink 一套計算引擎去支持模型訓練和模型的預測,部署上更簡單的同時也節約了資源。學習
Flink 是一款開源大數據分佈式計算引擎,在 Flink 裏全部的計算都抽象成 operator,如上圖所示,數據讀取的節點叫 source operator,輸出數據的節點叫 sink operator。source 和 sink 中間有多種多樣的 Flink operator 去處理,上圖的計算拓撲包含了三個 source 和兩個 sink。大數據
機器學習分佈式運行拓撲以下圖所示:spa
在一個機器學習的集羣當中,常常會對一組節點(node)進行分組,如上圖所示,一組節點能夠是 worker(運行算法),也能夠是 ps(更新參數)。
如何將 Flink 的 operator 結構與 Machine Learning 的 node、Application Manager 角色結合起來?下面將詳細講解 flink-ai-extended 的抽象。
首先,對機器學習的 cluster 進行一層抽象,命名爲 ML framework,同時機器學習也包含了 ML operator。經過這兩個模塊,能夠把 Flink 和 Machine Learning Cluster 結合起來,而且能夠支持不一樣的計算引擎,包括 TensorFlow。
以下圖所示:
在 Flink 運行環境上,抽象了 ML Framework 和 ML Operator 模塊,負責鏈接 Flink 和其餘計算引擎。
ML Framework 分爲 2 個角色。
在上述過程當中,還能夠對 Application Manager 和 node 進行進一步的抽象,Application Manager 裏面咱們單獨把 state machine 的狀態機作成可擴展的,這樣就能夠支持不一樣類型的做業。
深度學習引擎,能夠本身定義其狀態機。從 node 的節點抽象 runner 接口,這樣用戶就能夠根據不一樣的深度學習引擎去自定義運行算法程序。
ML Operator 模塊提供了兩個接口:
利用 ML Operator 提供的接口,能夠實現 Flink Operator 中包含一個Application Manager 及 3 組 node 的角色,這三組 node 分別叫 role a、 role b,、role c,三個不一樣角色組成機器學習的一個 cluster。如上圖代碼所示。Flink 的 operator 與機器學習做業的 node 一一對應。
機器學習的 node 節點運行在 Flink 的 operator 裏,須要進行數據交換,原理以下圖所示:
Flink operator 是 java 進程,機器學習的 node 節點通常是 python 進程,java 和 python 進程經過共享內存交換數據。
TensorFlow On Flink
TensorFlow 分佈式訓練通常分爲 worker 和 ps 角色。worker 負責機器學習計算,ps 負責參數更新。下面將講解 TensorFlow 如何運行在 Flink 集羣中。
Batch 模式下,樣本數據能夠是放在 HDFS 上的,對於 Flink 做業而言,它會起一個source 的 operator,而後 TensorFlow 的 work 角色就會啓動。如上圖所示,若是 worker 的角色有三個節點,那麼 source 的並行度就會設爲 3。同理下面 ps 角色有 2 個,因此 ps source 節點就會設爲 2。而 Application Manager 和別的角色並無數據交換,因此 Application Manager 是單獨的一個節點,所以它的 source 節點並行度始終爲 1。這樣 Flink 做業上啓動了三個 worker 和兩個 ps 節點,worker 和 ps 之間的通信是經過原始的 TensorFlow 的 GRPC 通信來實現的,並非走 Flink 的通訊機制。
如上圖所示,前面有兩個 source operator,而後接 join operator,把兩份數據合併爲一份數據,再加自定義處理的節點,生成樣本數據。在 stream 模式下,worker 的角色是經過 UDTF 或者 flatmap 來實現的。
同時,TensorFlow worker node 有3 個,因此 flatmap 和 UDTF 相對應的 operator 的並行度也爲 3, 因爲ps 角色並不去讀取數據,因此是經過 flink source operator 來實現。
下面咱們再講一下,若是已經訓練好的模型,如何去支持實時的預測。
使用 Python 進行預測流程如圖所示,若是 TensorFlow 的模型是分佈式訓練出來的模型,而且這個模型很是大,好比說單機放不下的狀況,通常出如今推薦和搜索的場景下。那麼實時預測和實時訓練原理相同,惟一不一樣的地方是多了一個加載模型的過程。
在預測的狀況下,經過讀取模型,將全部的參數加載到 ps 裏面去,而後上游的數據仍是通過和訓練時候同樣的處理形式,數據流入到 worker 這樣一個角色中去進行處理,將預測的分數再寫回到 flink operator,而且發送到下游 operator。
如圖所示,模型單機進行預測時就不必再去起 ps 節點,單個 worker 就能夠裝下整個模型進行預測,尤爲是使用 TensorFlow 導出 save model。同時,由於 saved model 格式包含了整個深度學習預測的所有計算邏輯和輸入輸出,因此不須要運行 Python 的代碼就能夠進行預測。
此外,還有一種方式能夠進行預測。前面 source、join、UDTF 都是對數據進行加工處理變成預測模型能夠識別的數據格式,在這種狀況下,能夠直接在 Java 進程裏面經過 TensorFlow Java API,將訓練好的模型 load 到內存裏,這時會發現並不須要 ps 角色, worker 角色也都是 Java 進程,並非 Python 的進程,因此咱們能夠直接在 Java 進程內進行預測,而且能夠將預測結果繼續發給 Flink 的下游。
在本文中,咱們講解了 flink-ai-extended 原理,以及Flink 結合 TensorFlow 如何進行模型訓練和預測。但願經過本文大分享,你們可以使用 flink-ai-extended, 經過 Flink 做業去支持模型訓練和模型的預測。