TaskTracker執行map或reduce任務的過程(二)

  上次說到,當MapLauncher或ReduceLancher(用於執行任務的線程,它們擴展自TaskLauncher),從它們所維護的LinkedList也即隊列中獲取到TaskInProgress,而且TaskTracker有空閒的slot時,該線程就調用了TaskTracker的startNewTask(tip)方法,以下所示: spa

 public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          Task task;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();//當隊列爲空時唄阻塞,知道有新的tip到來纔會被喚醒
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
      ......//當有空閒的slot時執行啓動一個任務
          startNewTask(tip);
      ......
      }
    }

  接下了來就讓咱們看下startNewTask(tip)的神祕面紗吧,因爲在其內部經過實習Runnable建立了一個線程,咱們只需分析線程體的run方法便可,關鍵代碼以下,爲便於說明,給3個核心語句分別標識爲**1,**2:
線程

public void run() {
        try {
          RunningJob rjob = localizeJob(tip);        //**1
          tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); 
          // task本地化已經完成,此刻若是rjob.jobConf或者rjob.ugi爲空的話,會拋出異常
      launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2
......

} }

  **1的源碼以下,code

    Task t = tip.getTask();
    JobID jobId = t.getJobID();
    RunningJob rjob = addTaskToJob(jobId, tip);
    InetSocketAddress ttAddr = getTaskTrackerReportAddress();

  從中咱們能夠看出,首先建立了一個該任務所屬的RunningJob,並把它放入到一個該TaskTracker所維護的TreeMap<jobId,RunningJob>中,同時在RunningJob中記錄將要執行的task,也即把tip放入到RunningJob.tasks(一個HashSet<TaskInProgress>)中。由此,咱們能夠知道,每一個TaskTracker都維護者一個TreeMap用以記錄它正在執行的哪一個做業的哪些任務(map、reduce任務)。xml

  接下來localizeJob(tip)要作的就是調用initializeJob(t, rjob, ttAddr)初始化工做目錄,並下載相應的job.xml以及job.jar(TaskController負責)文件,TaskController最後調用RunJar.unJar()將包解壓到相應的工做目錄,,至此初始化工做完成,調用launchTaskForJob開始執行Task。blog

  **2的核心代碼爲:隊列

 protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException {
    synchronized (tip) {
      jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
                  localStorage.getDirsString());
      tip.setJobConf(jobConf);
      tip.setUGI(rjob.ugi);
      tip.launchTask(rjob);
    }
  }

  由此看出,它主要是調用TaskTracker.TaskInProgress的launchTask()方法,在該方法中它建立了一個TaskRunner線程,並啓這個線程執行這個task,其run方法核心代碼以下:進程

public final void run() {
    //設置工做目錄
final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(), t.isTaskCleanupTask())).toString());
......

// 設置環境變量 List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager); .......

    //啓動Task子進程 launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir); } }

  未完待續...... ip

相關文章
相關標籤/搜索