Flink Task執行之數據流處理流程源碼分析
從這裏的分析已經知道咱們編寫的代碼最終被封裝成Flink Task並在一個while循環中由一個線程中不斷執行,本篇文章將進一步瞭解Flink Task處理數據流的整個流程。java
獲取流數據
用戶提交的代碼最終被封裝成了org.apache.flink.runtime.taskmanager.Task,Task是一個Runnable所以核心代碼就在run方法,run方法調用了doRun方法,在doRun中調用了invokable.invoke(),Task的整個處理流程其實就在這裏面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一個抽象類,它的子類是不一樣類型的Task,這裏咱們主要關注流處理任務相關的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法執行了runMailboxLoop()方法。apache
runMailboxLoop()方法就是執行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一種線程模型,runMailboxLoop就是在while輪詢中不斷執行任務和默認動做,其中默認動做就是StreamTask的processInput方法,該方法調用了StreamInputProcessor的inputProcessor方法,在這個方法中獲取並處理了流數據。StreamInputProcessor的子類StreamOneInputProcessor和StreamTwoInputProcessor分別用來處理有1個和2個入度的Task(StreamMultipleInputProcessor先無論)。StreamOneInputProcessor中有1個StreamTaskInput用來獲取數據,1個DataOutput用來收集從StreamTaskInput獲取的數據;同理,StreamTwoInputProcessor有2個StreamTaskInput和2個DataOutput。StreamTaskInput的子類StreamTaskNetworkInput用來從網絡中獲取流數據,經過調用他它的emitNext不只處理流數據還處理了checkpoint barrier,本篇文章只關注數據流的處理流程。StreamTaskNetworkInput從反序列化器中獲取到完整流數據後把數據交給DataOutput。DataOutput也有處理1個入度和2個入度的子類,它們都持有OperatorChain中第一個operator的引用,稱爲headOperator,DataOutput從StreamTaskInput那裏獲取到數據後會交給headOperator來處理。到此爲止,流數據被獲取並傳入了OperatorChain。 這裏總結一下:StreamTask的processInput方法在MailboxProcessor中被反覆調用,在processInput方法中StreamTask使用StreamInputProcessor來獲取並處理流數據。StreamInputProcessor中的StreamTaskInput用來獲取數據,獲取的數據交給DataOutput,DataOutput將數據傳入OperatorChain的第一個operator。其中StreamTask,StreamInputProcessor和DataOutput都有處理1個入度和2個入度的子類。api
數據流過OperatorChain
OperatorChain的第一個operator獲取數據後,數據是怎樣在OperatorChain中流動的呢?首先說說OperatorChain,StreamOperatorWrapper是chain的每一個節點,每一個節點都有指向下一個或上一個節點的引用,所以OperatorChain是一個雙向鏈表。可是數據的流動並不依靠這個鏈式結構。上文咱們提到DataOutput將數據交給了headOperator,OperatorChain的第一個節點都是StreamOperator的子類,咱們編寫的filer算子,map算子等最終都會被封裝成StreamOperator,例如子類StreamFlatMap就是執行flatMap方法,StreamFilter就是執行fliter方法等。這些方法執行的時候用org.apache.flink.streaming.api.operators.Output對處理後的結果進行收集。例如StreamFilter當FilterFunction返回true時收集數據,而StreamFlatMap將Output傳入flatMap方法中由用戶代碼進行收集數據。收集的數據是怎樣向OperatorChain的下一個節點傳遞的呢?原來Output中持有OneInputStreamOperator變量指向了chain中下一個節點的算子,調用Output的collect方法會調用下一個算子的processElement,數據就這樣在整個OperatorChain中傳遞了。數組
發向下游Task
當數據傳到OperatorChain的最後一個算子時數據是怎樣發向下個Task的呢?最後一個算子擁有的Output實現類是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法會調用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用來發送數據,該方法會將序列化器中的數據複製到BufferBuilder中。BufferBuilder維護了一個內存片斷MemorySegment而且能夠建立相應的消費者。RecordWriter有2個實現類ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下遊節點的多個並行度發送數據,每一個並行度都對應一個channel。ChannelSelectorRecordWriter爲每一個chanel都保存一個BufferBuilder並分別添加BufferConsumer:緩存
BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel獲取BufferBuilder addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumer bufferBuilders[targetChannel] = bufferBuilder;
BroadcastRecordWriter只有一個BufferBuilder,使用同一個BufferBuilder給全部的channel添加BufferConsumer:網絡
try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) { for (int channel = 0; channel < numberOfChannels; channel++) { addBufferConsumer(bufferConsumer.copy(), channel);//全部channel用同一個BufferBuilder達到廣播的目的 } }
RecordWriter#requestNewBufferBuilder方法會獲取BufferBuilder,若是獲取失敗會致使Task執行線程阻塞形成反壓。app
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//嘗試獲取,獲取不到返回null if (builder == null) { long start = System.currentTimeMillis(); builder = targetPartition.getBufferBuilder(targetChannel);//阻塞獲取,致使反壓 idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); } return builder; }
BufferBuilder最終來自LocalBufferPool,LocalBufferPool有幾個重要的屬性:oop
//taskmanager的網絡緩存池,MemorySegment從這裏獲取 private final NetworkBufferPool networkBufferPool; //已經獲取的MemorySegment被組織成一個隊列 private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>(); //當前localBufferPool的大小 private int currentPoolSize; //已經獲取的MemorySegment private int numberOfRequestedMemorySegments; //每一個channel能同時獲取的最大BufferBuilder數 private final int maxBuffersPerChannel; //subpartition就是channel,數組存儲了每一個channel同時使用的BufferBuilder數 private final int[] subpartitionBuffersCount;
BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法獲取,requestMemorySegmentBlocking方法也是調用requestMemorySegment方法並在沒有獲取到MemorySegment時經過AvailableFuture的get方法來阻塞直到獲取成功爲止,AvailableFuture是一個用CompletableFuture表示的狀態位,這裏用到了CompletableFuture的get方法會阻塞直到complete的特性,沒有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中若是已經獲取的MemorySegment(numberOfRequestedMemorySegments)大於了localBufferPool的大小(currentPoolSize)須要將多餘的MemorySegment先歸還給networkBufferPool。以後獲取MemorySegment,若是獲取不到就設置AvailableFuture爲不可用,不然記錄channel使用的MemorySegment數量,若是大於maxBuffersPerChannel,也設置AvailableFuture爲不可用。源碼分析
@Nullable private MemorySegment requestMemorySegment(int targetChannel) throws IOException { MemorySegment segment = null; synchronized (availableMemorySegments) { returnExcessMemorySegments();//將多餘的segment歸還給networkBufferPool if (availableMemorySegments.isEmpty()) { segment = requestMemorySegmentFromGlobal();//全局獲取 } // segment may have been released by buffer pool owner if (segment == null) { segment = availableMemorySegments.poll();//局部獲取 } if (segment == null) { availabilityHelper.resetUnavailable();//獲取不到設置爲不可用 } //記錄channel正在使用segment數,若是超了設置爲不可用 if (segment != null && targetChannel != UNKNOWN_CHANNEL) { if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { unavailableSubpartitionsCount++; availabilityHelper.resetUnavailable(); } } } return segment; }
反壓的採集
上面說的AvailableFuture設置爲不可用其實和反壓有關,Task的isBackPressured方法返回了該Task是否產生了反壓。ui
public boolean isBackPressured() { if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) { return false; } //獲取全部的AvailableFuture,若是有沒完成了則有反壓 final CompletableFuture<?>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length]; for (int i = 0; i < outputFutures.length; ++i) { outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture(); } return !CompletableFuture.allOf(outputFutures).isDone(); }