做者:孫金城(金竹) 整理:韓非java
本文根據 Apache Flink 系列直播課程整理而成,由 Apache Flink PMC,阿里巴巴高級技術專家 孫金城 分享。重點爲你們介紹 Flink Python API 的現狀及將來規劃,主要內容包括:Apache Flink Python API 的前世此生和將來發展;Apache Flink Python API 架構及開發環境搭建;Apache Flink Python API 核心算子介紹及應用。git
Apache Flink 是流批統一的開源大數據計算引擎,在 Flink 1.9.0 版本開啓了新的 ML 接口和全新的Python API架構。那麼爲何 Flink 要增長對 Python 的支持,下文將進行詳細分析。github
Python 自己是很是優秀的開發語言,據 RedMonk 數據統計,除 Java 和 JavaScript 以外,受歡迎度排名第三。shell
RedMonk 是著名的以開發人員爲中心的行業分析公司,其更詳細的分析信息,你們在拿到個人PPT以後,能夠點擊連接進行詳細查閱。好了,那麼Python的火熱,與咱們今天向你們分享的流批統一的大數據計算引擎,Apache Flink有什麼關係呢?帶着這個問題,咱們你們想一想目前與大數據相關的著名的開源組件有哪些呢?好比說最先期的批處理框架Hadoop?流計算平臺Storm,最近異常火熱的Spark?異或其餘領域數倉的Hive,KV存儲的HBase?這些都是很是著名的開源項目,那麼這些項目都無一例外的進行了Python API的支持。apache
Python 的生態已相對完善,基於此,Apache Flink 在 1.9 版本中也投入了大量的精力,去推出了一個全新的 Pyflink。除大數據外,人工智能與Python也有十分密切的關係。windows
從上圖統計數據能夠發現,Python API 自己已經佔機器學習崗位需求語言的 0.129%。相對於 R 語言,Python 語言彷佛更受青睞。Python 做爲解釋型語言,語法的設計哲學是」用一種方法而且只有一種方法來作一件事」。其簡潔和易用性使其成爲了世界上最受歡迎的語言,在大數據計算領域都有着很好的生態建設,同時Python在機器學習 在機器學習方面也有很好的前景,因此咱們在近期發佈的Apache Flink 1.9 以全新的架構推出新的 Python API微信
Flink 是一款流批統一的計算引擎,社區很是重視和關注 Flink 用戶,除 Java 語言或者 Scala 語言,社區但願提供多種入口,多種途徑,讓更多的用戶更方便的使用 Flink,並收穫 Flink 在大數據算力上帶來的價值。所以 Flink 1.9 開始,Flink 社區以一個全新的技術體系來推出 Python API,而且已經支持了大部分經常使用的一些算子,好比如 JOIN,AGG,WINDOW 等。數據結構
在 Flink 1.9 中雖然 Python 可使用 Java 的 User-defined Function,可是還缺少 Python native 的 User-defined function 的定義,因此咱們計劃在 Flink 1.10 中進行支持 Python User-defined function 的支持。並技術增長對數據分析工具類庫 Pandas 的支持,在 Flink 1.11 增長對 DataStream API 和 ML API 的支持。架構
新的 Python API 架構分爲用戶 API 部分,PythonVM 和 Java VM 的通信部分,和最終將做業提交到 Flink 集羣進行運行的部分。那麼 PythonVM 和 JavaVM 是怎樣通信的呢?咱們在Python 端會會有一個 Python 的 Gateway 用於保持和 Java 通信的連接,在 Java 部分有一個 GateWayServer 用於接收 Python 部分的調用請求。框架
關於 Python API 的架構部分,在 1.9 以前,Flink 的 DataSet 和 DataStream 已經有了對 Python API 的支持,可是擁有 DataSet API 和 DataStream API 兩套不一樣的 API。對於 Flink 這樣一個流批統一的流式計算引擎來說,統一的架構相當重要。而且對於已有的 Python DataSet API 和 DataStream API 而言,採用了JPython 的技術體系架構,而 JPython 自己對目前 Python 的 3.X 系列沒法很好的支持,因此 Flink 1.9 發佈後,決定將原有的 Python API 體系架構廢棄,以全新的技術架構出現。這套全新的 Python API 基於 Table API 之上。
Table API 和 Python API 之間的通信採用了一種簡單的辦法,利用 Python VM 和 Java VM 進行通訊。在 Python API 的書寫或者調用過程當中,以某種方式來與 Java API 進行通信。操做 Python API 就像操做 Java 的 Table API同樣。新架構中能夠確保如下內容:
如圖,當 Python 發起對Java的對象請求時候,在 Java 段建立對象並保存在一個存儲結構中,並分配一個 ID 給 Python 端,Python 端在拿到 Java 對象的 ID 後就能夠對這個對象進行操做,也就是說 Python 端能夠操做任何 Java 端的對象,這也就是爲何新的架構能夠保證Python Table API 和 Java Table API功能一致,而且能過服用現有的優化模型。
在新的架構和通信模型下,Python API 調用 Java API 只須要在持有 Java 對象的 ID,將調用方法的名字和參數傳遞給 Java VM,就能完成對 Java Table API 的調用,因此在這樣的架構中開發 Python Table API 與開發 Java Table API 的方式徹底一致,接下來我爲你們詳細介紹如何開發一個簡單的 Python API 做業。
一般來說一個 Python Table Job 通常會分紅四個部分,首先要根據目前的現狀,要決定這個Job 是以批的方式運行,仍是流的方式運行。固然後續版本用戶能夠不考慮,但當前 1.9 版本仍是須要考慮。
在決定第一步以怎樣的方式執行 Job 後,咱們須要瞭解數據從哪裏來,如何定義 Source、結構數據類型等信息。而後須要寫計算邏輯,而後就是對數據進行計算操做,但最終計算的結果須要持久化到某個系統。最後定義 Sink,與 Source 相似,咱們須要定義 Sink Schema,以及每個字段類型。
下面將詳細分享如何用 Python API 寫每一步?首先,咱們建立一個執行環境,對於執行環境自己來說,首先須要一個 ExecutionEnvironment,根本上咱們須要一個 TableEnvironment。那麼在 TableEnvironment 中,有一個參數 Table Config,Table Config 中會有一些在執行過程當中的配置參數,能夠傳遞到 RunTime 層。除此以外,還提供了一些個性化的配置項,能夠在實際業務開發中進行使用。
在拿到 Environment 後,須要對數據源表進行定義,以 CSV 格式文件爲例,用"逗號"分隔,用 Field 來代表這個文件中有哪些字段。那麼會看到,目前裏面用逗號分隔,而且只有一個字段叫 word,類型是 String。
在定義並描述完數據源數據結構轉換成 Table 數據結構後,也就是說轉換到 Table API 層面以後是怎樣的數據結構和數據類型?下面將經過 with_schema 添加字段及字段類型。這裏只有一個字段,數據類型也是 String,最終註冊成一個表,註冊到 catlog 中,就能夠供後面的查詢計算使用了。
建立結果表,當計算完成後須要將這些結果存儲到持久化系統中,以 WordCount 爲例,首先存儲表會有一個 word 以及它的計數兩個字段,一個是 String 類型的 word,另外一個是 Bigint 的計數,而後把它註冊成 Sink。
編寫註冊完 Table Sink 後,再來看如何編寫邏輯。其實用 Python API 寫 WordCount 和 Table API 同樣很是簡單。由於相對於 DataSream 而言 Python API 寫一個 WordCount 只須要一行。好比 group by,先掃描Source表,而後 group by 一個 Word,再進行 Select word 並加上聚合統計Count ,最終將最數據結果插入到結果表裏面中。
那麼WordCount 怎樣才能真正的運行起來?首先須要搭建開發環境,不一樣的機器上可能安裝的軟件版本不同,這裏列出來了一些版本的需求和要求,其中括號中是示例機器上的版本。
第二步,構建一個 Java 的二進制發佈包,以從源代碼進行構建,那麼這一頁面就是從原代碼獲取咱們的主幹代碼,而且拉取 1.9 的分支。固然你們能夠用 Mater,可是 Master 不夠穩定,仍是建議你們在本身學習的過程當中,最好是用 1.9 的分支去作。接下來進行實戰演練環節,首先驗證 PPT 的正確性。首先編譯代碼,示例以下:
//下載源代碼
git clone https://github.com/apache/flink.git
// 拉取1.9分支
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//構建二進制發佈包
mvn clean install -DskipTests -Dfast
複製代碼
編譯完成後,須要在相應目錄下找到發佈包:
cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
複製代碼
在構建完 Java 的 API 以後進行檢驗,咱們要構建一個 Python 的發佈包。
由於大多數 Python 的用戶咱們都知道咱們須要 pip install 方式,將須要的依賴庫進行與本地的 Python 環境進行集成或者安裝。
那麼 Flink 也是同樣,PyFlink 也須要打包一個 Pypip 可以識別的資源進行安裝,在實際的使用中,也能夠按這種命令去拷貝,在本身的環境中嘗試。
cd flink-Python;Python setup.py sdist
複製代碼
這個過程只是將 Java 包囊括進來,再把本身 PyFlink 自己模塊的一些 Java 的包和 Python 包打包成一塊兒,它會在 dist 目錄下,有一個 apache-flink-1.9.dev0.tar.gz。
cd dist/
複製代碼
在 dist 目錄的 apache-flink-1.9.dev0.tar.gz 就是咱們能夠用於 pip install 的 PyFlink 包。在1.9版本,除了 Flink Table,還有 Flink Table Blink。Flink 同時會支持兩個 plan,若是你們能夠嘗試,咱們能夠自由的切換是 Flink 原有的 Planner,仍是 Blink 的 Planner,你們能夠去嘗試。完成打包後,就能夠嘗試把包安裝到咱們的實際環境當中。
接下來是一個很是簡單的命令,首先檢查命令的正確性,在執行以前,咱們用 pip 檢查一下 list,咱們要看在已有的包裏有沒有,如今嘗試把剛纔打包的包再安裝。在實際的使用過程當中,若是升級版,也要有這個過程,要把新的包要進行安裝。
pip install dist/*.tar.gz pip list|grep flink 複製代碼
安裝完成後,就能夠用剛纔寫的 WordCount 例子來驗證環境是否正確。驗證一下剛纔的正確性,怎麼驗證?爲了你們方便,能夠直接克隆 enjoyment.code 倉庫。
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py
複製代碼
接下來體驗並嘗試。在這個目錄下,咱們剛纔開發的 WordCount 例子。直接用 Python 或檢驗環境是否 OK。這個時候 Flink Python API 會啓動一個 Mini 的 Cluster,會將剛纔 WordCount Job 進行執行,提交到一個 Mini Cluster 進行執行。如今 Run 的過程當中其實已經在集羣上進行執行了。其實在這個代碼裏面是讀了一個 Source 文件,把結果寫到 CSV 文件,在當前目錄,是有一個 Sink CSV 的。具體的操做步驟能夠查看Flink中文社區視頻Apache Flink Python API 現狀及規劃
IDE 的配置在正常的開發過程當中,其實咱們大部分仍是在本地進行開發的,這裏推薦你們仍是用 Pychram 來開發 Python 相關的邏輯或者 Job。
同時因爲有很大量的截圖存在,也把這些內容整理到了博客當中,你們能夠掃描二維碼去關注和查看那麼一些詳細的注意事項,博客詳細地址:enjoyment.cool。這裏有一個很關鍵的地方,你們要注意,就是可能你的環境中有多種 Python 的環境,這時候選擇的環境必定是剛纔 pip install 環境。具體操做詳見Apache Flink Python API 現狀及規劃。
還有哪些方式來提交 Job 呢?這是一個 CLI 的方式,也就是說真正的提交到一個現有的集羣。首先啓動一個集羣。構建的目錄通常在 target 目錄下,若是要啓動一個集羣,直接啓動就能夠。這裏要說一點的是,其中一個集羣外部有個 Web Port,它的端口的地址都是在 flink-conf.yaml 配置的。按照 PPT 中命令,能夠去查看日誌,看是否啓動成功,而後從外部的網站訪問。若是集羣正常啓動,接下來看如何提交 Job 。
Flink 經過 run 提交做業,示例代碼以下:
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
複製代碼
用命令行方式去執行,除了用 PY 參數,還能夠指定 Python 的 module,以及其餘一些依賴的資源文件、JAR等。
在 1.9 版本中還爲你們提供一種更便利的方式,就是以 Python Shell 交互式的方式來寫 Python API 拿到結果。有兩種方式可執行,第一種方式是 Local,第二種方式 Remote,其實這兩種沒有本質的差別。首先來看 Local ,命令以下:
bin/pyflink-shell.sh local
複製代碼
啓動一個mini Cluster ,當輸出後,會出來一個 Python 的 Flink CLI 同時會有一些示例程序,供你們來體驗,按照上面的案例就可以達到正確的輸出和提交,既能夠寫 Streaming,也能夠寫 Batch。詳細步驟你們參考視頻操做便可。
到目前爲止,你們應該已經對 Flink 1.9 上 Python API 架構有了大概瞭解,同時也瞭解到如何搭建 Python API 環境。而且以一個簡單的 WordCount 示例,體驗如何在 IDE 裏面去執行程序,如何以 Flink run 和交互式的方式去提交 Job。同時也體驗了現有一些交互上的一種方式來使用 Flink Python API。那麼介紹完了整個 Flink 的一些環境搭建和一個簡單的示例後。接下來詳細介紹一下在1.9裏面全部的核心算子。
上面分享建立一個 Job 的過程,第一要選擇執行的方式是Streaming仍是Batch;第二個要定義使用的表,Source、Schema、數據類型;第三是開發邏輯,同時在寫 WordCount 時,使用 Count 的函數。最後,在 Python API 裏面內置了不少聚合函數,可使用count,sum, max,min等等。
因此在目前 Flink 1.9 版本中,已經可以知足大多數常規需求。除了剛纔講到的 count。Flink Table API 算子 1.9 中也已經支持。關於 Flink Table API 算子,不管是 Python Table API 仍是 Java 的Table API,都有如下幾種類型的操做。第一單流上的操做,好比說作一些SELECT、Filter,同時還能夠在流上作一些聚合,包括開窗函數的 windows 窗口聚合以及列的一些操做,好比最下面的 add_columns 和 drop_columns。
除了單流,還有雙流的操做,好比說雙流 JOIN、雙流 minus、union ,這些算子在Python Table API 裏面都提供了很好的支持。Python Table API 在 Flink 1.9 中,從功能的角度看幾乎徹底等同於Java Table API,下面以實際代碼來看上述算子是怎麼編寫的以及怎麼去開發Python算子。 2.Python Table API 算子-Watermark定義
細心的同窗可能會注意到,咱們還沒有提到流的一個特質性 -> 時序。流的特性是來的順序是可能亂序,而這種亂序又是流上客觀存在的一種狀態。在 Flink 中通常採用 Watermark 機制來解決這種亂序的問題。
在 Python API 中如何定義 Watermark?假設有一個 JSON 數據,a 字段 String,time 字段 datetime。這個時候定義 Watermark 就要在增長 Schema 時增長 rowtime 列。rowtime 必須是 timestamps 類型。
Watermark 有多種定義方式,上圖中 watermarks_periodic_bounded 即會週期性的去發 Watermark,6萬單位是毫秒。若是數據是亂序的,可以處理一分鐘以內的亂序,因此這個值調的越大,數據亂序接受程度越高,可是有一點數據的延遲也會越高。關於 Watermark 原理你們能夠查看個人blog: 1t.click/7dM。
最後,跟你們分享一下 Java UDF在 Flink 1.9 版本中的應用, 雖然在1.9中不支持 Python 的 UDF ,但 Flink 爲你們提供了能夠在 Python 中使用 Java UDF。在 Flink 1.9 中,對 Table 模塊進行了優化和重構,目前開發 Java UDF 只須要引入 Flink common 依賴就能夠進行 Python API 開發。
接下來以一個具體的示例給你們介紹利用 Java UDF 開發 Python API UDF,假設咱們開發一個求字符串長度的 UDF,在 Python 中須要用 Java 中的 register_java_function,function 的名字是包全路徑。而後在使用時,就能夠用註冊的名字完成UDF的調用,詳細能夠查閱個人Blog: 1t.click/HQF。
那怎樣來執行?能夠用 Flink run 命令去執行,同時須要將UDF的JAR包攜帶上去。
Java UDF 只支持 Scalar Function?其實否則,在 Java UDF中既支持 Scalar Function,也支持 Table Function和Aggregate Function。以下所示:
上面所講到的一些東西,有一些長鏈的文檔和連接,也放在PPT上方便你們查閱,同時最下面我也有我的博客。但願對你們有幫助。
簡單的總結一下,本篇首先是介紹了Apache Flink Python API 歷史發展的過程,介紹了Apache Flink Python API架構變動的緣由以及當前架構模型;任何對將來 Flink Python API 是的規劃與功能特性繼續詳細介紹,最後指望你們能在QA環節能給一些建議和意見,謝謝!更多細節內容,歡迎訂閱個人博客: enjoyment.cool/
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域盛會 Flink Forward Asia 2019 將於 11月28-30日在北京舉辦,阿里、騰訊、美團、字節跳動、百度、英特爾、DellEMC、Lyft、Netflix 及 Flink 創始團隊等近 30 家知名企業資深技術專家齊聚國際會議中心,與全球開發者共同探討大數據時代核心技術與開源生態。瞭解更多精彩議程請點擊:
developer.aliyun.com/special/ffa…
Flink 社區公衆號後臺回覆「門票」,少許免費門票搶先拿。
Flink 社區官方微信公衆號