Hadoop Learning: Client Source Code Analysis



We start from the client-side entry point, job.waitForCompletion(true), which the driver below calls at the end:

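import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hconfiguration, MRDriver, HMapper and HReducer are the author's own project classes
public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum",Hconfiguration.hbase_zookeeper_quorum);
        configuration.set("hbase.zookeeper.clientPort","2181");
        configuration.set(TableOutputFormat.OUTPUT_TABLE,Hconfiguration.tableName);
        configuration.set("dfs.socket.timeout","1800000");

        MRDriver myDriver = MRDriver.getInstance();

        /*try{
            myDriver.creatTable(Hconfiguration.tableName, Hconfiguration.colFamily);
        }catch (Exception e){
            e.printStackTrace();
        }*/

        // Job.getInstance replaces the constructor new Job(conf, name), deprecated in Hadoop 2.x
        Job job = Job.getInstance(configuration,"Map+ReduceImport");
        job.setMapperClass(HMapper.class);
        job.setReducerClass(HReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, Hconfiguration.mapreduce_inputPath);
        // Client-side entry point: submits the job and waits for it to finish
        job.waitForCompletion(true);
    }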
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // The key part: submitting the job
        submit();
    }
    if (verbose) {
        // Monitor the job and print its progress
        monitorAndPrintJob();
    } else {
        ……
    }
    return isSuccessful();
}

Stepping into submit(): after connecting to the cluster, it calls submitJobInternal() to submit the job to the cluster.

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // Decide whether the job uses the new (mapreduce, 2.x-style) or old (mapred, 1.x-style) API
    setUseNewAPI();
    // Connect to the cluster
    connect();
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // Submit the job to the cluster
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    ……
}
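The DEFINE check in waitForCompletion() and the ensureState() call in submit() are what keep a job from being submitted twice. The toy model below mimics that guard; JobStateSketch and ToyJob are hypothetical names, not Hadoop classes, and the model assumes (as in the Hadoop 2.x source) that submit() moves the job to RUNNING once submission succeeds and that ensureState() throws IllegalStateException when the job is in the wrong state.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Hadoop's Job) of the DEFINE -> RUNNING guard. */
public class JobStateSketch {
    enum JobState { DEFINE, RUNNING }

    static class ToyJob {
        JobState state = JobState.DEFINE;
        final List<String> log = new ArrayList<>();

        // Mirrors the idea of Job.ensureState: reject the call unless the job is in the required state
        void ensureState(JobState required) {
            if (state != required) {
                throw new IllegalStateException("Job in state " + state + " instead of " + required);
            }
        }

        void submit() {
            ensureState(JobState.DEFINE);
            log.add("submitted");        // stands in for submitter.submitJobInternal(...)
            state = JobState.RUNNING;    // from now on, a second submit() is rejected
        }

        boolean waitForCompletion() {
            if (state == JobState.DEFINE) {  // submit only if nobody has submitted yet
                submit();
            }
            log.add("waited");               // stands in for monitoring / polling
            return true;
        }
    }

    public static void main(String[] args) {
        ToyJob job = new ToyJob();
        job.submit();
        job.waitForCompletion();             // does not resubmit
        System.out.println(job.log);
        try {
            job.submit();                    // second explicit submit is refused
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because waitForCompletion() only calls submit() while the job is still in DEFINE, calling submit() first and waitForCompletion() afterwards is safe; calling submit() a second time is not.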

submitJobInternal() in detail:

JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {
    //validate the jobs output specs
    // (e.g. FileOutputFormat rejects an output directory that already exists)
    checkSpecs(job);
    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        // Record the submitting host's name and address in the job configuration
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // Submit directory: <staging dir>/<job id>
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    ……
    // Copy the job's files (job jar, files, libjars, archives) to the submit directory
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Compute the input splits; the return value is the number of map tasks
    int maps = writeSplits(job, submitJobDir);

    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    ……
    // Write job file to submit dir
    writeConf(conf, submitJobFile);
    // Now, actually submit the job (using the submit name)
    printTokens(jobId, job.getCredentials());
    // Everything above was preparation; this call actually submits the job
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    ……
}
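submitJobInternal() builds the submit directory as new Path(jobStagingArea, jobId.toString()) and fills it before calling submitClient.submitJob(). The sketch below reproduces only that path arithmetic with plain strings. SubmitDirSketch and its helpers are hypothetical; the staging root assumes the YARN default for yarn.app.mapreduce.am.staging-dir (/tmp/hadoop-yarn/staging, plus a per-user .staging subdirectory), and the user name and cluster timestamp are made-up example values.

```java
/**
 * Hypothetical helper reproducing the submit-directory layout built in
 * submitJobInternal(): <staging root>/<user>/.staging/<job id>/...
 */
public class SubmitDirSketch {
    // Assumed default of yarn.app.mapreduce.am.staging-dir
    static final String STAGING_ROOT = "/tmp/hadoop-yarn/staging";

    // Same shape as JobID.toString(): "job_" + cluster timestamp + "_" + id zero-padded to 4 digits
    static String jobId(long clusterTimestamp, int seq) {
        return String.format("job_%d_%04d", clusterTimestamp, seq);
    }

    // Equivalent of new Path(jobStagingArea, jobId.toString())
    static String submitJobDir(String user, String jobId) {
        return STAGING_ROOT + "/" + user + "/.staging/" + jobId;
    }

    public static void main(String[] args) {
        // "hadoop" and the timestamp are example values only
        String dir = submitJobDir("hadoop", jobId(1520000000000L, 1));
        // File names used by JobSubmissionFiles; job.jar exists only if the job has a jar
        String[] files = {"job.jar", "job.split", "job.splitmetainfo", "job.xml"};
        for (String f : files) {
            System.out.println(dir + "/" + f);
        }
    }
}
```

The staging directory is normally cleaned up when the job finishes, so these files are only visible while a job is being submitted or running.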

writeSplits() delegates to writeNewSplits() when the new API is in use:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir)
        throws IOException, InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // Use the new (2.x, mapreduce) or old (1.x, mapred) split logic,
    // depending on the API choice made earlier in setUseNewAPI()
    if (jConf.getUseNewMapper()) {
        maps = writeNewSplits(job, jobSubmitDir);
    } else {
        maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}

Next, let's follow writeNewSplits(job, jobSubmitDir):

private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Instantiate the configured InputFormat class via reflection
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    List<InputSplit> splits = input.getSplits(job);
    ……
}

Two methods here deserve a closer look: getInputFormatClass() and getSplits(job). First, getInputFormatClass():

public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException {
    // If the user set an InputFormat with job.setInputFormatClass(cls), use it;
    // otherwise fall back to the default, TextInputFormat
    return (Class<? extends InputFormat<?,?>>)
        conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
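getInputFormatClass() is a "configured class, else default" lookup, and writeNewSplits() then instantiates the result through ReflectionUtils.newInstance(). The sketch below models both steps with the JDK only: a Map<String, String> stands in for Configuration, and java.util.ArrayList / java.util.LinkedList stand in for TextInputFormat and a user-supplied InputFormat. The class and method names are hypothetical; only the property key mapreduce.job.inputformat.class is Hadoop's (MRJobConfig.INPUT_FORMAT_CLASS_ATTR).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical) of getInputFormatClass() plus ReflectionUtils.newInstance().
 * A Map replaces Configuration; JDK collection classes replace InputFormat implementations.
 */
public class InputFormatLookupSketch {
    // Same key as MRJobConfig.INPUT_FORMAT_CLASS_ATTR
    static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";

    // Like Configuration.getClass(name, defaultValue): the default is used only if the key is unset
    static Class<?> getClassOrDefault(Map<String, String> conf, String key, Class<?> defaultValue)
            throws ClassNotFoundException {
        String name = conf.get(key);
        return name == null ? defaultValue : Class.forName(name.trim());
    }

    // Simplified ReflectionUtils.newInstance: invoke the public no-arg constructor
    static Object newInstance(Class<?> cls) throws ReflectiveOperationException {
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        // Nothing configured: the default (TextInputFormat in Hadoop, ArrayList here) is used
        Class<?> cls = getClassOrDefault(conf, INPUT_FORMAT_CLASS_ATTR, java.util.ArrayList.class);
        System.out.println(cls.getName());

        // What job.setInputFormatClass(cls) records in the configuration: the class name
        conf.put(INPUT_FORMAT_CLASS_ATTR, "java.util.LinkedList");
        Object input = newInstance(getClassOrDefault(conf, INPUT_FORMAT_CLASS_ATTR, java.util.ArrayList.class));
        System.out.println(input.getClass().getName());
    }
}
```

Storing a class name rather than an object is what lets the same lookup run again later on the cluster side, where only the serialized configuration (job.xml) is available.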

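Next comes getSplits(job). This method is the most important one here, because the number of splits it returns becomes the number of map tasks: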

public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    // Unless the user overrides it, minSize is 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    /*
    protected long getFormatMinSplitSize() {
        return 1;
    }
    public static long getMinSplitSize(JobContext job) {
        // Use the user's value if set, otherwise 1
        // FileInputFormat.setMinInputSplitSize(job, size);
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    }
    */
    long maxSize = getMaxSplitSize(job);
    /*
    // Use the user's value if set, otherwise an effectively unlimited value
    public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
    }
    */
    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    // Iterate over every file under the user's input paths
    // and get each file's BlockLocations
    for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                /*
                With no user overrides, min(maxSize, blockSize) = blockSize and
                max(minSize, blockSize) = blockSize, so splitSize defaults to the block size
                protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                    return Math.max(minSize, Math.min(maxSize, blockSize));
                }
                */
                long bytesRemaining = length;
                // Cut full splits while more than SPLIT_SLOP (1.1) split sizes remain
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                    // Find which block the split's start offset falls in
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    /*
                    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
                        // Find the block whose byte range contains offset
                        for (int i = 0 ; i < blkLocations.length; i++) {
                            // is the offset inside this block?
                            if ((blkLocations[i].getOffset() <= offset) &&
                                (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
                                return i;
                            }
                        }
                        BlockLocation last = blkLocations[blkLocations.length - 1];
                        long fileLength = last.getOffset() + last.getLength() - 1;
                        throw new IllegalArgumentException("Offset " + offset +
                            " is outside of file (0.." + fileLength + ")");
                    }
                    */
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    // Create the last split from the remaining bytes;
                    // a split records the file path, start offset, length and host locations
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable
                ……
            }
        } else {
            //Create empty hosts array for zero length files
            ……
        }
    }
    ……
    return splits;
}
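To see what getSplits() actually produces, the sketch below re-implements just its arithmetic: computeSplitSize(), the SPLIT_SLOP loop and getBlockIndex(). It is a simplified model rather than Hadoop code: SplitSketch is a hypothetical class, blocks are reduced to {offset, length} pairs, hosts and non-splittable files are ignored, and the 128 MB block size is an assumed example value (the usual HDFS default in Hadoop 2.x).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the split arithmetic in FileInputFormat.getSplits(). */
public class SplitSketch {
    static final double SPLIT_SLOP = 1.1;    // FileInputFormat uses 1.1 (10% slop)
    static final long MB = 1024L * 1024L;

    // Same formula as FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Index of the block whose [offset, offset + length) range contains 'offset'
    static int getBlockIndex(List<long[]> blocks, long offset) {
        for (int i = 0; i < blocks.size(); i++) {
            long[] b = blocks.get(i);
            if (b[0] <= offset && offset < b[0] + b[1]) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of file");
    }

    /** Returns one {start, length, blockIndex} triple per split of a single file. */
    static List<long[]> getSplits(long length, long blockSize, long minSize, long maxSize) {
        // Describe the file's blocks; only the last one may be shorter than blockSize
        List<long[]> blocks = new ArrayList<>();
        for (long off = 0; off < length; off += blockSize) {
            blocks.add(new long[] {off, Math.min(blockSize, length - off)});
        }
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        // Cut full-size splits while more than 1.1 split sizes remain
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            long start = length - bytesRemaining;
            splits.add(new long[] {start, splitSize, getBlockIndex(blocks, start)});
            bytesRemaining -= splitSize;
        }
        // Whatever is left (at most 1.1 x splitSize) becomes the last split
        if (bytesRemaining != 0) {
            long start = length - bytesRemaining;
            splits.add(new long[] {start, bytesRemaining, getBlockIndex(blocks, start)});
        }
        return splits;
    }

    static void print(List<long[]> splits) {
        StringBuilder sb = new StringBuilder();
        for (long[] s : splits) {
            sb.append(String.format("[start=%dMB len=%dMB block=%d] ", s[0] / MB, s[1] / MB, s[2]));
        }
        System.out.println(sb.toString().trim());
    }

    public static void main(String[] args) {
        long blockSize = 128 * MB;
        // 300 MB file, default min/max: splits of 128, 128 and 44 MB
        print(getSplits(300 * MB, blockSize, 1L, Long.MAX_VALUE));
        // 140 MB file: 140 / 128 = 1.09 <= 1.1, so a single 140 MB split
        print(getSplits(140 * MB, blockSize, 1L, Long.MAX_VALUE));
        // maxSize lowered to 64 MB: splitSize = 64 MB, so five splits for 300 MB
        print(getSplits(300 * MB, blockSize, 1L, 64 * MB));
    }
}
```

The 140 MB case shows what SPLIT_SLOP is for: the last split may be up to 1.1 times splitSize, so the file becomes one map task instead of a 128 MB split plus a 12 MB one. Lowering maxSize below the block size (FileInputFormat.setMaxInputSplitSize) yields more, smaller splits; raising minSize above it (FileInputFormat.setMinInputSplitSize) yields fewer, larger ones.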

In summary, the client does the following:

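  1. Completes the job configuration
  2. Checks the input/output paths
  3. Computes the input splits, whose count becomes the number of map tasks
  4. Uploads the job resources (jar, configuration, split information) to HDFS
  5. Submits the job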

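After that, the ApplicationMaster requests resources from the ResourceManager according to the list of splits, the ResourceManager allocates containers, and the ApplicationMaster then launches those containers and runs the MapReduce tasks inside them.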

[Figure: summary diagram]

MapTask memory defaults to 1 GB and can be changed when tuning (constants from MRJobConfig):
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
ReduceTask memory also defaults to 1 GB; this default is too small and should be adjusted:
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;