We start from the client-side entry point, job.waitForCompletion(true). First, the driver code that configures and submits the job:
public static void main(String[] args) throws Exception {
    // HBase connection settings and the target table for TableOutputFormat
    Configuration configuration = new Configuration();
    configuration.set("hbase.zookeeper.quorum", Hconfiguration.hbase_zookeeper_quorum);
    configuration.set("hbase.zookeeper.clientPort", "2181");
    configuration.set(TableOutputFormat.OUTPUT_TABLE, Hconfiguration.tableName);
    configuration.set("dfs.socket.timeout", "1800000");

    MRDriver myDriver = MRDriver.getInstance();
    /*try {
        myDriver.creatTable(Hconfiguration.tableName, Hconfiguration.colFamily);
    } catch (Exception e) {
        e.printStackTrace();
    }*/

    // wire up the job: text input, map/reduce classes, HBase table output
    Job job = new Job(configuration, "Map+ReduceImport");
    job.setMapperClass(HMapper.class);
    job.setReducerClass(HReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);
    FileInputFormat.setInputPaths(job, Hconfiguration.mapreduce_inputPath);
    job.waitForCompletion(true);
}
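HMapper and HReducer themselves are not shown in this article. For context, here is a minimal, hypothetical sketch of what a reducer feeding TableOutputFormat might look like. It follows the job wiring above (map output types LongWritable/Text), but the row key, column family, and qualifier are made-up examples, not the original code:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch only -- the real HReducer is not shown in the article.
// A reducer feeding TableOutputFormat must emit Put (or Delete) mutations;
// the row key and the "cf"/"line" column below are made-up examples.
public class HReducer extends Reducer<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            Put put = new Put(Bytes.toBytes(key.get()));               // row key from the map key
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("line"), // assumed family/qualifier (HBase 1.x+ API)
                    Bytes.toBytes(value.toString()));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }
}

Now, into waitForCompletion(true) itself: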
public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // the interesting part: the submission process
        submit();
    }
    if (verbose) {
        // monitor the job and print its progress
        monitorAndPrintJob();
    } else {
        ……
    }
    return isSuccessful();
}
Let's step into submit(). It in turn calls submitJobInternal(), which hands the job to the cluster:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // decide whether the old (mapred, 1.x-style) or new (mapreduce, 2.x-style) API is in use
    setUseNewAPI();
    // connect to the cluster
    connect();
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // submit the job to the cluster
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    ……
}
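As an aside on setUseNewAPI(): roughly speaking (this is a simplified paraphrase for illustration, not the verbatim Hadoop source), it flags the job as new-API unless an old-API mapper/reducer was explicitly configured, and writeSplits() later reads that flag back through jConf.getUseNewMapper():

// Condensed paraphrase of Job.setUseNewAPI(); the property names are the
// real Hadoop keys, but the surrounding logic is simplified here.
conf.setBooleanIfUnset("mapred.mapper.new-api",
        conf.get("mapred.mapper.class") == null);
conf.setBooleanIfUnset("mapred.reducer.new-api",
        conf.get("mapred.reducer.class") == null);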
A closer look at submitJobInternal():
JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {
    // validate the job's output specs:
    // checking the input and output specifications of the job
    checkSpecs(job);
    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        // record who is submitting the job
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // the directory the job is submitted to
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    ……
    // copy the job resources and configuration files
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    // create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // the method that creates the input splits
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    ……
    // write job file to submit dir
    writeConf(conf, submitJobFile);
    // now, actually submit the job (using the submit name)
    printTokens(jobId, job.getCredentials());
    // everything above was preparation; this is the actual submission
    status = submitClient.submitJob(
            jobId, submitJobDir.toString(), job.getCredentials());
    ……
}
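For orientation, the submit directory built here typically ends up holding the job's resources. The file names below come from JobSubmissionFiles; the directory layout shown is an assumption based on default configuration:

<staging-dir>/<jobId>/job.jar             the job jar, copied by copyAndConfigureFiles()
<staging-dir>/<jobId>/job.xml             the serialized Configuration, written by writeConf()
<staging-dir>/<jobId>/job.split           split data, written by writeSplits()
<staging-dir>/<jobId>/job.splitmetainfo   split metadata, later read by the ApplicationMaster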
writeSplits() delegates to writeNewSplits():
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir)
        throws IOException, InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    // based on the flag set earlier, choose the new (2.x) or old (1.x) code path
    if (jConf.getUseNewMapper()) {
        maps = writeNewSplits(job, jobSubmitDir);
    } else {
        maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}
Let's keep following into the writeNewSplits(job, jobSubmitDir) method:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // instantiate the InputFormat via reflection
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    List<InputSplit> splits = input.getSplits(job);
    ……
}
Two methods here are worth following closely: getInputFormatClass() and getSplits(job). First, the getInputFormatClass() method:
public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException {
    // if the user set an InputFormat via job.setInputFormatClass(cls),
    // use that; otherwise fall back to the default, TextInputFormat
    return (Class<? extends InputFormat<?,?>>)
            conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
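So the default is TextInputFormat, which is exactly what the driver at the top sets explicitly. To override it, a job calls setInputFormatClass before submission; for instance, with the stock KeyValueTextInputFormat that ships with Hadoop (the call below would replace the TextInputFormat line in the main() shown earlier):

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// stores the class under INPUT_FORMAT_CLASS_ATTR
// ("mapreduce.job.inputformat.class"), which getInputFormatClass() reads back
job.setInputFormatClass(KeyValueTextInputFormat.class);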
而後咱們繼續看getSplits(job)
方法。這個方法很是重要post
public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    // 1 unless the user overrides it
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    /*
    protected long getFormatMinSplitSize() {
        return 1;
    }
    public static long getMinSplitSize(JobContext job) {
        // if the user set it via FileInputFormat.setMinInputSplitSize(job, size),
        // use that value; otherwise 1
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    }
    */
    long maxSize = getMaxSplitSize(job);
    /*
    // if the user set it, use that value; otherwise effectively unbounded
    public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
    }
    */

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    // iterate over every file under the input paths and
    // fetch each file's BlockLocations
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                /*
                With no user overrides: min(maxSize, blockSize) is blockSize,
                and max(minSize, blockSize) is still blockSize, so the split
                size defaults to the block size.
                protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                    return Math.max(minSize, Math.min(maxSize, blockSize));
                }
                */
                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    // figure out which block this split starts in
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    /*
                    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
                        // find the block whose offset range contains this offset
                        for (int i = 0; i < blkLocations.length; i++) {
                            // is the offset inside this block?
                            if ((blkLocations[i].getOffset() <= offset)
                                    && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
                                return i;
                            }
                        }
                        BlockLocation last = blkLocations[blkLocations.length - 1];
                        long fileLength = last.getOffset() + last.getLength() - 1;
                        throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")");
                    }
                    */
                    splits.add(makeSplit(path,
                            length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    // create the final split; a split records the file path,
                    // offset, length, and host location info
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable
                ……
            }
        } else {
            // create empty hosts array for zero length files
            ……
        }
    }
    ……
    return splits;
}
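To make the arithmetic concrete, here is a small standalone sketch (not Hadoop code) that replays the split computation with assumed numbers: a 128 MB block size, default min/max split sizes, and a hypothetical 260 MB input file. Note the SPLIT_SLOP factor (1.1 in the Hadoop source): the tail of the file only becomes its own split when more than 1.1 split-sizes remain, so this file yields two splits (128 MB and 132 MB), not three:

// Standalone demo of FileInputFormat's split math; all input values here
// are assumptions for illustration (128 MB blocks, a 260 MB file).
public class SplitMathDemo {
    private static final double SPLIT_SLOP = 1.1; // same constant as in FileInputFormat

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb;       // assumed HDFS block size
        long minSize = 1L;               // default SPLIT_MINSIZE
        long maxSize = Long.MAX_VALUE;   // default SPLIT_MAXSIZE
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); // = 128 MB

        long length = 260 * mb;          // hypothetical 260 MB input file
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.printf("split at offset %d MB, size %d MB%n",
                    (length - bytesRemaining) / mb, splitSize / mb);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // 132 MB / 128 MB = 1.03 < 1.1, so the tail stays one split
            System.out.printf("split at offset %d MB, size %d MB%n",
                    (length - bytesRemaining) / mb, bytesRemaining / mb);
        }
        // Output: two splits, 128 MB and 132 MB.
    }
}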
總的來講, 客戶端作了如下幾件事:學習
The ApplicationMaster then uses the split list to request resources from the ResourceManager; the ResourceManager allocates containers, and the ApplicationMaster launches those containers and runs the map/reduce tasks inside them.