記錄一次Flink做業異常的排查過程

本文來自: PerfMa技術社區

PerfMa(笨馬網絡)官網java

最近2周開始接手apache flink全鏈路監控數據的做業,包括指標統計,業務規則匹配等邏輯,計算結果實時寫入elasticsearch. 昨天遇到生產環境有做業沒法正常重啓的問題,我負責對這個問題進行排查跟進。apache

第一步,基礎排查

首先拿到jobmanager和taskmanager的日誌,我從taskmanager日誌中很快發現2個基礎類型的報錯,一個是npe,一個是索引找不到的異常api

elasticsearch sinker在執行寫入數據的先後提供回調接口讓做業開發人員對異常或者成功寫入進行處理,若是在處理異常過程當中有異常拋出,那麼框架會讓該task失敗,致使做業重啓。網絡

npe很容易修復,索引找不到是建立索引的服務中的一個小bug,這些都是小問題。併發

重點是在日誌中我看到另外一個錯誤:框架

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:122)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.createRecordWriter(RecordWriter.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriter(StreamTask.java:1202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1170)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:52)
    at sun.reflect.GeneratedConstructorAccessor4.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Unknown Source)

這種異常,通常是nproc設置過小致使的,或者物理內存耗盡,檢查完ulimit和內存,發現都很正常,這就比較奇怪了。elasticsearch

第二步、分析jstack和jmap

perfma有一個產品叫xland,我也是第一次使用,不得不說,確實牛逼,好用!
首先把出問題的taskmanager的線程棧信息和內存dump出來,具體命令:ide

jstatck pid > 生成的文件名
jmap -dump:format=b,file=生成的文件名 進程號

接着把這兩個文件導入xland,xland能夠直接看到線程總數,能夠方便搜索統計線程數、實例個數等等學習

最早發現的問題是這個taskmanager 線程總數居然有17000+,這個數字顯然有點大,這個時候我想看一下,哪種類型的線程比較大,xland能夠很方便的搜索,統計,這時候我注意到有一種類型的線程很是多,總數15520測試

image.png

更上層的調用信息看不到了,只看到來自apache http client,根據做業流程,首先想到的就是es sinker的RestHighLevelClient用到這個東西

那麼咱們在xland中統計RestHighLevelClient對象個數,發現有幾百個,很顯然這裏有問題

第三步、定位具體問題

有了前面xland的幫助,咱們很容易定位到是esclient出了問題
在咱們的做業裏面有2個地方用到了es client,一個是es sinker,es sinker使用的就是RestHighLevelClient,另外一個是咱們同窗本身寫的一個es client,一樣是使用RestHighLevelClient,在es sinker的ElasticsearchSinkFunction中單獨構造,用於在寫入es前,先搜索一些東西拿來合併,還作了cache

一、懷疑RestHighLevelClient bug

咱們經過一個測試,來驗證是否是RestHighLevelClient的問題

啓動一個單純使用es sinker的job,調整併發度,觀察前面出現較多的
I/O dispatcher線程的個數,最後發現單個es sinker也會有240+個
I/O dispatcher線程,經過調整併發,全部taskmanager的
I/O dispatcher線程總數基本和併發成正向比例
停掉寫es做業,此時全部taskmanager是不存在I/O dispatcher線程的

看起來I/O dispatcher那種線程數量大,彷佛是「正常的」

二、殺掉做業,觀察線程是否被正常回收
殺掉做業,I/O dispatcher線程變成0了,看起來es sinker使用是正常的

這時候基本上能夠判斷是咱們本身寫的es client的問題。究竟是什麼問題呢?

咱們再作一個測試進一步確認

三、啓動問題做業,殺死job後,觀察I/O dispatcher線程個數
重啓flink的全部taskmanager,給一個「純淨」的環境,發現殺死做業後,還有I/O dispatcher線程。
這個測試能夠判斷是咱們的es client存在線程泄漏

4、背後的原理

es sinker本質上是一個RichSinkFunction,RichSinkFunction帶了open 和close 方法,在close方法中,es sinker正確關閉了http client

@Override
    public void close() throws Exception {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
            client = null;
        }

        callBridge.cleanup();

        // make sure any errors from callbacks are rethrown
        checkErrorAndRethrow();
    }

而咱們的es client是沒有被正確關閉的。

具體原理應該是是這樣的,當es sinker出現npe或者寫es rejected等異常時,job會被flink重啓,es sinker這種RichSinkFunction類型的算子會被flink 調用close關閉釋放掉一些資源,而咱們寫在ElasticsearchSinkFunction中es client,是不會被框架關照到的,而這種寫法咱們本身也沒法預先定義重啓後關閉client的邏輯.

若是在構造時使用單例,理論上應該是能夠避免做業反覆重啓時es client不斷被構造致使線程泄漏和內存泄漏的,可是編寫單例寫法有問題,雖然有double check,可是沒加volatile,同時鎖的是this, 而不是類。

5、小結

一、xland確實好用,排查問題幫助很大。

二、flink做業用到的外部客戶端不要單獨構造,要使用相似RichFunction這種方式,提供open,close方法,確保讓資源可以被flink正確釋放掉。

三、用到的對象,建立的線程,線程池等等最好都起一個名字,方便使用xland過後排查問題,若是有經驗的話,應該一開始就統計下用於構造es client的那個包裝類對象個數。

一塊兒來學習吧

PerfMa KO 系列課之 JVM 參數【Memory篇】

JCU之 FutureTask 源碼與工做原理分析

相關文章
相關標籤/搜索