[源碼解析]爲何mapPartition比map更高效

[源碼解析]爲何mapPartition比map更高效

0x00 摘要

自從函數式編程和響應式編程逐漸進入到程序員的生活以後,map函數做爲其中一個重要算子也爲你們所熟知,不管是前端web開發,手機開發仍是後端服務器開發,都很難逃過它的手心。而在大數據領域中又每每能夠見到另一個算子mapPartition的身影。在性能調優中,常常會被建議儘可能用 mappartition 操做去替代 map 操做。本文將從Flink源碼和示例入手,爲你們解析爲何mapPartition比map更高效。html

0x01 map vs mapPartition

1.1 map

Map的做用是將數據流上每一個元素轉換爲另外的元素,好比data.map { x => x.toInt }。它把數組流中的每個值,使用所提供的函數執行一遍,一一對應。獲得與元素個數相同的數組流。而後返回這個新數據流。前端

1.2 mapPartition

MapPartition的做用是單個函數調用並行分區,好比data.mapPartition { in => in map { (_, 1) } }。該函數將分區做爲「迭代器」,能夠產生任意數量的結果。每一個分區中的元素數量取決於並行度和之前的operations。java

1.3 異同

其實,二者完成的業務操做是同樣的,本質上都是將數據流上每一個元素轉換爲另外的元素。程序員

區別主要在兩點。web

從邏輯實現來說sql

  • map邏輯實現簡單,就是在函數中簡單一一轉換,map函數的輸入和輸入都是單個元素。
  • mapPartition相對複雜,函數的輸入有兩個,通常格式爲 void mapPartition(Iterable<T> values, Collector<O> out) 。其中values是須要映射轉換的全部記錄,out是用來發送結果的collector。具體返回什麼,如何操做out來返回結果,則徹底依賴於業務邏輯。

從調用次數來講數據庫

  • 數據有多少個元素,map就會被調用多少次。
  • 數據有多少分區,mapPartition就會被調用多少次。

爲何MapPartition有這麼高效呢,下面咱們將具體論證。apache

0x02 代碼

首先咱們給出示例代碼,從下文中咱們能夠看出,map就是簡單的轉換,而mapPartition則不但要作轉換,程序員還須要手動操做如何返回結果:編程

public class IteratePi {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //迭代次數
        int iterativeNum=10;
        DataSet<Integer> wordList = env.fromElements(1, 2, 3);
      
        IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum);
        DataSet<Integer> mapResult=iterativeDataSet
          			.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                value += 1;
                return value;
            }
        });
        //迭代結束的條件
        DataSet<Integer> result=iterativeDataSet.closeWith(mapResult);
        result.print();

        MapPartitionOperator<Integer, Integer> mapPartitionResult = iterativeDataSet
                .mapPartition(new MapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
                for (Integer value : values) {
                    // 這裏須要程序員自行決定如何返回,即調用collect操做。
                    out.collect(value + 2);
                }
            }                                                                                                                           					}
        );
        //迭代結束的條件
        DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult);
        partitionResult.print();
    }
}

0x03 Flink的傳輸機制

世界上不多有沒有來由的愛,也少見免費的午飯。mapPartition之因此高效,其所依賴的基礎就是Flink的傳輸機制。因此咱們下面就講解下爲何。後端

你們都知道,Spark是用微批處理來模擬流處理,就是說,spark仍是一批一批的傳輸和處理數據,因此咱們就能理解mapPartition的機制就是基於這一批數據作統一處理。這樣確實能夠高效。

可是Flink號稱是純流,即Flink是每來一個輸入record,就進行一次業務處理,而後返回給下游算子。

有的兄弟就會產生疑問:每次都只是處理單個記錄,怎麼可以讓mapPartition作到批次處理呢。其實這就是Flink的微妙之處:即Flink確實是每次都處理一個輸入record,可是在上下游傳輸時候,Flink仍是把records累積起來作批量傳輸的。也能夠這麼理解:從傳輸的角度講,Flink是微批處理的

3.1 傳輸機制概述

