【hadoop】20.MapReduce-InputFormat數據切片機制

簡介

經過本章節,您能夠學習到:前端

  1. Job的提交流程
  2. InputFormat數據切片的機制

一、Job提交流程源碼分析

1)job提交流程源碼詳解
waitForCompletion()
submit();
// 1創建鏈接
connect();	
// 1)建立提交job的代理
new Cluster(getConfiguration());
			// (1)判斷是本地yarn仍是遠程
			initialize(jobTrackAddr, conf); 
	// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
	// 1)建立給集羣提交數據的Stag路徑
	Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
	// 2)獲取jobid ,並建立job路徑
	JobID jobId = submitClient.getNewJobID();
	// 3)拷貝jar包到集羣
copyAndConfigureFiles(job, submitJobDir);	
	rUploader.uploadFiles(job, jobSubmitDir);
// 4)計算切片,生成切片規劃文件
writeSplits(job, submitJobDir);
	maps = writeNewSplits(job, jobSubmitDir);
		input.getSplits(job);
// 5)向Stag路徑寫xml配置文件
writeConf(conf, submitJobFile);
	conf.writeXml(out);
// 6)提交job,返回提交狀態
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

注意以上代碼只是大過程的提取,並非連續的某處的代碼。要了解詳細的過程,能夠經過編譯器打斷點了解。java

FileInputFormat

二、InputFomat數據切片機制

2.一、FileInputFormat圖解分析

數據切片分析

紅色劃分是均分方式,這種方式比較低下。apache

而當前採用的是藍色方式,以一個塊爲一個切片。大體流程以下:oop

  1. 找到你數據輸入的目錄。
  2. 開始遍歷處理(規劃切片)目錄下的每個文件
  3. 循環執行4-6步驟,直接遍歷完全部輸入文件。
  4. 遍歷第一個文件test1.file
    • 獲取文件大小fs.sizeOf(ss.txt);
    • 計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize;
    • 默認狀況下,切片大小=blocksize
  5. 開始切片,造成第1個切片:test1.file—0:128M;第2個切片test1.file—128:256M 第3個切片test1.file—256M:300M(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就劃分一塊切片)
  6. 將切片信息寫到一個切片規劃文件中。
    • 整個切片的核心過程在getSplit()方法中完成。須要注意的是數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分紅分片進行存儲。InputSplit只記錄了分片的元數據信息,好比起始位置、長度以及所在的節點列表等。
  7. 提交切片規劃文件到yarn上,yarn上的MrAppMaster就能夠根據切片規劃文件計算開啓maptask個數。

block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分。源碼分析

2.二、FileInputFormat中默認的切片機制

經過如下的學習,咱們能夠總結出如下三個結論:學習

  • 切片過程只是簡單地按照文件的內容長度進行切片
  • 切片大小默認等於block大小
  • 切片時不考慮數據集總體,而是逐個針對每個文件單獨切片

舉個例子加入咱們有如下兩個文件this

file1.txt    320M
file2.txt    10M

通過FileInputFormat的切片機制運算後,默認配置下造成的切片信息以下:debug

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

2.三、FileInputFormat切片大小的參數配置

經過分析源碼org.apache.hadoop.mapreduce.lib.input.FileInputFormat,咱們先來看看他的父類InputFormat代理

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

父類規定了兩個抽象方法getSplits以及RecordReader。code

再來看看FileInputFormat計算分片大小的相關代碼:

public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        List<InputSplit> splits = new ArrayList();
        List<FileStatus> files = this.listStatus(job);
        Iterator var9 = files.iterator();

        while(true) {
            while(true) {
                while(var9.hasNext()) {
                    FileStatus file = (FileStatus)var9.next();
                    Path path = file.getPath();
                    long length = file.getLen();
                    if (length != 0L) {
                        BlockLocation[] blkLocations;
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }

                        if (this.isSplitable(job, path)) {
                            long blockSize = file.getBlockSize();
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                            long bytesRemaining;
                            int blkIndex;
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }

                job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                sw.stop();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                }

                return splits;
            }
        }
    }

從中咱們能夠了解到,計算分片大小的邏輯爲

// 初始化值
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
...
// 計算分片大小
 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
 ...
	protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

...
// minSize默認值爲1L
   protected long getFormatMinSplitSize() {
        return 1L;
    }

也就說,切片主要由這幾個值來運算決定

mapreduce.input.fileinputformat.split.minsize=1 默認值爲1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue

所以,默認狀況下,切片大小=blocksize。咱們不可貴到,要想修改分片的大小,徹底能夠經過配置文件的mapreduce.input.fileinputformat.split.minsize以及mapreduce.input.fileinputformat.split.maxsize進行配置:

  • mapreduce.input.fileinputformat.split.maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小。 mapreduce.input.fileinputformat.split.minsize (切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。

2.四、繼承樹

FileInputFormat有多個底層實現,2.7版本的jdk具備以下的繼承樹

FI的繼承樹

默認狀況下Job任務使用的是

2.五、獲取切片信息API

// 根據文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();

三、CombineTextInputFormat切片機制

默認狀況下TextInputformat對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣若是有大量小文件,就會產生大量的maptask,處理效率極其低下。最好的辦法,在數據處理系統的最前端(預處理/採集),將小文件先合併成大文件,再上傳到HDFS作後續分析。

若是已是大量小文件在HDFS中了,可使用另外一種InputFormat來作切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask。

優先知足最小切片大小,不超過最大切片大小

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

若是不設置InputFormat,它默認用的是TextInputFormat.class,所以咱們須要手動指定InputFormat類型,在執行job以前指定:

job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

經過此設置以後,分片會變得更少一些,不會像以前同樣,一個文件造成一個分片(文件太小的狀況尤爲浪費)。

相關文章
相關標籤/搜索