自從函數式編程和響應式編程逐漸進入到程序員的生活以後,map函數做爲其中一個重要算子也爲你們所熟知,不管是前端web開發,手機開發仍是後端服務器開發,都很難逃過它的手心。而在大數據領域中又每每能夠見到另一個算子mapPartition的身影。在性能調優中,常常會被建議儘可能用 mappartition 操做去替代 map 操做。本文將從Flink源碼和示例入手,爲你們解析爲何mapPartition比map更高效。html
Map的做用是將數據流上每一個元素轉換爲另外的元素,好比data.map { x => x.toInt }
。它把數組流
中的每個值,使用所提供的函數執行一遍,一一對應。獲得與元素個數相同的數組流
。而後返回這個新數據流。前端
MapPartition的做用是單個函數調用並行分區,好比data.mapPartition { in => in map { (_, 1) } }
。該函數將分區做爲「迭代器」,能夠產生任意數量的結果。每一個分區中的元素數量取決於並行度和之前的operations。java
其實,二者完成的業務操做是同樣的,本質上都是將數據流上每一個元素轉換爲另外的元素。程序員
區別主要在兩點。web
從邏輯實現來說,sql
void mapPartition(Iterable<T> values, Collector<O> out)
。其中values是須要映射轉換的全部記錄,out是用來發送結果的collector。具體返回什麼,如何操做out來返回結果,則徹底依賴於業務邏輯。從調用次數來講,數據庫
爲何MapPartition有這麼高效呢,下面咱們將具體論證。apache
首先咱們給出示例代碼,從下文中咱們能夠看出,map就是簡單的轉換,而mapPartition則不但要作轉換,程序員還須要手動操做如何返回結果:編程
public class IteratePi { public static void main(String[] args) throws Exception { final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment(); //迭代次數 int iterativeNum=10; DataSet<Integer> wordList = env.fromElements(1, 2, 3); IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum); DataSet<Integer> mapResult=iterativeDataSet .map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { value += 1; return value; } }); //迭代結束的條件 DataSet<Integer> result=iterativeDataSet.closeWith(mapResult); result.print(); MapPartitionOperator<Integer, Integer> mapPartitionResult = iterativeDataSet .mapPartition(new MapPartitionFunction<Integer, Integer>() { @Override public void mapPartition(Iterable<Integer> values, Collector<Integer> out) { for (Integer value : values) { // 這裏須要程序員自行決定如何返回,即調用collect操做。 out.collect(value + 2); } } } ); //迭代結束的條件 DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult); partitionResult.print(); } }
世界上不多有沒有來由的愛,也少見免費的午飯。mapPartition之因此高效,其所依賴的基礎就是Flink的傳輸機制。因此咱們下面就講解下爲何。後端
你們都知道,Spark是用微批處理來模擬流處理,就是說,spark仍是一批一批的傳輸和處理數據,因此咱們就能理解mapPartition的機制就是基於這一批數據作統一處理。這樣確實能夠高效。
可是Flink號稱是純流,即Flink是每來一個輸入record,就進行一次業務處理,而後返回給下游算子。
有的兄弟就會產生疑問:每次都只是處理單個記錄,怎麼可以讓mapPartition作到批次處理呢。其實這就是Flink的微妙之處:即Flink確實是每次都處理一個輸入record,可是在上下游傳輸時候,Flink仍是把records累積起來作批量傳輸的。也能夠這麼理解:從傳輸的角度講,Flink是微批處理的。
Flink 的網絡棧是組成 flink-runtime 模塊的核心組件之一,也是 Flink 做業的核心部分。全部來自 TaskManager 的工做單元(子任務)都經過它來互相鏈接。流式傳輸數據流都要通過網絡棧,因此它對 Flink 做業的性能表現(包括吞吐量和延遲指標)相當重要。與經過 Akka 使用 RPC 的 TaskManager 和 JobManager 之間的協調通道相比,TaskManager 之間的網絡棧依賴的是更底層的,基於 Netty 的 API。
一個運行的application的tasks在持續交換數據。TaskManager負責作數據傳輸。不一樣任務之間的每一個(遠程)網絡鏈接將在 Flink 的網絡棧中得到本身的 TCP 通道。可是若是同一任務的不一樣子任務被安排到了同一個 TaskManager,則它們與同一個 TaskManager 的網絡鏈接將被多路複用,並共享一個 TCP 信道以減小資源佔用。
每一個TaskManager有一組網絡緩衝池(默認每一個buffer是32KB),用於發送與接受數據。如發送端和接收端位於不一樣的TaskManager進程中,則它們須要經過操做系統的網絡棧進行交流。流應用須要以管道的模式進行數據交換,也就是說,每對TaskManager會維持一個永久的TCP鏈接用於作數據交換。在shuffle鏈接模式下(多個sender與多個receiver),每一個sender task須要向每一個receiver task發送數據,此時TaskManager須要爲每一個receiver task都分配一個緩衝區。
一個記錄被建立並傳遞以後(例如經過 Collector.collect()),它會被遞交到RecordWriter,其未來自 Java 對象的記錄序列化爲一個字節序列,後者最終成爲網絡緩存。RecordWriter 首先使用SpanningRecordSerializer將記錄序列化爲一個靈活的堆上字節數組。而後它嘗試將這些字節寫入目標網絡通道的關聯網絡緩存。
由於若是逐個發送會下降每一個記錄的開銷並帶來更高的吞吐量,因此爲了取得高吞吐量,TaskManager的網絡組件首先從緩衝buffer中收集records,而後再發送。也就是說,records並非一個接一個的發送,而是先放入緩衝,而後再以batch的形式發送。這個技術能夠高效使用網絡資源,並達到高吞吐。相似於網絡或磁盤 I/O 協議中使用的緩衝技術。
接收方網絡棧(netty)將接收到的緩存寫入適當的輸入通道。最後(流式)任務的線程從這些隊列中讀取並嘗試在RecordReader的幫助下,經過Deserializer將積累的數據反序列化爲 Java 對象。
若sender與receiver任務都運行在同一個TaskManager進程,則sender任務會將發送的條目作序列化,並存入一個字節緩衝。而後將緩衝放入一個隊列,直到隊列被填滿。
Receiver任務從隊列中獲取緩衝,並反序列化輸入的條目。因此,在同一個TaskManager內,任務之間的數據傳輸並不通過網絡交互。
即在同一個TaskManager進程內,也是批量傳輸。
咱們基於Flink優化的結果進行分析驗證,看看Flink是否是把記錄寫入到buffer中,這種狀況下運行的是CountingCollector和ChainedMapDriver。
copyFromSerializerToTargetChannel:153, RecordWriter (org.apache.flink.runtime.io.network.api.writer) emit:116, RecordWriter (org.apache.flink.runtime.io.network.api.writer) emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer) collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) invoke:196, DataSourceTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
當執行完用戶定義的map函數以後,系統運行在 ChainedMapDriver.collect 函數。
public void collect(IT record) { this.outputCollector.collect(this.mapper.map(record));// mapper就是用戶代碼 }
而後調用到了CountingCollector.collect
public void collect(OUT record) { this.collector.collect(record);// record就是用戶轉換後的記錄 }
OutputCollector.collect函數會把記錄發送給全部的writers。
this.delegate.setInstance(record);// 先把record設置到SerializationDelegate中 for (RecordWriter<SerializationDelegate<T>> writer : writers) { // 全部的writer writer.emit(this.delegate); // 發送record }
RecordWriter
負責把數據序列化,而後寫入到緩存中。它有兩個實現類:
BroadcastRecordWriter
: 維護了多個下游channel,發送數據到下游全部的channel中。ChannelSelectorRecordWriter
: 經過channelSelector
對象判斷數據須要發往下游的哪一個channel。咱們用的正是這個RecordWriter
。這裏咱們分析下ChannelSelectorRecordWriter
的emit
方法:
public void emit(T record) throws IOException, InterruptedException { emit(record, channelSelector.selectChannel(record)); }
這裏使用了channelSelector.selectChannel
方法。該方法爲record尋找到對應下游channel id。
public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> { public final int selectChannel(SerializationDelegate<T> record) { switch (strategy) { case FORWARD: return forward(); // 咱們代碼用到了這種狀況。這裏 return 0; ...... } } }
接下來咱們又回到了父類RecordWriter.emit
。
protected void emit(T record, int targetChannel) throws IOException, InterruptedException { serializer.serializeRecord(record); // Make sure we don't hold onto the large intermediate serialization buffer for too long if (copyFromSerializerToTargetChannel(targetChannel)) { serializer.prune(); } }
關鍵的邏輯在於copyFromSerializerToTargetChannel
。此方法從序列化器中複製數據到目標channel,咱們能夠看出來,每條記錄都是寫入到buffer中。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { // We should reset the initial position of the intermediate serialization buffer before // copying, so the serialization results can be copied to multiple target buffers. // 此處Serializer爲SpanningRecordSerializer // reset方法將serializer內部的databuffer position重置爲0 serializer.reset(); boolean pruneTriggered = false; // 獲取目標channel的bufferBuilder // bufferBuilder內維護了MemorySegment,即內存片斷 // Flink的內存管理依賴MemorySegment,可實現堆內堆外內存的管理 // RecordWriter內有一個bufferBuilder數組,長度和下游channel數目相同 // 該數組以channel ID爲下標,存儲和channel對應的bufferBuilder // 若是對應channel的bufferBuilder還沒有建立,調用requestNewBufferBuilder申請一個新的bufferBuilder BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); // 複製serializer的數據到bufferBuilder中 SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); // 循環直到result徹底被寫入到buffer // 一條數據可能會被寫入到多個緩存中 // 若是緩存不夠用,會申請新的緩存 // 數據徹底寫入完畢之時,當前正在操做的緩存是沒有寫滿的 // 所以返回true,代表須要壓縮該buffer的空間 while (result.isFullBuffer()) { finishBufferBuilder(bufferBuilder); // If this was a full record, we are done. Not breaking out of the loop at this point // will lead to another buffer request before breaking out (that would not be a // problem per se, but it can lead to stalls in the pipeline). if (result.isFullRecord()) { pruneTriggered = true; emptyCurrentBufferBuilder(targetChannel); break; } bufferBuilder = requestNewBufferBuilder(targetChannel); result = serializer.copyToBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); // 若是buffer超時時間爲0,須要flush目標channel的數據 if (flushAlways) { flushTargetPartition(targetChannel); } return pruneTriggered; }
Driver是Flink runtime的一個重要概念,是在一個task中運行的用戶業務邏輯組件,具體實現了批量操做代碼。其內部API包括初始化,清除,運行,取消等邏輯。
public interface Driver<S extends Function, OT> { ...... void setup(TaskContext<S, OT> context); void run() throws Exception; void cleanup() throws Exception; void cancel() throws Exception; }
具體在 org.apache.flink.runtime.operators 目錄下,咱們可以看到各類Driver的實現,基本的算子都有本身的Driver。
...... CoGroupDriver.java FlatMapDriver.java FullOuterJoinDriver.java GroupReduceCombineDriver.java GroupReduceDriver.java JoinDriver.java LeftOuterJoinDriver.java MapDriver.java MapPartitionDriver.java ......
map算子對應的就是MapDriver。
結合上節咱們知道,上游數據是經過batch方式批量傳入的。因此,在run函數會遍歷輸入,每次取出一個record,而後調用用戶自定義函數function.map對這個record作map操做。
public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> { @Override public void run() throws Exception { final MutableObjectIterator<IT> input = this.taskContext.getInput(0); ..... else { IT record = null; // runtime主動進行循環,這樣致使大量函數調用 while (this.running && ((record = input.next()) != null)) { numRecordsIn.inc(); output.collect(function.map(record)); // function是用戶函數 } } } }
MapPartitionDriver是mapPartition的具體組件。系統會把獲得的批量數據inIter一次性的都傳給用戶自定義函數,由用戶代碼來進行遍歷操做。
public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> { @Override public void run() throws Exception { final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn); ...... } else { final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer()); // runtime不參與循環,這樣能夠減小函數調用 function.mapPartition(inIter, output); } } }
咱們可以看到map和mapPartition的input都是MutableObjectIterator
假設有上億個數據須要map,這資源佔用和運行速度效率差異會至關大。
以前提到了優化,這裏咱們再詳細深刻下如何優化map算子。
Flink有一個關鍵的優化技術稱爲任務鏈,用於(在某些狀況下)減小本地通訊的過載。爲了知足任務鏈的條件,至少兩個以上的operator必須配置爲同一並行度,而且使用本地向前的(local forwad)方式鏈接。任務鏈能夠被認爲是一種管道。
當管道以任務鏈的方式執行時候,Operators的函數被融合成單個任務,並由一個單獨的線程執行。一個function產生的records,經過使用一個簡單的方法調用,被遞交給下一個function。因此這裏在方法之間的records傳遞中,基本沒有序列化以及通訊消耗。
針對優化後的Operator Chain,runtime對應的Driver則是ChainedMapDriver。這是經過 MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0)
, 映射獲得的。
咱們能夠看到,由於是任務鏈,因此每一個record是直接在管道中流淌 ,ChainedMapDriver連循環都省略了,直接map轉換後丟給下游去也。
public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { private MapFunction<IT, OT> mapper; // 用戶函數 @Override public void collect(IT record) { try { this.numRecordsIn.inc(); this.outputCollector.collect(this.mapper.map(record)); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); } } } // 這時的調用棧以下 map:23, UserFunc$1 (com.alibaba.alink) collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) invoke:196, DataSourceTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
map和mapPartition實現的基礎是Flink的數據傳輸機制 :Flink確實是每次都處理一個輸入record,可是在上下游之間傳輸時候,Flink仍是把records累積起來作批量傳輸。便可以認爲從數據傳輸模型角度講,Flink是微批次的。
對於數據流轉換,由於是批量傳輸,因此對於積累的records,map是在runtime Driver代碼中進行循環,mapPartition在用戶代碼中進行循環。
map的函數調用次數要遠高於mapPartition。若是在用戶函數中涉及到頻繁建立額外的對象或者外部資源操做,則mapPartition性能遠遠高出。
若是沒有connection之類的操做,則一般性能差異並不大,一般不會成爲瓶頸,也沒有想象的那麼嚴重。