Flink 的網絡棧是組成 flink-runtime 模塊的核心組件之一,也是 Flink 做業的核心部分。全部來自 TaskManager 的工做單元(子任務)都經過它來互相鏈接。流式傳輸數據流都要通過網絡棧,因此它對 Flink 做業的性能表現(包括吞吐量和延遲指標)相當重要。與經過 Akka 使用 RPC 的 TaskManager 和 JobManager 之間的協調通道相比,TaskManager 之間的網絡棧依賴的是更底層的,基於 Netty 的 API。

3.2 遠程通訊

一個運行的application的tasks在持續交換數據。TaskManager負責作數據傳輸。不一樣任務之間的每一個(遠程)網絡鏈接將在 Flink 的網絡棧中得到本身的 TCP 通道。可是若是同一任務的不一樣子任務被安排到了同一個 TaskManager,則它們與同一個 TaskManager 的網絡鏈接將被多路複用,並共享一個 TCP 信道以減小資源佔用。

每一個TaskManager有一組網絡緩衝池(默認每一個buffer是32KB),用於發送與接受數據。如發送端和接收端位於不一樣的TaskManager進程中,則它們須要經過操做系統的網絡棧進行交流。流應用須要以管道的模式進行數據交換,也就是說,每對TaskManager會維持一個永久的TCP鏈接用於作數據交換。在shuffle鏈接模式下(多個sender與多個receiver),每一個sender task須要向每一個receiver task發送數據,此時TaskManager須要爲每一個receiver task都分配一個緩衝區。

一個記錄被建立並傳遞以後(例如經過 Collector.collect()),它會被遞交到RecordWriter,其未來自 Java 對象的記錄序列化爲一個字節序列,後者最終成爲網絡緩存。RecordWriter 首先使用SpanningRecordSerializer將記錄序列化爲一個靈活的堆上字節數組。而後它嘗試將這些字節寫入目標網絡通道的關聯網絡緩存。

由於若是逐個發送會下降每一個記錄的開銷並帶來更高的吞吐量,因此爲了取得高吞吐量,TaskManager的網絡組件首先從緩衝buffer中收集records,而後再發送。也就是說,records並非一個接一個的發送,而是先放入緩衝,而後再以batch的形式發送。這個技術能夠高效使用網絡資源,並達到高吞吐。相似於網絡或磁盤 I/O 協議中使用的緩衝技術。

接收方網絡棧(netty)將接收到的緩存寫入適當的輸入通道。最後(流式)任務的線程從這些隊列中讀取並嘗試在RecordReader的幫助下,經過Deserializer將積累的數據反序列化爲 Java 對象。

3.3 TaskManager進程內傳輸

若sender與receiver任務都運行在同一個TaskManager進程,則sender任務會將發送的條目作序列化,並存入一個字節緩衝。而後將緩衝放入一個隊列,直到隊列被填滿。

Receiver任務從隊列中獲取緩衝,並反序列化輸入的條目。因此,在同一個TaskManager內,任務之間的數據傳輸並不通過網絡交互。

在同一個TaskManager進程內,也是批量傳輸

3.4 源碼分析

咱們基於Flink優化的結果進行分析驗證,看看Flink是否是把記錄寫入到buffer中,這種狀況下運行的是CountingCollector和ChainedMapDriver。

copyFromSerializerToTargetChannel:153, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:116, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

當執行完用戶定義的map函數以後,系統運行在 ChainedMapDriver.collect 函數。

public void collect(IT record) {
    this.outputCollector.collect(this.mapper.map(record));// mapper就是用戶代碼
}

而後調用到了CountingCollector.collect

public void collect(OUT record) {
		this.collector.collect(record);// record就是用戶轉換後的記錄
}

OutputCollector.collect函數會把記錄發送給全部的writers。

this.delegate.setInstance(record);// 先把record設置到SerializationDelegate中
for (RecordWriter<SerializationDelegate<T>> writer : writers) {  // 全部的writer
   writer.emit(this.delegate); // 發送record
}

