TaskTracker獲取並執行map或reduce任務的過程(一)

  咱們知道TaskTracker在默認狀況下,每一個3秒就行JobTracker發送一個心跳包,也就是在這個心跳包中包含對任務的請求。JobTracker返回給TaskTracker的心跳包中包含有各類action(任務),若是有知足在此TaskTracker上執行的任務的話,該任務也就包含在心跳包的響應中。在TaskTracker端有線程專門等待map或reduce任務,並從隊列中取出執行。函數

1. TaskTracker發送心跳包

  TaskTracker是做爲一個單獨的JVM運行的,它啓動之後一直處於offerService()函數中,每隔3秒就執行一次transmitHeartBeat函數,以下所示:ui

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

  該函數具體代碼爲:this

  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  ......
if (status == null) { synchronized (this) { status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxMapSlots, maxReduceSlots); } } // // 檢查是否能夠接受新的任務 // boolean askForNewTask; long localMinSpaceStart; synchronized (this) { askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart; }
......
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId); ...... return heartbeatResponse; }

  咱們從中能夠看出,TaskTracker首先建立一個TaskTrackerStatus對象,其中包含有TaskTracker的各類信息,好比,map slot的數目,reducer slot槽的數目,TaskTracker所在的主機名等信息。而後,對TaskTracker的空閒的slot以及磁盤空間進行檢查,若是知足相應的條件時,最終就會經過JobClient(爲JobTracker的代理)將心跳信息發送給JobTracker,並獲得JobTracker的響應HeartbeatResponse。以下所示,JobClient是InterTrackerProtocol的一個實例,而JobTracker實現了InterTrackerProtocol這個接口。spa

    this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });

    那麼,TaskTracker怎樣經過JobTracker的代理與JobTracker進行通訊呢?它是經過RPC調用JobTracker的heartbeat(......)方法而實現的。線程

2. TaskTracker端獲取任務

  TaskTracker接收到任務後,會將它們放入到相應的LinkedList中,LinkedList實現了List和Queue接口,它是基於鏈表實現的FIFO的隊列。代理

heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
         ......
          }
        }
  ......

  private void addToTaskQueue(LaunchTaskAction action) {
    if (action.getTask().isMapTask()) {
      mapLauncher.addToTaskQueue(action);
    } else {
      reduceLauncher.addToTaskQueue(action);
    }
    }code

 

  TaskTracker啓動的時候,建立了兩個線程:mapLauncher和reduceLauncher,它們分別處理map任務和reduce任務,map任務有mapLauncher負責將其放入到LinkedList中,reduce任務有reducerLauncher負責將其放入到它維護的LinkedList中。orm

  public void addToTaskQueue(LaunchTaskAction action) {
      synchronized (tasksToLaunch) {
        TaskInProgress tip = registerTask(action, this);
        tasksToLaunch.add(tip);
        tasksToLaunch.notifyAll();
      }
    }

  mapLauncher或者是reducerLauncher根據接收到的action,建立對應的TaskTracker.TaskInProgress對象,並放入到隊列中,喚醒等待的線程進行處理。 以下所示,該線程負責從taskToLaunch中獲取task,當有空間的slot時,執行這個task。對象

  synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
                     " which needs " + task.getNumSlotsRequired() + " slots");
          }
.....
          //獲得空閒的slot後,啓動這個task
          startNewTask(tip);

  這樣,TaskTracker就獲得了待處理的任務,具體如何執行請參考下一篇博客。blog

相關文章
相關標籤/搜索