[Flink Principles, Part 4]: An Introduction to Flink Checkpoints and Savepoints

Original sources:

http://www.javashuo.com/article/p-ktaazqlj-ks.html

http://www.javashuo.com/article/p-swtbcvco-ge.html

https://www.jianshu.com/p/8e74c7cdd463

https://blog.csdn.net/u013014724/article/details/84800255

Part 1: Flink Checkpoints

1. How Flink checkpoints work

Checkpointing is the core of Flink's fault-tolerance mechanism. At a configured interval, Flink generates a snapshot of the state of each operator in the stream and persists that state durably. If the Flink program crashes unexpectedly, it can be restarted from a chosen snapshot, repairing the interruption to the program's state caused by the failure. The mechanism, as illustrated in the official documentation, works as follows:

Once a checkpoint trigger interval is configured, each time a checkpoint is triggered Flink injects a barrier marker into each of the job's distributed stream sources. These barriers flow downstream through the operators along with the regular data records. When an operator receives a barrier on one of its inputs, it temporarily stops processing newly arriving records from that stream. Because an operator may have multiple input streams, each carrying its own barrier, the operator must wait until the barriers from all of its inputs have arrived. When every barrier has reached the operator, the barriers are aligned, meaning they represent the same logical point in time. While waiting for alignment, the operator buffers the records that keep arriving on the inputs whose barrier has already been received. Once alignment completes, the operator snapshots its state as this checkpoint's result, emits the buffered records downstream as input to the next operators, and forwards the barrier downstream.
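The alignment protocol just described can be sketched in a few lines of Java. This is an illustrative simulation only, not Flink's internal implementation; the class name and method signatures are invented for this example.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Illustrative sketch of barrier alignment for an operator with N input
// channels. Records arriving on a channel whose barrier has already been
// seen are buffered; once barriers from ALL channels have arrived, the
// operator snapshots its state, releases the buffer, and forwards the
// barrier downstream.
class BarrierAligner {
    private final int numChannels;
    private final boolean[] barrierSeen;
    private final Queue<String> buffered = new ArrayDeque<>();
    private int barriersArrived = 0;

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
        this.barrierSeen = new boolean[numChannels];
    }

    // Returns the records that may be processed right now (possibly none).
    List<String> onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            buffered.add(record);   // channel is blocked until alignment
            return List.of();
        }
        return List.of(record);     // process immediately
    }

    // Returns the buffered records released once alignment completes,
    // or null while we are still waiting for more barriers.
    List<String> onBarrier(int channel) {
        if (!barrierSeen[channel]) {
            barrierSeen[channel] = true;
            barriersArrived++;
        }
        if (barriersArrived < numChannels) {
            return null;            // not aligned yet: keep buffering
        }
        // All barriers aligned: the state snapshot would be taken here,
        // then the buffer is drained and the barrier emitted downstream.
        List<String> released = List.copyOf(buffered);
        buffered.clear();
        java.util.Arrays.fill(barrierSeen, false);
        barriersArrived = 0;
        return released;
    }
}
```

Note how a record arriving on an already-barriered channel is held back rather than processed, which is exactly what keeps the snapshot consistent across inputs.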

 

2. Basic checkpoint configuration

Checkpointing can be enabled in two ways: globally, via settings in conf/flink-conf.yaml, or flexibly per job, in the application code. I personally recommend the second approach, since it configures exactly the job at hand:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the state backend and checkpoint storage path, here on HDFS
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
// Retain checkpoints when the job is cancelled or fails
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Checkpointing mode: exactly-once
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Interval between checkpoint triggers, in milliseconds
config.setCheckpointInterval(60000);

 

Calling enableExternalizedCheckpoints with ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION means that checkpoint data is retained even after the Flink job is cancelled, so the job can later be restored from a chosen checkpoint as needed. The code above also sets the checkpointing interval to one minute.

3. Retaining multiple checkpoints
By default, when checkpointing is enabled Flink retains only the most recent successful checkpoint, and when the program fails it recovers from that one. It is often more flexible to retain several checkpoints and pick which one to restore from: for example, if we discover that the last four hours of records were processed incorrectly, we may want to roll the entire state back four hours.
Flink supports retaining multiple checkpoints. Add the following setting to conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20
With this setting in place, run a Flink job and inspect the checkpoint directory on HDFS:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
Found 22 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

As configured, state.checkpoints.num-retained is 20, so only the 20 most recent checkpoints are kept. To roll back to a particular checkpoint, we just point the job at that checkpoint's path.
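The retention behaviour can be made concrete with a small sketch: keep only the N directories with the highest chk-&lt;id&gt; numbers. This mimics the effect of state.checkpoints.num-retained for illustration; it is not Flink's actual cleanup code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative model of checkpoint retention: given the chk-<id> directory
// names of completed checkpoints, keep only the numRetained newest ones
// (highest checkpoint ids), as Flink's retained-checkpoint cleanup does.
class CheckpointRetention {
    static List<String> retain(List<String> chkDirs, int numRetained) {
        List<String> sorted = new ArrayList<>(chkDirs);
        // Sort ascending by the numeric id after the "chk-" prefix.
        sorted.sort(Comparator.comparingInt(d -> Integer.parseInt(d.substring(4))));
        int from = Math.max(0, sorted.size() - numRetained);
        return sorted.subList(from, sorted.size());
    }
}
```

With numRetained = 20, each newly completed checkpoint pushes out the oldest one, which is exactly the chk-858 … chk-877 window seen in the listing above.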

4. Restoring from a checkpoint
If the Flink program fails, or has been processing data incorrectly for some time, we can replay it from a specific checkpoint, for example chk-860, with the following command:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
Once the program is running again, it continues checkpointing according to its configuration, producing new checkpoint data:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

 

Note that the previous run's job ID was 582e17d2cc343e6c56255d111bae0191, and all of its checkpoint files live in a directory named after that job ID. When the job is stopped and later restored from a checkpoint (chk-860), a new job ID is generated (here 11bbc5d9933e4ff7e25198a760e9792e), but the checkpoint numbering continues from the checkpoint the run was based on: chk-861, chk-862, chk-863, and so on.

Part 2: Flink Savepoints
1. How Flink savepoints work
A savepoint is a self-contained checkpoint stored outside the Flink job. It uses Flink's checkpointing mechanism to create a non-incremental snapshot of the streaming program's state and writes that checkpoint data to an external storage system.

A Flink program contains two kinds of state. User-defined state is created or modified through the program's transformation functions. System state is data that forms part of an operator's computation, such as buffered records; for example, a window function buffers incoming stream records inside the window. So that each operator's state can be uniquely identified when a savepoint is created, Flink provides an API for assigning an ID to every operator in the program. When the program is later updated or upgraded, these operator IDs are matched against the corresponding state in the savepoint, allowing recovery. If we do not assign operator IDs, Flink generates them for us automatically.
Still, it is strongly recommended to assign an ID to every operator manually. Even if the application later changes substantially, by replacing an operator's implementation, adding new operators, or removing operators, explicit IDs at least give us a chance to match the remaining operators against the state stored in the savepoint. Also bear in mind that savepoint data reflects the program and its in-memory data structures as they were at the time it was taken; if the program changes drastically, especially if the in-memory data structures it operates on change, it may be impossible to restore correctly from the old savepoint at all.

Following the example given in the official Flink documentation, operator IDs are set like this:

DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with an explicit ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID of the source operator
  .shuffle()
  // Stateful mapper with an explicit ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID of the mapper
  // Stateless sink: print
  .print(); // auto-generated ID
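The role those explicit IDs play on restore can be sketched as follows. Savepoint state is keyed by operator ID, and an operator only gets its old state back if its ID matches a key in the savepoint; this is an invented illustration of that matching, not Flink's restore code.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative sketch: state in a savepoint is a mapping from operator ID
// to that operator's serialized state. A stable, user-assigned uid finds
// its state even after the job graph changes; an auto-generated ID that
// changed between versions finds nothing.
class SavepointStateMatcher {
    private final Map<String, String> stateByOperatorId;

    SavepointStateMatcher(Map<String, String> stateByOperatorId) {
        this.stateByOperatorId = stateByOperatorId;
    }

    // Look up the state recorded for a given operator ID, if any.
    Optional<String> restore(String operatorId) {
        return Optional.ofNullable(stateByOperatorId.get(operatorId));
    }
}
```

This is why renaming or restructuring operators without explicit uids can silently orphan their state: the new auto-generated ID no longer matches any key in the savepoint.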

2. Creating a savepoint
Creating a savepoint requires a target directory, which can be specified in two ways.
The first is to configure a default savepoint path by adding the following setting to conf/flink-conf.yaml:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
The second is to specify the target directory when running the savepoint command manually, whose format is:

bin/flink savepoint :jobId [:targetDirectory]
For example, for a running Flink job with ID 40dcc6d2ba90f13930abce295de8d038, using the default directory from state.savepoints.dir:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
The savepoint data for job 40dcc6d2ba90f13930abce295de8d038 is then written under hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0.
To store a running job's savepoint in an explicitly chosen directory instead:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
In this case the savepoint data for job 40dcc6d2ba90f13930abce295de8d038 is written under hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f.

3. Restoring from a savepoint
We can now stop job 40dcc6d2ba90f13930abce295de8d038 and resume it from the savepoint. The command format is:

bin/flink run -s :savepointPath [:runArgs]
Using the savepoint saved above to resume the job:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
This starts a new Flink job, with ID cdbae3af1b7441839e7c03bab0d0eefd.

4. Savepoint directory layout
The contents stored under a savepoint directory look like this:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

In the HDFS paths listed above, 11bbc5 is the first six characters of the Flink job ID and bd967f90709b is a randomly generated string; together they form savepoint-11bbc5-bd967f90709b, the root directory for this savepoint's data. The _metadata file inside that directory holds the savepoint's metadata, including serialized paths to the other files in the directory, whose contents are the serialized state itself.
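The naming convention just described can be decomposed mechanically; the tiny parser below is a reading aid for the layout only (the directory format is an internal detail that should not be relied on programmatically).

```java
// Illustrative parser for the savepoint directory naming convention
// described above: "savepoint-<first 6 chars of job id>-<random suffix>".
class SavepointDirName {
    // Returns the 6-character job-ID prefix embedded in the directory name.
    static String jobIdPrefix(String dirName) {
        String[] parts = dirName.split("-");
        if (parts.length != 3 || !parts[0].equals("savepoint")) {
            throw new IllegalArgumentException("unexpected name: " + dirName);
        }
        return parts[1];
    }
}
```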

References

http://shiyanjun.cn/archives/1855.html
https://www.colabug.com/2259405.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html