RecordWriter負責把數據序列化,而後寫入到緩存中。它有兩個實現類:

  • BroadcastRecordWriter: 維護了多個下游channel,發送數據到下游全部的channel中。
  • ChannelSelectorRecordWriter: 經過channelSelector對象判斷數據須要發往下游的哪一個channel。咱們用的正是這個RecordWriter

這裏咱們分析下ChannelSelectorRecordWriteremit方法:

public void emit(T record) throws IOException, InterruptedException {
   emit(record, channelSelector.selectChannel(record));
}

這裏使用了channelSelector.selectChannel方法。該方法爲record尋找到對應下游channel id。

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
	public final int selectChannel(SerializationDelegate<T> record) {
		switch (strategy) {
		case FORWARD:
			return forward(); // 咱們代碼用到了這種狀況。這裏 return 0;
    ......
		}
	}
}

接下來咱們又回到了父類RecordWriter.emit

protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
   serializer.serializeRecord(record);
   // Make sure we don't hold onto the large intermediate serialization buffer for too long
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      serializer.prune();
   }
}

關鍵的邏輯在於copyFromSerializerToTargetChannel此方法從序列化器中複製數據到目標channel,咱們能夠看出來,每條記錄都是寫入到buffer中

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
   // We should reset the initial position of the intermediate serialization buffer before
   // copying, so the serialization results can be copied to multiple target buffers.
   // 此處Serializer爲SpanningRecordSerializer
   // reset方法將serializer內部的databuffer position重置爲0
   serializer.reset();

   boolean pruneTriggered = false;
    // 獲取目標channel的bufferBuilder
    // bufferBuilder內維護了MemorySegment,即內存片斷
    // Flink的內存管理依賴MemorySegment,可實現堆內堆外內存的管理
    // RecordWriter內有一個bufferBuilder數組,長度和下游channel數目相同
    // 該數組以channel ID爲下標,存儲和channel對應的bufferBuilder
    // 若是對應channel的bufferBuilder還沒有建立,調用requestNewBufferBuilder申請一個新的bufferBuilder  
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    // 複製serializer的數據到bufferBuilder中
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
    // 循環直到result徹底被寫入到buffer
    // 一條數據可能會被寫入到多個緩存中
    // 若是緩存不夠用,會申請新的緩存
    // 數據徹底寫入完畢之時,當前正在操做的緩存是沒有寫滿的
    // 所以返回true,代表須要壓縮該buffer的空間  
   while (result.isFullBuffer()) {
      finishBufferBuilder(bufferBuilder);

      // If this was a full record, we are done. Not breaking out of the loop at this point
      // will lead to another buffer request before breaking out (that would not be a
      // problem per se, but it can lead to stalls in the pipeline).
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }

      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(), "All data should be written at once");

   // 若是buffer超時時間爲0,須要flush目標channel的數據
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

0x04 runtime

4.1 Driver

Driver是Flink runtime的一個重要概念,是在一個task中運行的用戶業務邏輯組件,具體實現了批量操做代碼。其內部API包括初始化,清除,運行,取消等邏輯。

public interface Driver<S extends Function, OT> {
   ......
   void setup(TaskContext<S, OT> context);
   void run() throws Exception;
   void cleanup() throws Exception;
   void cancel() throws Exception;
}

具體在 org.apache.flink.runtime.operators 目錄下,咱們可以看到各類Driver的實現,基本的算子都有本身的Driver。

......
CoGroupDriver.java
FlatMapDriver.java
FullOuterJoinDriver.java
GroupReduceCombineDriver.java
GroupReduceDriver.java
JoinDriver.java
LeftOuterJoinDriver.java
MapDriver.java
MapPartitionDriver.java
......

4.2 MapDriver

map算子對應的就是MapDriver。

結合上節咱們知道,上游數據是經過batch方式批量傳入的。因此,在run函數會遍歷輸入,每次取出一個record,而後調用用戶自定義函數function.map對這個record作map操做。

