做者:邱從賢(山智)ios
在 Flink 中,狀態可靠性保證由 Checkpoint 支持,看成業出現 failover 的狀況下,Flink 會從最近成功的 Checkpoint 恢復。在實際狀況中,咱們可能會遇到 Checkpoint 失敗,或者 Checkpoint 慢的狀況,本文會統一聊一聊 Flink 中 Checkpoint 異常的狀況(包括失敗和慢),以及可能的緣由和排查思路。git
首先咱們須要瞭解 Flink 中 Checkpoint 的整個流程是怎樣的,在瞭解整個流程以後,咱們才能在出問題的時候,更好的進行定位分析。github
從上圖咱們能夠知道,Flink 的 Checkpoint 包括以下幾個部分:apache
上面的任何一個步驟不成功,整個 checkpoint 都會失敗。性能優化
能夠在 Checkpoint 界面看到以下圖所示,下圖中 Checkpoint 10423 失敗了。網絡
點擊 Checkpoint 10423 的詳情,咱們能夠看到類系下圖所示的表格(下圖中將 operator 名字截取掉了)。多線程
上圖中咱們看到三行,表示三個 operator,其中每一列的含義分別以下:app
Acknowledged
一列表示有多少個 subtask 對這個 Checkpoint 進行了 ack,從圖中咱們能夠知道第三個 operator 總共有 5 個 subtask,可是隻有 4 個進行了 ack;Latest Acknowledgement
表示該 operator 的全部 subtask 最後 ack 的時間;End to End Duration
表示整個 operator 的全部 subtask 中完成 snapshot 的最長時間;State Size
表示當前 Checkpoint 的 state 大小 -- 主要這裏若是是增量 checkpoint 的話,則表示增量大小;Buffered During Alignment
表示在 barrier 對齊階段積攢了多少數據,若是這個數據過大也間接表示對齊比較慢);Checkpoint 失敗大體分爲兩種狀況:Checkpoint Decline 和 Checkpoint Expire。機器學習
咱們能從 jobmanager.log
中看到相似下面的日誌Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.
其中
10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1
是 execution id,85d268e6fbc19411185f7e4868a44178
是 job id,咱們能夠在 jobmanager.log
中查找 execution id,找到被調度到哪一個 taskmanager 上,相似以下所示:異步
2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING. 2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE
從上面的日誌咱們知道該 execution 被調度到 hostnameABCDE
的 container_e24_1566836790522_8088_04_013155_1
slot 上,接下來咱們就能夠到 container container_e24_1566836790522_8088_04_013155
的 taskmanager.log 中查找 Checkpoint 失敗的具體緣由了。
另外對於 Checkpoint Decline 的狀況,有一種狀況咱們在這裏單獨抽取出來進行介紹:Checkpoint Cancel。
當前 Flink 中若是較小的 Checkpoint 尚未對齊的狀況下,收到了更大的 Checkpoint,則會把較小的 Checkpoint 給取消掉。咱們能夠看到相似下面的日誌:
$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.
這個日誌表示,當前 Checkpoint 19 還在對齊階段,咱們收到了 Checkpoint 20 的 barrier。而後會逐級通知到下游的 task checkpoint 19 被取消了,同時也會通知 JM 當前 Checkpoint 被 decline 掉了。
在下游 task 收到被 cancelBarrier 的時候,會打印相似以下的日誌:
DEBUG $taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment. 或者 DEBUG $taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment. 或者 WARN $taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.
上面三種日誌都表示當前 task 接收到上游發送過來的 barrierCancel 消息,從而取消了對應的 Checkpoint。
若是 Checkpoint 作的很是慢,超過了 timeout 尚未完成,則整個 Checkpoint 也會失敗。當一個 Checkpoint 因爲超時而失敗是,會在 jobmanager.log
中看到以下的日誌:
Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178 expired before completing.
表示 Chekpoint 1 因爲超時而失敗,這個時候能夠能夠看這個日誌後面是否有相似下面的日誌:
Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.
能夠按照 2.1.1 中的方法找到對應的 taskmanager.log 查看具體信息。
下面的日誌若是是 DEBUG 的話,咱們會在開始處標記
DEBUG
咱們按照下面的日誌把 TM 端的 snapshot 分爲三個階段,開始作 snapshot 前,同步階段,異步階段:
DEBUG Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
這個日誌表示 TM 端 barrier 對齊後,準備開始作 Checkpoint。
DEBUG 2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx _source -> Filter (27/70),5,Flink Task Threads] took 0 ms.
上面的日誌表示當前這個 backend 的同步階段完成,共使用了 0 ms。
DEBUG DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms
上面的日誌表示異步階段完成,異步階段使用了 369 ms
在現有的日誌狀況下,咱們經過上面三個日誌,定位 snapshot 是開始晚,同步階段作的慢,仍是異步階段作的慢。而後再按照狀況繼續進一步排查問題。
在 2.1 節中,咱們介紹了 Checkpoint 失敗的排查思路,本節會分狀況介紹 Checkpoint 慢的狀況。
Checkpoint 慢的狀況以下:好比 Checkpoint interval 1 分鐘,超時 10 分鐘,Checkpoint 常常須要作 9 分鐘(咱們但願 1 分鐘左右就可以作完),並且咱們預期 state size 不是很是大。
對於 Checkpoint 慢的狀況,咱們能夠按照下面的順序逐一檢查。
這個通常發生較少,可是也有可能,由於 source 作 snapshot 並往下游發送 barrier 的時候,須要搶鎖(這個如今社區正在進行用 mailBox 的方式替代當前搶鎖的方式,詳情參考[1])。若是一直搶不到鎖的話,則可能致使 Checkpoint 一直得不到機會進行。若是在 Source 所在的 taskmanager.log
中找不到開始作 Checkpoint 的 log,則能夠考慮是否屬於這種狀況,能夠經過 jstack
進行進一步確認鎖的持有狀況。
如今 Flink 中 Checkpoint 有兩種模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 會把當前的 state 所有備份一次到持久化存儲,而增量 Checkpoint,則只備份上一次 Checkpoint 中不存在的 state,所以增量 Checkpoint 每次上傳的內容會相對更好,在速度上會有更大的優點。
如今 Flink 中僅在 RocksDBStateBackend 中支持增量 Checkpoint,若是你已經使用 RocksDBStateBackend,能夠經過開啓增量 Checkpoint 來加速,具體的能夠參考 [2]。
咱們知道 task 僅在接受到全部的 barrier 以後纔會進行 snapshot,若是做業存在反壓,或者有數據傾斜,則會致使所有的 channel 或者某些 channel 的 barrier 發送慢,從而總體影響 Checkpoint 的時間,這兩個能夠經過以下的頁面進行檢查:
上圖中咱們選擇了一個 task,查看全部 subtask 的反壓狀況,發現都是 high,表示反壓狀況嚴重,這種狀況下會致使下游接收 barrier 比較晚。
上圖中咱們選擇其中一個 operator,點擊全部的 subtask,而後按照 Records Received/Bytes Received/TPS 從大到小進行排序,能看到前面幾個 subtask 會比其餘的 subtask 要處理的數據多。
若是存在反壓或者數據傾斜的狀況,咱們須要首先解決反壓或者數據傾斜問題以後,再查看 Checkpoint 的時間是否符合預期。
從前面咱們知道 Checkpoint 在 task 端分爲 barrier 對齊(收齊全部上游發送過來的 barrier),而後開始同步階段,再作異步階段。若是 barrier 一直對不齊的話,就不會開始作 snapshot。
barrier 對齊以後會有以下日誌打印:
DEBUG Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
若是 taskmanager.log
中沒有這個日誌,則表示 barrier 一直沒有對齊,接下來咱們須要瞭解哪些上游的 barrier 沒有發送下來,若是你使用 At Least Once 的話,能夠觀察下面的日誌:
DEBUG Received barrier for checkpoint 96508 from channel 5
表示該 task 收到了 channel 5 來的 barrier,而後看對應 Checkpoint,再查看還剩哪些上游的 barrier 沒有接受到,對於 ExactlyOnce 暫時沒有相似的日誌,能夠考慮本身添加,或者 jmap 查看。
在 task 端,全部的處理都是單線程的,數據處理和 barrier 處理都由主線程處理,若是主線程在處理太慢(好比使用 RocksDBBackend,state 操做慢致使總體處理慢),致使 barrier 處理的慢,也會影響總體 Checkpoint 的進度,在這一步咱們須要可以查看某個 PID 對應 hotmethod,這裏推薦兩個方法:
RUNNABLE
狀態的線程有哪些;若是有其餘更方便的方法固然更好,也歡迎推薦。
同步階段通常不會太慢,可是若是咱們經過日誌發現同步階段比較慢的話,對於非 RocksDBBackend 咱們能夠考慮查看是否開啓了異步 snapshot,若是開啓了異步 snapshot 仍是慢,須要看整個 JVM 在幹嗎,也可使用前一節中的工具。對於 RocksDBBackend 來講,咱們能夠用 iostate
查看磁盤的壓力如何,另外能夠查看 tm 端 RocksDB 的 log 的日誌如何,查看其中 SNAPSHOT 的時間總共開銷多少。
RocksDB 開始 snapshot 的日誌以下:
2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729
snapshot 結束的日誌以下:
2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good
對於異步階段來講,tm 端主要將 state 備份到持久化存儲上,對於非 RocksDBBackend 來講,主要瓶頸來自於網絡,這個階段能夠考慮觀察網絡的 metric,或者對應機器上可以觀察到網絡流量的狀況(好比 iftop
)。
對於 RocksDB 來講,則須要從本地讀取文件,寫入到遠程的持久化存儲上,因此不只須要考慮網絡的瓶頸,還須要考慮本地磁盤的性能。另外對於 RocksDBBackend 來講,若是以爲網絡流量不是瓶頸,可是上傳比較慢的話,還能夠嘗試考慮開啓多線程上傳功能[3]。
在第二部份內容中,咱們介紹了官方編譯的包的狀況下排查一些 Checkpoint 異常狀況的主要場景,以及相應的排查方法,若是排查了上面全部的狀況,仍是沒有發現瓶頸所在,則能夠考慮添加更詳細的日誌,逐步將範圍縮小,而後最終定位緣由。
上文提到的一些 DEBUG 日誌,若是 flink dist 包是本身編譯的話,則建議將 Checkpoint 整個步驟內的一些 DEBUG 改成 INFO,可以經過日誌瞭解整個 Checkpoint 的總體階段,何時完成了什麼階段,也在 Checkpoint 異常的時候,快速知道每一個階段都消耗了多少時間。
[1] Change threading-model in StreamTask to a mailbox-based approach
[2] 增量 checkpoint 原理介紹
[3] RocksDBStateBackend 多線程上傳 State
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:
https://developer.aliyun.com/...
首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊: