咱們知道 PyFlink 是在 Apache Flink 1.9 版新增的,那麼在 Apache Flink 1.10 中 Python UDF 功能支持的速度是否可以知足用戶的急切需求呢?python
直觀的判斷,PyFlink Python UDF 的功能也能夠如上圖同樣可以迅速從幼苗變成大樹,爲啥有此判斷,請繼續往下看…git
咱們都知道有 Beam on Flink 的場景,就是 Beam 支持多種 Runner,也就是說 Beam SDK 編寫的 Job 能夠運行在 Flink 之上。以下圖所示:github
上面這圖是 Beam Portability Framework 的架構圖,他描述了 Beam 如何支持多語言,如何支持多 Runner,單獨說 Apache Flink 的時候咱們就能夠說是 Beam on Flink,那麼怎麼解釋 Flink on Beam 呢?web
在 Apache Flink 1.10 中咱們所說的 Flink on Beam 更精確的說是 PyFlink on Beam Portability Framework。咱們看一下簡單的架構圖,以下:macos
Beam Portability Framework 是一個成熟的多語言支持框架,框架高度抽象了語言之間的通訊協議(gRPC),定義了數據的傳輸格式(Protobuf),而且根據通用流計算框架所須要的組件,抽象個各類服務,好比 DataService,StateService,MetricsService 等。在這樣一個成熟的框架下,PyFlink 能夠快速的構建本身的 Python 算子,同時重用 Apache Beam Portability Framework 中現有 SDK harness 組件,能夠支持多種 Python 運行模式,如:Process,Docker,etc.,這使得 PyFlink 對 Python UDF 的支持變得很是容易,在 Apache Flink 1.10 中的功能也很是的穩定和完整。那麼爲啥說是 Apache Flink 和 Apache Beam 共同打造呢,是由於我發現目前 Apache Beam Portability Framework 的框架也存在不少優化的空間,因此我在 Beam 社區進行了優化討論,而且在 Beam 社區也貢獻了 20+ 的優化補丁。apache
概要了解了 Apache Flink 1.10 中 Python UDF 的架構以後,咱們仍是切入的代碼部分,看看如何開發和使用 Python UDF。瀏覽器
在 Apache Flink 1.10 中咱們有多種方式進行 UDF 的定義,好比:bash
class HashCodeMean(ScalarFunction): def eval(self, i, j): return (hash(i) + hash(j)) / 2
lambda i, j: (hash(i) + hash(j)) / 2
def hash_code_mean(i, j): return (hash(i) + hash(j)) / 2
class CallableHashCodeMean(object): def __call__(self, i, j): return (hash(i) + hash(j)) / 2
咱們發現上面定義函數除了第一個擴展 ScalaFunction 的方式是 PyFlink 特有的,其餘方式都是 Python 語言自己就支持的,也就是說,在 Apache Flink 1.10 中 PyFlink 容許以任何 Python 語言所支持的方式定義 UDF。數據結構
那麼定義完 UDF 咱們應該怎樣使用呢?Apache Flink 1.10 中提供了 2 種 Decorators,以下:架構
udf(lambda i, j: (hash(i) + hash(j)) / 2, [for input types], [for result types])
@udf(input_types=..., result_type=...) def hash_code_mean(…): return …
而後在使用以前進行註冊,以下:
st_env.register_function("hash_code", hash_code_mean)
接下來就能夠在 Table API/SQL 中進行使用了,以下:
my_table.select("hash_code_mean(a, b)").insert_into("Results")
目前爲止,咱們已經完成了 Python UDF 的定義,聲明和註冊了。接下來咱們仍是看一個完整的示例吧:)
假設蘋果公司要統計該公司產品在雙 11 期間各城市的銷售數量和銷售金額分佈狀況。
每一筆訂單是一個字符串,字段用逗號分隔, 例如:
ItemName, OrderCount, Price, City ------------------------------------------- iPhone 11, 30, 5499, Beijing\n iPhone 11 Pro,20,8699,Guangzhou\n
根據案例的需求和數據結構分析,咱們須要對原始字符串進行結構化解析,那麼須要一個按「,」號分隔的 UDF(split) 和一個可以將各個列信息展平的 DUF(get)。同時咱們須要根據城市進行分組統計。
UDF 定義
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(line): return line.split(",")
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING()) def get(array, index): return array[index]
註冊 UDF
t_env.register_function("split", split)
t_env.register_function("get", get)
以下代碼咱們發現核心實現邏輯很是簡單,只須要對數據進行解析和對數據進行集合計算:
t_env.from_table_source(SocketTableSource(port=9999))\ .alias("line")\ .select("split(line) as str_array")\ .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")\ .select("city, count, count * unit_price as total_price")\ .group_by("city")\ .select("city, sum(count) as sales_volume, sum(total_price) as sales")\ .insert_into("sink") t_env.execute("Sales Statistic")
上面的代碼咱們假設是一個 Socket 的 Source,Sink 是一個 Chart Sink,那麼最終運行效果圖,以下:
我老是認爲在博客中只是文本描述而不能讓讀者真正的在本身的機器上運行起來的博客,不是好博客,因此接下來咱們看看按照咱們下面的操做,是否能在你的機器上也運行起來?:)
由於目前 PyFlink 尚未部署到 PyPI 上面,在 Apache Flink 1.10 發佈以前,咱們須要經過構建 Flink 的 master 分支源碼來構建運行咱們 Python UDF 的 PyFlink 版本。
在進行編譯代碼以前,咱們須要你已經安裝了 JDK8 和 Maven3x。
tar -xvf apache-maven-3.6.1-bin.tar.gz mv -rf apache-maven-3.6.1 /usr/local/
MAVEN_HOME=/usr/local/apache-maven-3.6.1 export MAVEN_HOME export PATH=${PATH}:${MAVEN_HOME}/bin
除了 JDK 和 MAVEN 完整的環境依賴性以下:
咱們看到基礎環境安裝比較簡單,我這裏就不每個都貼出來了。若是你們有問題歡迎郵件或者博客留言。
git clone https://github.com/apache/flink.git
cd flink mvn clean install -DskipTests ... ... [INFO] flink-walkthrough-datastream-scala ................. SUCCESS [ 0.192 s] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 18:34 min [INFO] Finished at: 2019-12-04T23:03:25+08:00 [INFO] ------------------------------------------------------------------------
cd flink-python; python3 setup.py sdist bdist_wheel ... ... adding 'apache_flink-1.10.dev0.dist-info/WHEEL' adding 'apache_flink-1.10.dev0.dist-info/top_level.txt' adding 'apache_flink-1.10.dev0.dist-info/RECORD' removing build/bdist.macosx-10.14-x86_64/wheel
pip3 install dist/*.tar.gz ... ... Successfully installed apache-beam-2.15.0 apache-flink-1.10.dev0 avro-python3-1.9.1 cloudpickle-1.2.2 crcmod-1.7 dill-0.2.9 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.25.0 hdfs-2.5.8 httplib2-0.12.0 mock-2.0.0 numpy-1.17.4 oauth2client-3.0.0 pbr-5.4.4 protobuf-3.11.1 pyarrow-0.14.1 pyasn1-0.4.8 pyasn1-modules-0.2.7 pydot-1.4.1 pymongo-3.9.0 pyyaml-3.13 rsa-4.0
也能夠查看一下,咱們核心須要 apache-beam 和 apache-flink,以下命令:
jincheng:flink-python jincheng.sunjc$ pip3 list Package Version ----------------------------- --------- alabaster 0.7.12 apache-beam 2.15.0 apache-flink 1.10.dev0 atomicwrites 1.3.0
如上信息證實你咱們所需的 Python 依賴已經沒問題了,接下來回過頭來在看看如何進行業務需求的開發。
一個完成的 PyFlink 的 Job 須要有外部數據源的定義,有業務邏輯的定義和最終計算結果輸出的定義。也就是 Source connector, Transformations, Sink connector,接下來咱們根據這個三個部分進行介紹來完成咱們的需求。
咱們須要實現一個 Socket Connector,首先要實現一個 StreamTableSource, 核心代碼是實現 getDataStream,代碼以下:
@Override public DataStream<Row> getDataStream(StreamExecutionEnvironment env) { return env.socketTextStream(hostname, port, lineDelimiter, MAX_RETRY) .flatMap(new Spliter(fieldNames.length, fieldDelimiter, appendProctime)) .returns(getReturnType()); }
上面代碼利用了 StreamExecutionEnvironment 中現有 socketTextStream 方法接收數據,而後將業務訂單數據傳個一個 FlatMapFunction, FlatMapFunction 主要實現將數據類型封裝爲 Row,詳細代碼查閱 Spliter。
同時,咱們還須要在 Python 封裝一個 SocketTableSource,詳情查閱 socket_table_source.py。
咱們預期要獲得的一個效果是可以將結果數據進行圖形化展現,簡單的思路是將數據寫到一個本地的文件,而後在寫一個 HTML 頁面,使其可以自動更新結果文件,並展現結果。因此咱們還須要自定義一個 Sink 來完成該功能,咱們的需求計算結果是會不斷的更新的,也就是涉及到 Retraction(若是你們不理解這個概念,能夠查閱我之前的博客),目前在 Flink 裏面尚未默認支持 Retract 的 Sink,因此咱們須要自定義一個 RetractSink,好比咱們實現一下 CsvRetractTableSink。
CsvRetractTableSink 的核心邏輯是緩衝計算結果,每次更新進行一次全量(這是個純 demo,不能用於生產環境)文件輸出。源代碼查閱 CsvRetractTableSink。
同時咱們還須要利用 Python 進行封裝,詳見 chart_table_sink.py。
在 chart_table_sink.py 咱們封裝了一個 http server,這樣咱們能夠在瀏覽器中查閱咱們的統計結果。
完成自定義的 Source 和 Sink 以後咱們終於能夠進行業務邏輯的開發了,其實整個過程自定義 Source 和 Sink 是最麻煩的,核心計算邏輯彷佛要簡單的多。
若是你本地環境 python 命令版本是 2.x,那麼須要對 Python 版本進行設置,以下:
t_env.get_config().set_python_executable("python3")
PyFlink 1.10 以後支持 Python 3.6+ 版本。
PyFlink 讀取數據源很是簡單,以下:
... ... t_env.from_table_source(SocketTableSource(port=9999)).alias("line")
上面這一行代碼定義了監聽端口 9999 的數據源,同時結構化 Table 只有一個名爲 line 的列。
咱們須要對上面列進行分析,爲了演示 Python UDF,咱們在 SocketTableSource中並無對數據進行預處理,因此咱們利用上面 UDF 定義 一節定義的 UDF,來對原始數據進行預處理。
... ... .select("split(line) as str_array") .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price") .select("city, count, count * unit_price as total_price")
核心的統計邏輯是根據 city 進行分組,而後對 銷售數量和銷售金額進行求和,以下:
... ... .group_by("city") .select("city, sum(count) as sales_volume, sum(total_price) as sales")\
計算結果寫入到咱們自定義的 Sink 中,以下:
... ... .insert_into("sink")
from pyflink.datastream import StreamExecutionEnvironment from pyflink.demo import ChartConnector, SocketTableSource from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.descriptors import Schema from pyflink.table.udf import udf env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) t_env.connect(ChartConnector())\ .with_schema(Schema() .field("city", DataTypes.STRING()) .field("sales_volume", DataTypes.BIGINT()) .field("sales", DataTypes.BIGINT()))\ .register_table_sink("sink") @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.ARRAY(DataTypes.STRING())) def split(line): return line.split(",") @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING()) def get(array, index): return array[index] t_env.get_config().set_python_executable("python3") t_env.register_function("split", split) t_env.register_function("get", get) t_env.from_table_source(SocketTableSource(port=6666))\ .alias("line")\ .select("split(line) as str_array")\ .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")\ .select("city, count, count * unit_price as total_price")\ .group_by("city")\ .select("city, " "sum(count) as sales_volume, " "sum(total_price) as sales")\ .insert_into("sink") t_env.execute("Sales Statistic")
上面代碼中你們會發現一個陌生的部分,就是 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪裏來的呢?其實就是包含了上面咱們介紹的 自定義 Source/Sink(Java&Python)。下面咱們來介紹如何增長這個 pyflink.demo 模塊。
爲了你們方便我把自定義 Source/Sink(Java&Python)的源代碼放到了這裏 ,你們能夠進行以下操做:
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code/PyUDFDemoConnector/; mvn clean install
python3 setup.py sdist bdist_wheel ... ... adding 'pyflink_demo_connector-0.1.dist-info/WHEEL' adding 'pyflink_demo_connector-0.1.dist-info/top_level.txt' adding 'pyflink_demo_connector-0.1.dist-info/RECORD' removing build/bdist.macosx-10.14-x86_64/wheel
pip3 install dist/pyflink-demo-connector-0.1.tar.gz ... ... Successfully built pyflink-demo-connector Installing collected packages: pyflink-demo-connector Successfully installed pyflink-demo-connector-0.1
出現上面信息證實已經將 PyFlink.demo 模塊成功安裝。接下來咱們能夠運行咱們的示例了 :)
示例的代碼在上面下載的源代碼裏面已經包含了,爲了簡單,咱們利用 PyCharm 打開enjoyment.code/myPyFlink。同時在 Terminal 啓動一個端口:
nc -l 6666
啓動 blog_demo,若是一切順利,啓動以後,控制檯會輸出一個 web 地址,以下所示:
咱們打開這個頁面,開始是一個空白頁面,以下:
咱們嘗試將下面的數據,一條,一條的發送給 Source Connector:
iPhone 11,30,5499,Beijing iPhone 11 Pro,20,8699,Guangzhou MacBook Pro,10,9999,Beijing AirPods Pro,50,1999,Beijing MacBook Pro,10,11499,Shanghai iPhone 11,30,5999,Shanghai iPhone 11 Pro,20,9999,Shenzhen MacBook Pro,10,13899,Hangzhou iPhone 11,10,6799,Beijing MacBook Pro,10,18999,Beijing iPhone 11 Pro,10,11799,Shenzhen MacBook Pro,10,22199,Shanghai AirPods Pro,40,1999,Shanghai
當輸入第一條訂單 iPhone 11,30,5499,Beijing,以後,頁面變化以下:
隨之訂單數據的不斷輸入,統計圖不斷變化。一個完整的 GIF 演示以下:
本篇從架構到 UDF 接口定義,再到具體的實例,向你們介紹了在 Apache Flink 1.10 發佈以後,如何利用 PyFlink 進行業務開發,其中 用戶自定義 Source 和 Sink部分比較複雜,這也是目前社區須要進行改進的部分(Java/Scala)。真正的核心邏輯部分其實比較簡單,爲了你們按照本篇進行實戰操做有些成就感,因此我增長了自定義 Source/Sink 和圖形化部分。但若是你們想簡化實例的實現也能夠利用 Kafka 做爲 Source 和 Sink,這樣就能夠省去自定義的部分,作起來也會簡單一些。
本文做者:孫金城(金竹)
本文爲阿里雲內容,未經容許不得轉載。