Flink+Disruptor踩坑記

Flink+Disruptor踩坑記

記錄一次Flink中使用disruptor的坑,背景是這樣的:有一個flink job工做流程是從kafka讀取日誌數據,通過處理後寫入es。寫入es的sink採用了disruptor,sink將數據寫入disruptor,disruptor消費者(workpool)負責將數據累積後寫入es。該job在某地上線啓動後,過一下子就會出現checkpoint超時的狀況,致使job長時間沒有正常工做。此時,若是中止job會致使flink taskmanager進程shutdown。flink集羣是standalone cluster,flink版本是1.7.2。java


taskmanager怎麼停了

首先來看看flink taskmanager進程shutdown的日誌,從日誌裏能夠看到,進程shutdown的緣由是在中止job的時候該taskmanager上運行的某個task180多秒都沒有響應中止指令,所以taskmanager本身把本身殺死了。好吧,首先看看task在taskmanager上是怎樣運行的,又是怎樣中止的?react

org.apache.flink.runtime.taskmanager.Task                     - Task did not exit gracefully within 180 + seconds.
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Task did not exit gracefully within 180 + seconds.
org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Fatal error occurred while executing the TaskManager. Shutting it down...
WARN  org.apache.flink.runtime.taskmanager.Task               - Task 'XXXXX' did not react to cancelling signal for 30 seconds, but is stuck in method:
 sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)

咱們從org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask方法能夠看出,job的算子最終被封裝成了org.apache.flink.runtime.taskmanager.Task。Task實現了Runnable,所以核心方法就是run方法。從該方法入手,能夠跟蹤到org.apache.flink.streaming.runtime.tasks.OneInputStreamTask#run方法,該方法有一個while循環,Task就是在這個循環中不斷調用org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput來處理流中的數據。綜上咱們能夠得出結論,job的任務最終在taskmanager上被封裝在一個線程中,而且在一個while循環中不斷處理流中數據。 咱們知道了任務最終被封裝成了線程,那任務是怎樣中止的呢?咱們從org.apache.flink.runtime.taskexecutor.TaskExecutor#cancelTask跟蹤到org.apache.flink.runtime.taskmanager.Task#cancelOrFailAndCancelInvokable方法,從該方法能夠看到爲了中止task建立了3個線程,分別是TaskCanceler,TaskInterrupter和TaskCancelerWatchDog。TaskCanceler作了3件事,第一,讓處理流的while循環退出;第二,關閉資源;第三,中斷task執行線程。TaskInterrupter作了1件事就是不斷間歇性的中斷task執行線程。TaskCancelerWatchDog就是不斷檢查task執行線程是否退出,若是超時未退出,就關閉當前的jvm。綜上咱們能夠得出結論,Task的中止要作幾個工做,第一,退出處理流的while循環;第二,關閉資源;第三,不斷間歇性中斷Task執行線程;第四,不斷檢查線程狀態若是長時間未退出則關閉jvm。 綜上,能夠知道taskmanager退出的緣由是task的執行線程在正常中止job的時候沒有正常退出。咱們從日誌能夠看出線程卡在了disruptor的生產數據時申請位置的next方法中,而且next方法沒有處理線程中斷。(咱們知道中斷線程只是改變了標誌位,須要在業務代碼裏本身處理線程中斷)apache


checkpoint爲啥超時了

咱們回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput方法,從這個方法咱們能夠看出它不只處理了流中的業務數據,還處理watermark,streamStatus,latencyMarker,還在barrierHandler.getNextNonBlocked()中處理了CheckpointBarrier。也就是說Task處理業務數據,watermark和checkpoint都是用一個線程。咱們從日誌能夠看出線程卡在了disruptor的插入方法,使得Task執行線程沒有機會去處理checkpoint,因此checkpoint超時了。api


什麼,disruptor這麼坑?

從上面的分析能夠知道,一切都是由線程卡在disruptor寫入時形成的。從com.lmax.disruptor.MultiProducerSequencer#next方法能夠得知,該方法只有在成功申請了位置後纔會返回。也就是說線程在這裏長時間未能成功申請到位置,致使線程一直在這個方法中循環(更多disruptor內容請查看這裏)。由此能夠初步判斷爲消費者由於寫 es慢致使disruptor隊列滿使得Task執行線程長時間沒法插入成功。從flink日誌中也確實發現了長達幾百秒的es慢插入,可是從數量上來講在某一時間段內這種慢插入是遠少於disruptor消費者,可是無論怎麼樣仍是用jstack查看一下disruptor消費者線程的運行狀況。不看不知道,一看嚇一跳。從jstack信息來看除了1個消費者阻塞在了寫es的代碼上,其他消費者居然都阻塞在了WaitStrategy上。這意味着,一方面生產者沒法生產數據,彷佛隊列就像滿了同樣;另外一方面消費者沒法消費數據,彷佛隊列就像空的同樣。這種看似矛盾的現象究竟是怎樣產生的呢?難道一個消費者的阻塞最終形成了整個隊列沒法正常使用?答案是的(什麼,這麼坑)。這一切都要從disruptor的消費者進度的保存位置提及。咱們首先來看看java.util.concurrent.ArrayBlockingQueue,這是jdk實現的循環隊列,當使用它做爲java線程池的工做隊列的話,若是一個線程在執行任務的時候阻塞了是不會影響隊列的生產和消費的,這是由於ArrayBlockingQueue是在隊列保存隊首和隊尾位置的,當一個線程阻塞了,消費者位置可由其餘消費者繼續推動。而disruptor不同他的消費者位置是保存在消費者那裏的,若是消費者線程卡在用戶代碼裏,那麼消費者就沒有機會更新位置,這樣的話,當生產者繞過一圈來到這個阻塞的消費者的時候就沒法成功寫入數據了。而其他非沒有阻塞的消費者能夠把隊列裏的數據消費完,此時因爲生產者沒法生產新數據,這樣消費者就阻塞到WaitStrategy上。下面是com.lmax.disruptor.WorkProcessor#runjvm

省略
while (true)
{
    try
    {
        if (processedSequence)
        {
            processedSequence = false;
            do
            {
                nextSequence = workSequence.get() + 1L;
                sequence.set(nextSequence - 1L);       設置消費者位置
            }
            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
        }

        if (cachedAvailableSequence >= nextSequence)
        {
            event = ringBuffer.get(nextSequence);
            workHandler.onEvent(event);               執行用戶代碼,若是這裏線程被卡住了,那麼消費者位置沒法被更新
            processedSequence = true;
        }
省略

啓發

  1. disruptor確實不適合消費者會有較長時間阻塞的場景。在本案例中disruptor其實並無多大做用,後期已經被去掉。
  2. 在編寫可能會阻塞的flink算子時,應該考慮job被cancle時的線程中斷問題。例如,使用帶超時的api,超時後線程能夠返回並檢查是否中斷,若是中斷了就跳出阻塞等待,使得task執行線程有機會處理cancle,避免taskmanager殺死本身。
  3. 在standalone cluster模式下taskmanager殺後,jobmanager是沒法啓動它的,這會致使其餘job被重啓,集羣slot變少也可能會致使job沒法啓動。相比之下,flink on yarn會好一點,而且single-job模式至關於每一個job都有一個集羣,job間隔離比較好,job中止的時候taskmanager和jobmanager的進程都退出了。而standalone cluster模式進程是沒有中止的,也就是說job若是開啓了線程,在standalone cluster模式下必定要在job中止的時候終止這些線程,不然,線程不會隨job中止而中止,他會一直存在,因爲線程棧是gc root,線程引用的內存資源也沒法釋放,形成泄漏。
相關文章
相關標籤/搜索