上次說到,當MapLauncher或ReduceLancher(用於執行任務的線程,它們擴展自TaskLauncher),從它們所維護的LinkedList也即隊列中獲取到TaskInProgress,而且TaskTracker有空閒的slot時,該線程就調用了TaskTracker的startNewTask(tip)方法,以下所示: spa
public void run() { while (!Thread.interrupted()) { try { TaskInProgress tip; Task task; synchronized (tasksToLaunch) { while (tasksToLaunch.isEmpty()) { tasksToLaunch.wait();//當隊列爲空時唄阻塞,知道有新的tip到來纔會被喚醒 } //get the TIP tip = tasksToLaunch.remove(0); task = tip.getTask(); ......//當有空閒的slot時執行啓動一個任務 startNewTask(tip); ...... } }
接下了來就讓咱們看下startNewTask(tip)的神祕面紗吧,因爲在其內部經過實習Runnable建立了一個線程,咱們只需分析線程體的run方法便可,關鍵代碼以下,爲便於說明,給3個核心語句分別標識爲**1,**2:
線程
public void run() { try { RunningJob rjob = localizeJob(tip); //**1 tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); // task本地化已經完成,此刻若是rjob.jobConf或者rjob.ugi爲空的話,會拋出異常
launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2
......
} }
**1的源碼以下,code
Task t = tip.getTask(); JobID jobId = t.getJobID(); RunningJob rjob = addTaskToJob(jobId, tip); InetSocketAddress ttAddr = getTaskTrackerReportAddress();
從中咱們能夠看出,首先建立了一個該任務所屬的RunningJob,並把它放入到一個該TaskTracker所維護的TreeMap<jobId,RunningJob>中,同時在RunningJob中記錄將要執行的task,也即把tip放入到RunningJob.tasks(一個HashSet<TaskInProgress>)中。由此,咱們能夠知道,每一個TaskTracker都維護者一個TreeMap用以記錄它正在執行的哪一個做業的哪些任務(map、reduce任務)。xml
接下來localizeJob(tip)要作的就是調用initializeJob(t, rjob, ttAddr)初始化工做目錄,並下載相應的job.xml以及job.jar(TaskController負責)文件,TaskController最後調用RunJar.unJar()將包解壓到相應的工做目錄,,至此初始化工做完成,調用launchTaskForJob開始執行Task。blog
**2的核心代碼爲:隊列
protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException { synchronized (tip) { jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localStorage.getDirsString()); tip.setJobConf(jobConf); tip.setUGI(rjob.ugi); tip.launchTask(rjob); } }
由此看出,它主要是調用TaskTracker.TaskInProgress的launchTask()方法,在該方法中它建立了一個TaskRunner線程,並啓這個線程執行這個task,其run方法核心代碼以下:進程
public final void run() {
//設置工做目錄 final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(), t.isTaskCleanupTask())).toString());
......
// 設置環境變量 List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager); .......
//啓動Task子進程 launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir); } }
未完待續...... ip