剖析Spark數據分區之Hadoop分片

本文來自OPPO互聯網技術團隊,是《剖析Spark數據分區》系列文章的第一篇,將重點分析Hadoop分片。網絡

轉載請註名做者,同時歡迎關注OPPO互聯網技術團隊的公衆號:OPPO_tech,一同分享OPPO前沿互聯網技術及活動。併發

數據分析師一般會發出這樣的疑問?個人任務以前一般半小時就能得出結果,如今要3個小時才能出結果。app

爲何變慢了?個人SQL沒有變動,是否是集羣出問題了?運維

針對這種問題,大數據運維工程師一般會說,數據量不同,在資源相同的條件下,數據量多的任務確定要比數據量少的任務執行時間長,能夠適當把資源調整一下。jvm

Spark任務經常使用參數:ide

那麼咱們有沒有其餘方法能讓計算任務在數據量很大的狀況下也能輕鬆應對,執行時間不會有明顯的提升?oop

答案就是合理分區即Partition,合理分區可以讓任務的task數量隨着數據量的增加而增大,提升任務的併發度。大數據

下面OPPO互聯網技術團隊的工程師們將從源碼層面解決解答數據分析師們的疑惑。優化

在這裏,咱們結合Hadoop, Spark, SparkStreaming + Kafka, Tispark源碼詳細的分析一下出現這種問題的緣由。spa

本篇文章屬於一個系列的一部分,該系列共分3篇文章,歡迎持續關注。

  • 第一篇:主要分析Hadoop中的分片;
  • 第二篇:主要分析Spark RDD的分區;
  • 第三篇:主要分析Spark Streaming,TiSpark中的數據分區;

01 核心原理分析

咱們先來看一下上述幾個組件在整個Hadoop生態體系中的位置。

Yarn做爲整個大數據集羣的核心,也是任務調度,計算資源分配的核心,接下來了解一下Container。

Yarn Container

Container是YARN中的資源抽象,它封裝了某個節點上的多維度資源,如內存、CPU、磁盤、網絡等。當AM向RM申請資源時,RM爲AM返回的資源即是用Container表示的。

YARN會爲每一個任務分配Container,且該任務只能使用該Container中描述的資源。與MRv1中的slot不一樣的是,它是一個動態資源劃分單位,是根據應用程序的需求動態生成的。

實際上在spark任務中一個executor至關於一個運行中的container。

一個excution container就是一系列cpu資源,內存資源的JVM進程,task做爲excution container的jvm進程中的一個線程執行,一個JVM進程能夠有多個線程,一個excution container中能夠運行多個task,而partition又決定了task數量。

咱們將從Hdfs中的數據分片開始,逐步分析Partition。

Hadoop InputSplit

首先咱們分析一下在HDFS中,數據的組織形式。

HDFS是以固定大小的Block爲基本單位存儲數據,Block塊是以Block size進行切割,當前從2.7.3版本開始Block size的默認大小爲128M,以前版本的默認值是64M, 能夠經過修改hdfs-site.xml文件中的dfs.blocksize對應的值。

值得注意的是:在修改HDFS的數據塊大小時,首先須要停掉集羣hadoop的運行進程,修改完畢後從新啓動。

假設邏輯記錄大小分別是 100MB,100MB, 100MB。

那麼第一條記錄能夠徹底在一個塊中,可是第二條記錄不能徹底在一個塊中,第二條記錄將同時出如今兩個塊中,從塊1開始,溢出到塊2中。

若是每一個Map任務處理特定數據塊中的全部記錄,那怎麼處理這種跨越邊界的記錄呢?

在這種狀況下Mapper不能處理第二條記錄,由於塊1中沒有完整的第二條記錄,HDFS內部並不清楚一個記錄何時可能溢出到另外一個塊。(Because HDFS has no conception of what’s inside the file blocks, it can’t gauge when a record might spill over into another block.)

InputSplit就是解決這種跨越邊界記錄的問題的。

InputSplit是一個邏輯概念,並無對實際文件進行切分,它只是包含一些元數據信息,好比數據的起始位置,數據的長度,數據所在的節點等。然而InputSplit所做出的切分結果將會直接影響任務的併發度。

當Mapper嘗試讀取數據時,它清楚的知道從何處開始讀取以及在哪裏中止讀取。InputSplit的開始位置能夠在一個塊中開始,在另外一個塊中結束。

InputSplit表明了邏輯記錄邊界,在MapReduce執行期間,Hadoop掃描塊並建立InputSplits,並將每一個InputSplit分配給一個Mapper處理,能夠得出,一個InputSplit對應一個MapTask。

MapReduce

Map經過 RecordReader 讀取Input的key/value對,Map根據用戶自定義的任務,運行完畢後,產生另一系列 key/value,並將其寫入到Hadoop的內存緩衝取中,在內存緩衝區中的key/value對按key排序,此時會按照Reduce partition進行,分到不一樣partition中。

Reduce以 key 及對應的 value 列表做爲輸入,按照用戶本身的程序邏輯,經合併 key 相同的 value 值後,產生另一系列 key/value 對做爲最終輸出寫入 HDFS。

Hadoop Partition

在MapReduce任務中,Partitioner 的做用是對 Mapper 產生的中間結果進行分片,以便將同一分組的數據交給同一個 Reducer 處理。

在MapReduce任務中默認的Partitioner是HashPartitioner,根據這個Partitioner將數據分發到不一樣的Reducer中。

HashPartitioner使用hash方法(好比常見的:hash(key) mod R)進行分區,hash方法可以產生很是平衡的分區。

那麼在MR任務中MapTask數量是怎麼決定的呢?

因爲一個Split對應一個map task,咱們來分析一下FileInputFormat類getInputSplit切片邏輯。

經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:

Math.max(minSize, Math.min(maxSize, blockSize));

切片主要由這幾個值來運算決定

minsize:

默認值:1

配置參數:

mapreduce.input.fileinputformat.split.minsize;

maxsize:

默認值:Long.MAXValue

配置參數:

mapreduce.input.fileinputformat.split.maxsize;

blocksize;

所以,默認狀況下,切片大小=blocksize

maxsize(切片最大值)

參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值;

minsize (切片最小值)

參數調的比blockSize大,則可讓切片變得比blocksize還大;

經過上述分析可知,能夠經過調整

mapreduce.input.fileinputformat.split.minsize&

mapreduce.input.fileinputformat.split.maxsize

的大小來調整MapTask的數量。

02 結語

Hadoop做爲當前主流大數據的基石,HDFS一般做爲Spark任務的數據來源,要想深入的理解Spark中的數據分區必須理解HDFS上的數據分片。

做爲大數據工程師,理解原理,理解調優參數背後的邏輯並加以應用將使咱們的任務跑的更快,使咱們的大數據集羣在相同的計算能力前提下可以運行更多的任務。

下一篇文章中,OPPO互聯網技術團隊將從源碼層面重點分析Spark RDD的 Partition的實現原理與優化策略,敬請期待。

相關文章
相關標籤/搜索