Apache Flink Task執行之數據流處理流程源碼分析

Flink Task執行之數據流處理流程源碼分析

這裏的分析已經知道咱們編寫的代碼最終被封裝成Flink Task並在一個while循環中由一個線程中不斷執行,本篇文章將進一步瞭解Flink Task處理數據流的整個流程。java

獲取流數據

用戶提交的代碼最終被封裝成了org.apache.flink.runtime.taskmanager.Task,Task是一個Runnable所以核心代碼就在run方法,run方法調用了doRun方法,在doRun中調用了invokable.invoke(),Task的整個處理流程其實就在這裏面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一個抽象類,它的子類是不一樣類型的Task,這裏咱們主要關注流處理任務相關的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法執行了runMailboxLoop()方法。apache

runMailboxLoop()方法就是執行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一種線程模型,runMailboxLoop就是在while輪詢中不斷執行任務和默認動做,其中默認動做就是StreamTask的processInput方法,該方法調用了StreamInputProcessor的inputProcessor方法,在這個方法中獲取並處理了流數據。StreamInputProcessor的子類StreamOneInputProcessor和StreamTwoInputProcessor分別用來處理有1個和2個入度的Task(StreamMultipleInputProcessor先無論)。StreamOneInputProcessor中有1個StreamTaskInput用來獲取數據,1個DataOutput用來收集從StreamTaskInput獲取的數據;同理,StreamTwoInputProcessor有2個StreamTaskInput和2個DataOutput。StreamTaskInput的子類StreamTaskNetworkInput用來從網絡中獲取流數據,經過調用他它的emitNext不只處理流數據還處理了checkpoint barrier,本篇文章只關注數據流的處理流程。StreamTaskNetworkInput從反序列化器中獲取到完整流數據後把數據交給DataOutput。DataOutput也有處理1個入度和2個入度的子類,它們都持有OperatorChain中第一個operator的引用,稱爲headOperator,DataOutput從StreamTaskInput那裏獲取到數據後會交給headOperator來處理。到此爲止,流數據被獲取並傳入了OperatorChain。 這裏總結一下:StreamTask的processInput方法在MailboxProcessor中被反覆調用,在processInput方法中StreamTask使用StreamInputProcessor來獲取並處理流數據。StreamInputProcessor中的StreamTaskInput用來獲取數據,獲取的數據交給DataOutput,DataOutput將數據傳入OperatorChain的第一個operator。其中StreamTask,StreamInputProcessor和DataOutput都有處理1個入度和2個入度的子類。api

StreamTaskProcessInput

數據流過OperatorChain

OperatorChain的第一個operator獲取數據後,數據是怎樣在OperatorChain中流動的呢?首先說說OperatorChain,StreamOperatorWrapper是chain的每一個節點,每一個節點都有指向下一個或上一個節點的引用,所以OperatorChain是一個雙向鏈表。可是數據的流動並不依靠這個鏈式結構。上文咱們提到DataOutput將數據交給了headOperator,OperatorChain的第一個節點都是StreamOperator的子類,咱們編寫的filer算子,map算子等最終都會被封裝成StreamOperator,例如子類StreamFlatMap就是執行flatMap方法,StreamFilter就是執行fliter方法等。這些方法執行的時候用org.apache.flink.streaming.api.operators.Output對處理後的結果進行收集。例如StreamFilter當FilterFunction返回true時收集數據,而StreamFlatMap將Output傳入flatMap方法中由用戶代碼進行收集數據。收集的數據是怎樣向OperatorChain的下一個節點傳遞的呢?原來Output中持有OneInputStreamOperator變量指向了chain中下一個節點的算子,調用Output的collect方法會調用下一個算子的processElement,數據就這樣在整個OperatorChain中傳遞了。數組

OperatorChain

發向下游Task

