簡介:本文介紹了PyFlink項目的目標和發展歷程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依賴管理和Python UDF執行優化,同時也針對功能展現了相關demo。
做者|付典python
本文介紹了PyFlink項目的目標和發展歷程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依賴管理和Python UDF執行優化,同時也針對功能展現了相關demo。本文主要分爲4個部分:git
- PyFlink介紹
- PyFlink相關功能
- PyFlink功能演示
- PyFlink下一步規劃
PyFlink是Flink的一個子模塊,也是整個Flink項目的一部分,主要目的是提供Flink的Python語言支持。由於在機器學習和數據分析等領域,Python語言很是重要,甚至是最主要的開發語言。因此,爲了知足更多用戶需求,拓寬Flink的生態,咱們啓動了PyFlink項目。github
PyFlink項目的目標主要有兩點,第一點是將Flink的計算能力輸出給Python用戶,也就是咱們會在Flink中提供一系列的Python API,方便對Python語言比較熟悉的用戶開發Flink做業。docker
第二點,就是將Python生態基於Flink進行分佈式化。雖然咱們會在Flink中提供一系列的Python API來給Python用戶來使用,但這對用戶來講是有學習成本的,由於用戶要學習怎麼使用Flink的Python API,瞭解每個API的用途。因此咱們但願用戶能在API層使用他們比較熟悉的 Python庫的API,可是底層的計算引擎使用Flink,從而下降他們的學習成本。這是咱們將來要作的事情,目前處於啓動階段。apache
下圖是PyFlink項目的發展狀況,目前發佈了3個版本,支持的內容也愈來愈豐富。api
咱們主要介紹PyFlink如下功能,Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依賴管理和Python UDF執行優化。數據結構
Python Table API的目的是爲了讓用戶可使用Python語言來開發Flink做業。Flink裏面有三種類型的API,Process、Function和Table API,前二者是較爲底層的API,基於Process和Function開發的做業,其邏輯會嚴格按照用戶定義的行爲進行執行,而Table API是較爲高層的API,基於Table API開發的做業,其邏輯會通過一系列的優化以後進行執行。架構
Python Table API,顧名思義就是提供 Table API的Python語言支持。併發
如下是Python Table API開發的一個Flink做業,做業邏輯是讀取文件,計算word count,而後再把計算結果寫到文件中去。這個例子雖然簡單,但包括了開發一個Python Table API做業的全部基本流程。機器學習
首先咱們須要定義做業的執行模式,好比說是批模式仍是流模式,做業的併發度是多少?做業的配置是什麼。接下來咱們須要定義source表和sink表,source表定義了做業的數據源來源於哪裏,數據的格式是什麼;sink表定義了做業的執行結果寫到哪裏去,數據格式是什麼。最後咱們須要定義做業的執行邏輯,在這個例子中是計算寫過來的count。
如下是Python Table API的部分截圖,能夠看到它的數量和功能都比較齊全。
Python Table API是一種關係型的API,其功能能夠類比成SQL,而SQL裏自定義函數是很是重要的功能,能夠極大地擴展SQL的使用範圍。Python UDF的主要目的就是容許用戶使用Python語言來開發自定義函數,從而擴展Python Table API的使用場景。同時,Python UDF除了能夠用在Python Table API做業中以外,還能夠用在Java Table API做業以及SQL做業中。
在PyFlink中咱們支持多種方式來定義Python UDF。用戶能夠定義一個Python類,繼承ScalarFunction,也能夠定義一個普通的Python函數或者Lambda函數,實現自定義函數的邏輯。除此以外,咱們還支持經過Callable Function和Partial Function定義Python UDF。用戶能夠根據本身的須要選擇最適合本身的方式。
PyFlink裏面提供了多種Python UDF的使用方式,包括Python Table API、Java table API和SQL,咱們一一介紹。
在Python Table API中使用Python UDF,在定義完Python UDF以後,用戶首先須要註冊Python UDF,能夠調用table environment register來註冊,而後命名,而後就能夠在做業中經過這個名字來使用 Python UDF了。
在Java Table API中它的使用方式也比較類似,可是註冊方式不同,Java Table API做業中須要經過DDL語句來進行註冊。
除此以外,用戶也能夠在SQL的做業中使用Python UDF。與前面兩種方式相似,用戶首先須要註冊Python UDF,能夠在SQL腳本中經過DDL語句來註冊,也能夠在SQL Client的環境配置文件裏面註冊。
簡單介紹下Python UDF的執行架構。Flink是用Java語言編寫的,運行在Java虛擬機中,而Python UDF運行在 Python虛擬機中,因此Java進程和Python進程須要進行數據通訊。 除此以外,二者間還須要傳輸state、log、metrics,它們的傳輸協議須要支持4種類型。
向量化Python UDF的主要目的是使 Python用戶能夠利用Pandas或者Numpy等數據分析領域經常使用的Python庫,開發高性能的Python UDF。
向量化Python UDF是相對於普通Python UDF而言的,咱們能夠在下圖看到二者的區別。
下圖顯示了向量化Python UDF的執行過程。首先在Java端,Java在攢完多條數據以後會轉換成Arrow格式,而後發送給Python進程。Python進程在收到數據以後,將其轉換成Pandas的數據結構,而後調用用戶自定義的向量化Python UDF。同時向量化Python UDF的執行結果會再轉化成Arrow格式的數據,再發送給 Java進程。
在使用方式上,向量化Python UDF與普通Python UDF是相似的,只有如下幾個地方稍有不一樣。首先向量化Python UDF的聲明方式須要加一個UDF type,聲明這是一個向量化Python UDF,同時UDF的輸入輸出類型是Pandas Series。
前面咱們提到 Python UDF有多種定義方式,可是若是須要在Python UDF中使用Metrics,那麼Python UDF必須繼承ScalarFunction來進行定義。在Python UDF的 open方法裏面提供了一個Function Context參數,用戶能夠經過Function Context參數來註冊Metrics,而後就能夠經過註冊的 Metrics對象來彙報了。
從類型來講,PyFlink依賴主要包括如下幾種類型,普通的PyFlink文件、存檔文件,第三方的庫、PyFlink解釋器,或者Java的Jar包等等。從解決方案來看,針對每種類型的依賴,PyFlink提供了兩種解決方案,一種是API的解決方案,一種是命令行選項的方式,你們選擇其一便可。
Python UDF的執行優化主要包括兩個方面,執行計劃優化和運行時優化。它與SQL很是像,一個包含Python UDF的做業,首先會通過預先定義的規則,生成一個最優的執行計劃。在執行計劃已經肯定的狀況下,在實際執行的時候,又能夠運用一些其餘的優化手段來達到儘量高的執行效率。
執行計劃的優化主要有如下幾個優化思路。一個是不一樣類型的 UDF的拆分,因爲在一個節點中可能同時包含多種類型的UDF,而不一樣的類型的UDF是不能放在一塊執行的;第二個方面是Filter下推,其主要目的是儘量下降含有Python UDF節點的輸入數據量,從而提高整個做業的執行性能;第三個優化思路是Python UDF Chaining,Java進程與Python進程之間的通訊開銷以及序列化反序列化開銷比較大,而Python UDF Chaining能夠儘可能減小Java進程和Python進程之間的通訊開銷。
假若有這樣一個做業,它包含了兩個UDF,其中add是Python UDF, subtract是向量化Python UDF。默認狀況下,這個做業的執行計劃會有一個project節點,這兩個 UDF同時位於這一project的節點裏面。這個執行計劃的主要問題是,普通Python UDF每次處理一條數據,而向量化Python UDF,每次處理多條數據,因此這樣的一個執行計劃是沒有辦法執行的。
可是經過拆分,咱們能夠把這一個project的節點拆分紅了兩個project的節點,其中第一個project的節點只包含普通Python UDF,而第二個節點只包含向量化Python UDF。不一樣類型的Python UDF拆分到不一樣的節點以後,每個節點都只包含了一種類型的UDF,因此算子就能夠根據它所包含的UDF的類型選擇最合適的執行方式。
Filter下推的主要目的是將過濾算子下推到Python UDF節點以前,儘可能減小Python UDF節點的數據量。
假如咱們有這樣一個做業,做業原始執行計劃裏面包括了兩個Project的節點,一個是add、 subtract,同時還包括一個Filter節點。這個執行計劃是能夠運行的,但須要更優化。能夠看到,由於Python的節點位於Filter節點以前,因此在Filter節點以前Python UDF已經計算完了,可是若是把Filter過濾下,推到Python UDF以前,那麼就能夠大大下降Python UDF節點的輸入數據量。
假如咱們有這樣一個做業,裏面包含兩種類型的UDF,一個是add,一個是subtract,它們都是普通的Python UDF。在一個執行計劃裏面包含兩個project的節點,其中第一個project的節點先算subtract,而後再傳輸給第二個project節點進行執行。
它的主要問題是,因爲subtract和add位於兩個不一樣的節點,其計算結果須要從Python發送回Java,而後再由Java進程發送給第二個節點的Python進行執行。至關於數據在Java進程和Python進程之間轉了一圈,因此它帶來了徹底沒有必要的通訊開銷和序列化反序列化開銷。所以,咱們能夠將執行計劃優化成右圖,就是將add節點和subtract節點放在一個節點中運行,subtract節點的結果計算出來以後直接去調用add節點。
目前提升Python UDF運營時的執行效率有三種:一是Cython優化,用它來提升Python代碼的執行效率;二是自定義Java進程和Python進程之間的序列化器和反序列化器,提升序列化和反序列化效率;三是提供向量化Python UDF功能。
首先你們打開這個頁面,裏面提供了PyFlink的一些demo,這些demo是運行在docker裏面的,因此你們若是要運行這些demo就須要在本機安裝docker環境。
隨後,咱們能夠運行命令,命令會啓動一個PyFlink的集羣,後面咱們運行的PyFlink的例子都會提交到集羣去執行。
第一個例子是word count,咱們首先在裏面定義了環境、source、sink等,咱們能夠運行一下這個做業。
這是做業的執行結果,能夠看到Flink這個單詞出現了兩次,PyFlink這個單詞出現了一次。
接下來再運行一個Python UDF的例子。這個例子和前面有一些相似,首先咱們定義它使用PyFlink,運行在批這種模式下,同時做業的併發度是1。不同的地方是咱們在做業裏定義了一個UDF,它的輸入包括兩個列,都是Bigint類型,並且它輸出類型也是對應的。這個UDF的邏輯是把這兩個列的相加做爲一個結果輸出。
咱們執行一下做業,執行結果是3。
接下來咱們再運行一個帶有依賴的Python UDF。前面做業的UDF是不包含任何依賴的,直接就把兩個輸入列相加起來。而在這個例子裏,UDF引用了一個第三方的依賴,咱們能夠經過API set python requirement來執行。
接下來咱們運行做業,它的執行結果和前面是同樣的,由於這兩個做業的邏輯是相似的。
接下來咱們再看一個向量化Python UDF的例子。在 UDF定義的時候,咱們加了一個UDF的type字段,說明說咱們是一個向量化的Python UDF,其餘的邏輯和普通Python UDF的邏輯相似。最後它的執行結果也是3,由於它的邏輯和前面是同樣的,計算兩頁的之和。
咱們再來看一個例子,在Java的Table做業裏面使用Python。在這個做業裏面咱們又會用到一個Python UDF,它經過DDL語句進行註冊,而後在execute SQL語句裏面進行使用。
接下來咱們再看在純SQL做業中使用Python UDF的例子。在資源文件裏面咱們聲明瞭一個UDF,名字叫add1,它的類型是Python,同時咱們也能看到它的UDF位置。
接下來咱們運行它,執行結果是234。
目前PyFlink只支持了Python Table API,咱們計劃在下一個版本中支持DataStream API,同時也會支持Python UDAF以及Pandas UDAF,另外,在執行層也會持續優化PyFlink的執行效率。
這是一些資源的連接,包括PyFlink的文檔地址。
https://ci.apache.org/projects/flink/flink-docs-master/api/python/
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/
https://github.com/pyflink/playgrounds/tree/1.11
好的,咱們今天的分享就到這裏了,歡迎你們繼續關注咱們的課程。
活動推薦:
僅需99元便可體驗阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方連接瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。