public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {

   @Override
   public void run() throws Exception {
      final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
      .....
      else {
         IT record = null;
        
         // runtime主動進行循環,這樣致使大量函數調用
         while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            output.collect(function.map(record)); // function是用戶函數
         }
      }
   }
}

4.3 MapPartitionDriver

MapPartitionDriver是mapPartition的具體組件。系統會把獲得的批量數據inIter一次性的都傳給用戶自定義函數,由用戶代碼來進行遍歷操做

public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> {
   @Override
   public void run() throws Exception {
     
		final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);     
      ......
      } else {
         final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());

         // runtime不參與循環,這樣能夠減小函數調用
         function.mapPartition(inIter, output);
      }
   }
}

4.4 效率區別

咱們可以看到map和mapPartition的input都是MutableObjectIterator input類型, 說明二者的輸入一致。只不過map是在Driver代碼中進行循環,mapPartition在用戶代碼中進行循環。具體mapPartition的 效率提升體如今以下方面 :

  1. 假設一共有60個數據須要轉換,map會在runtime中調用用戶函數60次。
  2. runtime把數據分紅6個partition操做,則mapPartition在runtime中會調用用戶函數6次,在每一個用戶函數中分別循環10次。對於runtime來講,map操做會多出54次用戶函數調用。
  3. 若是用戶業務中須要頻繁建立額外的對象或者外部資源操做,mapPartition的優點更能夠體現。 例如將數據寫入Mysql, 那麼map須要爲每一個元素建立一個數據庫鏈接,而mapPartition爲每一個partition建立一個連接。

假設有上億個數據須要map,這資源佔用和運行速度效率差異會至關大。

0x05 優化和ChainedMapDriver

以前提到了優化,這裏咱們再詳細深刻下如何優化map算子。

Flink有一個關鍵的優化技術稱爲任務鏈,用於(在某些狀況下)減小本地通訊的過載。爲了知足任務鏈的條件,至少兩個以上的operator必須配置爲同一並行度,而且使用本地向前的(local forwad)方式鏈接。任務鏈能夠被認爲是一種管道。

當管道以任務鏈的方式執行時候,Operators的函數被融合成單個任務,並由一個單獨的線程執行。一個function產生的records,經過使用一個簡單的方法調用,被遞交給下一個function。因此這裏在方法之間的records傳遞中,基本沒有序列化以及通訊消耗

針對優化後的Operator Chain,runtime對應的Driver則是ChainedMapDriver。這是經過 MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0), 映射獲得的。

咱們能夠看到,由於是任務鏈,因此每一個record是直接在管道中流淌 ,ChainedMapDriver連循環都省略了,直接map轉換後丟給下游去也

public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {

   private MapFunction<IT, OT> mapper; // 用戶函數

   @Override
   public void collect(IT record) {
      try {
         this.numRecordsIn.inc();
         this.outputCollector.collect(this.mapper.map(record));
      } catch (Exception ex) {
         throw new ExceptionInChainedStubException(this.taskName, ex);
      }
   }
}

// 這時的調用棧以下
map:23, UserFunc$1 (com.alibaba.alink)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x06 總結

map和mapPartition實現的基礎是Flink的數據傳輸機制 :Flink確實是每次都處理一個輸入record,可是在上下游之間傳輸時候,Flink仍是把records累積起來作批量傳輸。便可以認爲從數據傳輸模型角度講,Flink是微批次的。

對於數據流轉換,由於是批量傳輸,因此對於積累的records,map是在runtime Driver代碼中進行循環,mapPartition在用戶代碼中進行循環。

map的函數調用次數要遠高於mapPartition。若是在用戶函數中涉及到頻繁建立額外的對象或者外部資源操做,則mapPartition性能遠遠高出。

若是沒有connection之類的操做,則一般性能差異並不大,一般不會成爲瓶頸,也沒有想象的那麼嚴重。

0x07 參考

深刻了解 Flink 網絡棧 ——A Deep-Dive into Flink's Network Stack

Flink架構(二)- Flink中的數據傳輸

Flink 源碼之節點間通訊

相關文章
相關標籤/搜索