當數據傳到OperatorChain的最後一個算子時數據是怎樣發向下個Task的呢?最後一個算子擁有的Output實現類是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法會調用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用來發送數據,該方法會將序列化器中的數據複製到BufferBuilder中。BufferBuilder維護了一個內存片斷MemorySegment而且能夠建立相應的消費者。RecordWriter有2個實現類ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下遊節點的多個並行度發送數據,每一個並行度都對應一個channel。ChannelSelectorRecordWriter爲每一個chanel都保存一個BufferBuilder並分別添加BufferConsumer:緩存

BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel獲取BufferBuilder
addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumer
bufferBuilders[targetChannel] = bufferBuilder;

BroadcastRecordWriter只有一個BufferBuilder,使用同一個BufferBuilder給全部的channel添加BufferConsumer:網絡

try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
    for (int channel = 0; channel < numberOfChannels; channel++) {
        addBufferConsumer(bufferConsumer.copy(), channel);//全部channel用同一個BufferBuilder達到廣播的目的
    }
}

RecordWriter#requestNewBufferBuilder方法會獲取BufferBuilder,若是獲取失敗會致使Task執行線程阻塞形成反壓。app

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//嘗試獲取,獲取不到返回null
    if (builder == null) {
        long start = System.currentTimeMillis();
        builder = targetPartition.getBufferBuilder(targetChannel);//阻塞獲取,致使反壓
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}

BufferBuilder最終來自LocalBufferPool,LocalBufferPool有幾個重要的屬性:oop

//taskmanager的網絡緩存池,MemorySegment從這裏獲取
private final NetworkBufferPool networkBufferPool;
//已經獲取的MemorySegment被組織成一個隊列
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
//當前localBufferPool的大小
private int currentPoolSize;
//已經獲取的MemorySegment
private int numberOfRequestedMemorySegments;
//每一個channel能同時獲取的最大BufferBuilder數
private final int maxBuffersPerChannel;
//subpartition就是channel,數組存儲了每一個channel同時使用的BufferBuilder數
private final int[] subpartitionBuffersCount;

BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法獲取,requestMemorySegmentBlocking方法也是調用requestMemorySegment方法並在沒有獲取到MemorySegment時經過AvailableFuture的get方法來阻塞直到獲取成功爲止,AvailableFuture是一個用CompletableFuture表示的狀態位,這裏用到了CompletableFuture的get方法會阻塞直到complete的特性,沒有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中若是已經獲取的MemorySegment(numberOfRequestedMemorySegments)大於了localBufferPool的大小(currentPoolSize)須要將多餘的MemorySegment先歸還給networkBufferPool。以後獲取MemorySegment,若是獲取不到就設置AvailableFuture爲不可用,不然記錄channel使用的MemorySegment數量,若是大於maxBuffersPerChannel,也設置AvailableFuture爲不可用。源碼分析

@Nullable
private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
    MemorySegment segment = null;
    synchronized (availableMemorySegments) {
        returnExcessMemorySegments();//將多餘的segment歸還給networkBufferPool

        if (availableMemorySegments.isEmpty()) {
            segment = requestMemorySegmentFromGlobal();//全局獲取
        }
        // segment may have been released by buffer pool owner
        if (segment == null) {
            segment = availableMemorySegments.poll();//局部獲取
        }
        if (segment == null) {
            availabilityHelper.resetUnavailable();//獲取不到設置爲不可用
        }

        //記錄channel正在使用segment數,若是超了設置爲不可用
        if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
            if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                unavailableSubpartitionsCount++;
                availabilityHelper.resetUnavailable();
            }
        }
    }
    return segment;
}

反壓的採集

上面說的AvailableFuture設置爲不可用其實和反壓有關,Task的isBackPressured方法返回了該Task是否產生了反壓。ui

public boolean isBackPressured() {
    if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {
        return false;
    }
    //獲取全部的AvailableFuture,若是有沒完成了則有反壓
    final CompletableFuture<?>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];
    for (int i = 0; i < outputFutures.length; ++i) {
        outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();
    }
    return !CompletableFuture.allOf(outputFutures).isDone();
}
相關文章
相關標籤/搜索