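We know that, by default, the TaskTracker sends a heartbeat to the JobTracker every 3 seconds, and this heartbeat also carries its request for new tasks. The heartbeat response that the JobTracker sends back contains a list of actions; if there are tasks that should run on this TaskTracker, they are included in that response. On the TaskTracker side, dedicated threads wait for map or reduce tasks, take them from their queues, and execute them.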
The TaskTracker runs in its own JVM. Once started, it stays inside the offerService() method and, by default, calls transmitHeartBeat every 3 seconds, as shown below:
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
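To make the timing rule concrete, here is a minimal simulation. It is a sketch under assumptions, not Hadoop code: the class HeartbeatLoopSketch and its simulate method are hypothetical, time is simulated instead of being read from the system clock, and the interval stays fixed for the whole run (in the real TaskTracker it can change after each heartbeat, because heartbeatInterval is read back from the HeartbeatResponse).

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatLoopSketch {

    /**
     * Hypothetical model of a polling loop: it wakes up every tickMs milliseconds
     * and sends a heartbeat whenever at least intervalMs has passed since the
     * previous one. Returns the simulated timestamps at which heartbeats were sent.
     */
    static List<Long> simulate(long startMs, long endMs, long tickMs, long intervalMs) {
        List<Long> sentAt = new ArrayList<>();
        long lastHeartbeat = startMs - intervalMs; // ensures the first tick sends immediately
        for (long now = startMs; now <= endMs; now += tickMs) {
            if (now - lastHeartbeat >= intervalMs) {
                // This is where the real TaskTracker would call transmitHeartBeat(now).
                sentAt.add(now);
                lastHeartbeat = now;
            }
        }
        return sentAt;
    }

    public static void main(String[] args) {
        // 3-second interval, loop wakes every 500 ms, 10 simulated seconds.
        System.out.println(simulate(0, 10_000, 500, 3_000)); // prints "[0, 3000, 6000, 9000]"
    }
}
```

With a 1,000 ms tick and a 2,500 ms interval the heartbeats still land at 0, 3000, 6000 and 9000 ms, because a heartbeat can only go out when the loop wakes up.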
The transmitHeartBeat method is implemented as follows:
HeartbeatResponse transmitHeartBeat(long now) throws IOException {
......
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxMapSlots,
                                     maxReduceSlots);
    }
  }

  //
  // Check whether this TaskTracker can accept new tasks
  //
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask =
      ((status.countOccupiedMapSlots() < maxMapSlots ||
        status.countOccupiedReduceSlots() < maxReduceSlots) &&
       acceptNewTasks);
    localMinSpaceStart = minSpaceStart;
  }
......
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted,
                                                            justInited,
                                                            askForNewTask,
                                                            heartbeatResponseId);
  ......
  return heartbeatResponse;
}
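The askForNewTask condition above can be pulled out into a small pure function, which makes the rule easy to test. This is a hypothetical sketch: the class and parameter names are ours, and the disk-space rule (free space must be at least minSpaceStart bytes) is an assumption standing in for the disk check that happens in the elided part of transmitHeartBeat. In the code shown, the resulting flag is passed to jobClient.heartbeat(...) as its askForNewTask argument.

```java
public class AskForNewTaskSketch {

    /** Same shape as the condition in transmitHeartBeat: any free map or reduce slot, and accepting tasks. */
    static boolean hasFreeSlot(int occupiedMap, int maxMap,
                               int occupiedReduce, int maxReduce,
                               boolean acceptNewTasks) {
        return (occupiedMap < maxMap || occupiedReduce < maxReduce) && acceptNewTasks;
    }

    /** Adds a hypothetical disk rule (assumption): keep asking only if freeDiskBytes >= minSpaceStart. */
    static boolean askForNewTask(int occupiedMap, int maxMap,
                                 int occupiedReduce, int maxReduce,
                                 boolean acceptNewTasks,
                                 long freeDiskBytes, long minSpaceStart) {
        boolean ask = hasFreeSlot(occupiedMap, maxMap, occupiedReduce, maxReduce, acceptNewTasks);
        return ask && freeDiskBytes >= minSpaceStart;
    }

    public static void main(String[] args) {
        // All map slots busy, one reduce slot free, enough disk: still asks for work.
        System.out.println(askForNewTask(2, 2, 1, 2, true, 10_000, 1_000)); // prints "true"
        // Every slot busy: no request for new tasks.
        System.out.println(askForNewTask(2, 2, 2, 2, true, 10_000, 1_000)); // prints "false"
    }
}
```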
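Looking at transmitHeartBeat, the TaskTracker first creates a TaskTrackerStatus object holding information about itself, such as the number of map slots, the number of reduce slots, and the name of the host it runs on. It then checks its free slots and disk space to decide whether to ask for new tasks (the askForNewTask flag). Finally, it sends the heartbeat to the JobTracker through jobClient, a proxy for the JobTracker, and receives the JobTracker's HeartbeatResponse. As shown below, jobClient is an instance of InterTrackerProtocol, and the JobTracker implements the InterTrackerProtocol interface.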
this.jobClient = (InterTrackerProtocol)
  UserGroupInformation.getLoginUser().doAs(
    new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
                                InterTrackerProtocol.versionID,
                                jobTrackAddr, fConf);
      }
    });
So how does the TaskTracker talk to the JobTracker through this proxy? It makes an RPC call to the JobTracker's heartbeat(......) method.
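The idea of calling a remote heartbeat(...) through a local interface can be illustrated with the JDK's java.lang.reflect.Proxy. In this sketch, HeartbeatProtocol and FakeJobTracker are hypothetical stand-ins for InterTrackerProtocol and the JobTracker, and the invocation handler simply calls a local object; a real RPC client would instead serialize the method name and arguments, send them to the server, and deserialize the reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class RpcProxySketch {

    /** Hypothetical, heavily simplified stand-in for InterTrackerProtocol. */
    interface HeartbeatProtocol {
        String heartbeat(String trackerName, boolean askForNewTask);
    }

    /** Plays the role of the JobTracker: the server-side implementation of the protocol. */
    static class FakeJobTracker implements HeartbeatProtocol {
        @Override
        public String heartbeat(String trackerName, boolean askForNewTask) {
            return askForNewTask ? "LaunchTaskAction for " + trackerName : "no actions";
        }
    }

    /** Builds a proxy that implements the interface and forwards every call to the handler. */
    static HeartbeatProtocol createProxy(HeartbeatProtocol server) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real RPC layer would ship method.getName() and args over the network here.
            return method.invoke(server, args);
        };
        return (HeartbeatProtocol) Proxy.newProxyInstance(
                HeartbeatProtocol.class.getClassLoader(),
                new Class<?>[] {HeartbeatProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        HeartbeatProtocol jobClient = createProxy(new FakeJobTracker());
        System.out.println(jobClient.heartbeat("tracker_host1", true)); // prints "LaunchTaskAction for tracker_host1"
    }
}
```

The caller only sees the interface, which is why the TaskTracker code can call jobClient.heartbeat(...) as if the JobTracker were a local object.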
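After receiving tasks, the TaskTracker puts them into the corresponding LinkedList. LinkedList implements both the List and Queue interfaces; it is backed by a linked list, and here it is used as a FIFO queue.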
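A quick check of the FIFO behaviour the launcher relies on: elements added with add(...) go to the tail, and remove(0), the call used in the launcher loop, takes the oldest element from the head. The class name and task names here are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class FifoListSketch {

    /** Appends every item at the tail, then removes from the head; returns the removal order. */
    static List<String> drainInOrder(String... items) {
        LinkedList<String> tasksToLaunch = new LinkedList<>();
        for (String item : items) {
            tasksToLaunch.add(item);             // List.add appends at the tail
        }
        List<String> order = new ArrayList<>();
        while (!tasksToLaunch.isEmpty()) {
            order.add(tasksToLaunch.remove(0));  // index 0 is the head (oldest element)
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder("task-1", "task-2", "task-3")); // prints "[task-1, task-2, task-3]"

        // The same class can also be used purely through the Queue interface.
        Queue<String> queue = new LinkedList<>(List.of("a", "b"));
        queue.offer("c");
        System.out.println(queue.poll()); // prints "a"
    }
}
```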
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
if (actions != null) {
  for (TaskTrackerAction action : actions) {
    if (action instanceof LaunchTaskAction) {
      addToTaskQueue((LaunchTaskAction) action);
      ......
    }
  }
......
private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}
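The routing step (walk the actions in the response, keep only launch actions, and split them by whether they are map tasks) can be modelled without any Hadoop classes. Action, LaunchTask and OtherAction below are hypothetical stand-ins for TaskTrackerAction and LaunchTaskAction, and plain String ids replace TaskInProgress objects.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class ActionRoutingSketch {

    /** Hypothetical stand-in for TaskTrackerAction. */
    interface Action {}

    /** Hypothetical stand-in for LaunchTaskAction: only a task id and the map/reduce flag. */
    static final class LaunchTask implements Action {
        final String taskId;
        final boolean isMapTask;
        LaunchTask(String taskId, boolean isMapTask) {
            this.taskId = taskId;
            this.isMapTask = isMapTask;
        }
    }

    /** Any other kind of action; ignored by the routing logic. */
    static final class OtherAction implements Action {}

    final LinkedList<String> mapQueue = new LinkedList<>();
    final LinkedList<String> reduceQueue = new LinkedList<>();

    /** Routes launch actions to the map or reduce queue, mirroring addToTaskQueue. */
    void handle(List<Action> actions) {
        if (actions == null) {
            return; // the response may carry no actions at all
        }
        for (Action action : actions) {
            if (action instanceof LaunchTask) {
                LaunchTask launch = (LaunchTask) action;
                if (launch.isMapTask) {
                    mapQueue.add(launch.taskId);
                } else {
                    reduceQueue.add(launch.taskId);
                }
            }
        }
    }

    /** Convenience factory: a fresh instance after handling the given actions. */
    static ActionRoutingSketch routed(Action... actions) {
        ActionRoutingSketch tracker = new ActionRoutingSketch();
        tracker.handle(Arrays.asList(actions));
        return tracker;
    }

    public static void main(String[] args) {
        ActionRoutingSketch t = routed(new LaunchTask("m1", true), new OtherAction(),
                                       new LaunchTask("r1", false), new LaunchTask("m2", true));
        System.out.println(t.mapQueue + " " + t.reduceQueue); // prints "[m1, m2] [r1]"
    }
}
```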
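When the TaskTracker starts, it creates two launcher threads, mapLauncher and reduceLauncher, which handle map tasks and reduce tasks respectively. Map tasks are added by mapLauncher to the LinkedList it maintains, and reduce tasks are added by reduceLauncher to its own LinkedList.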
public void addToTaskQueue(LaunchTaskAction action) {
  synchronized (tasksToLaunch) {
    TaskInProgress tip = registerTask(action, this);
    tasksToLaunch.add(tip);
    tasksToLaunch.notifyAll();
  }
}
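Based on the received action, mapLauncher or reduceLauncher creates the corresponding TaskTracker.TaskInProgress object (through registerTask), appends it to its tasksToLaunch list, and wakes up the waiting launcher thread. As shown below, the launcher thread takes tasks from tasksToLaunch and starts each one once enough free slots are available.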
synchronized (tasksToLaunch) {
  while (tasksToLaunch.isEmpty()) {
    tasksToLaunch.wait();
  }
  //get the TIP
  tip = tasksToLaunch.remove(0);
  task = tip.getTask();
  LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
           " which needs " + task.getNumSlotsRequired() + " slots");
}
.....
// Once enough free slots are available, start this task
startNewTask(tip);
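The hand-off between addToTaskQueue (producer) and the launcher loop (consumer) is a classic wait/notifyAll pattern with two blocking points: waiting for a task, then waiting for free slots. Below is a simplified, self-contained sketch under stated assumptions: PendingTask, Launcher, releaseSlots and the single slotLock monitor are hypothetical names, and the real launcher tracks slots and builds TaskInProgress objects differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class LauncherSketch {

    /** Hypothetical task description: an id plus the number of slots it needs. */
    static final class PendingTask {
        final String id;
        final int slotsNeeded;
        PendingTask(String id, int slotsNeeded) { this.id = id; this.slotsNeeded = slotsNeeded; }
    }

    /** Simplified launcher thread: one FIFO list plus a pool of free slots. */
    static final class Launcher extends Thread {
        private final LinkedList<PendingTask> tasksToLaunch = new LinkedList<>();
        private final Object slotLock = new Object();
        private int freeSlots;
        final List<String> launched = Collections.synchronizedList(new ArrayList<>());

        Launcher(int slots) { freeSlots = slots; setDaemon(true); }

        /** Producer side: append under the list's monitor and wake the waiting launcher. */
        void addToTaskQueue(PendingTask task) {
            synchronized (tasksToLaunch) {
                tasksToLaunch.add(task);
                tasksToLaunch.notifyAll();
            }
        }

        /** Hypothetical hook: a finished task gives its slots back. */
        void releaseSlots(int n) {
            synchronized (slotLock) {
                freeSlots += n;
                slotLock.notifyAll();
            }
        }

        @Override
        public void run() {
            try {
                while (true) {
                    PendingTask task;
                    synchronized (tasksToLaunch) {          // block until there is work
                        while (tasksToLaunch.isEmpty()) {
                            tasksToLaunch.wait();
                        }
                        task = tasksToLaunch.remove(0);     // FIFO: take the head
                    }
                    synchronized (slotLock) {               // block until enough slots are free
                        while (freeSlots < task.slotsNeeded) {
                            slotLock.wait();
                        }
                        freeSlots -= task.slotsNeeded;
                    }
                    launched.add(task.id);                  // stands in for startNewTask(tip)
                }
            } catch (InterruptedException e) {
                // In this sketch an interrupt simply stops the launcher.
            }
        }
    }

    /** Polls until the list has n elements or 5 seconds pass. */
    private static void awaitSize(List<String> list, int n) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 5_000;
        while (list.size() < n && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
    }

    /** One slot, two single-slot tasks: the second starts only after a slot is released. */
    static String demo() {
        try {
            Launcher launcher = new Launcher(1);
            launcher.start();
            launcher.addToTaskQueue(new PendingTask("task-1", 1));
            launcher.addToTaskQueue(new PendingTask("task-2", 1));
            awaitSize(launcher.launched, 1);
            Thread.sleep(50);                               // task-2 cannot start yet: no free slot
            int beforeRelease = launcher.launched.size();
            launcher.releaseSlots(1);                       // task-1 "finishes"
            awaitSize(launcher.launched, 2);
            launcher.interrupt();
            launcher.join();
            return beforeRelease + " -> " + launcher.launched;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "1 -> [task-1, task-2]"
    }
}
```

Using while loops around wait() (rather than if) guards against spurious wakeups and against another consumer taking the item first, which is also why the original loop re-checks tasksToLaunch.isEmpty().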
At this point the TaskTracker has obtained the tasks it needs to run; how they are actually executed is covered in the next blog post.