hadoop在存儲有輸入數據(hdfs中的數據)的節點上運行map任務,能夠得到最佳性能,由於他無需使用最寶貴的集羣寬帶資源。java
數據本地化是hadoop數據處理的核心,優點,能夠得到最佳性能。node
何時開始這個數據本地化優點的呢?【-----hadoop版本比價老。2.x以後,有yarn。可是能夠以這篇作參考】數組
1,reduce嗎? 不是,是map任務。一個split切片對應一個map任務的。移動計算比移動數據成本低,因此說移動計算是最好的解決方案。這就實現數據本地化優點。app
2,當時看了一些源碼,也查看了一些博客,其中有篇博客解釋的很清楚【https://blog.csdn.net/dboy1/article/details/6256765】less
//@Override public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException { ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); final int numTaskTrackers = clusterStatus.getTaskTrackers(); final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); // // Get map + reduce counts for the current tracker. // final int trackerMapCapacity = taskTracker.getMaxMapTasks(); final int trackerReduceCapacity = taskTracker.getMaxReduceTasks(); final int trackerRunningMaps = taskTracker.countMapTasks(); final int trackerRunningReduces = taskTracker.countReduceTasks(); // Assigned tasks List<Task> assignedTasks = new ArrayList<Task>(); // // Compute (running + pending) map and reduce task numbers across pool // int remainingReduceLoad = 0; int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); if (job.scheduleReduces()) { remainingReduceLoad += (job.desiredReduces() - job.finishedReduces()); } } } } // Compute the 'load factor' for maps and reduces double mapLoadFactor = 0.0; if (clusterMapCapacity > 0) { mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; } double reduceLoadFactor = 0.0; if (clusterReduceCapacity > 0) { reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; } // // In the below steps, we allocate first map tasks (if appropriate), // and then reduce tasks if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We assign tasks to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // However, if the cluster is close to getting loaded i.e. we don't // have enough _padding_ for speculative executions etc., we only // schedule the "highest priority" task i.e. the task from the job // with the highest priority. // final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); //這種計算使availableMapSlots<=availableMapSlots int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { //這裏體現了FIFO 的實現,先進的job 先執行 for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); // // Same thing, but for reduce tasks // However we _never_ assign more than 1 reduce task per heartbeat //每次心跳分配的reduce 數量不超過1 個 final int trackerCurrentReduceCapacity = Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), trackerReduceCapacity); //這樣可使availableReduceSlots 不大於集羣的reduce 的比率 final int availableReduceSlots = Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); boolean exceededReducePadding = false; if (availableReduceSlots > 0) { exceededReducePadding = exceededPadding(false, clusterStatus, trackerReduceCapacity); synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { assignedTasks.add(t); //直接break break; } // Don't assign reduce tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededReducePadding) { break; } } } } if (LOG.isDebugEnabled()) { LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + trackerCurrentReduceCapacity + "," + trackerRunningReduces + "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + ", " + (assignedTasks.size()-assignedMaps) + "]"); } return assignedTasks; }
主要仍是使用JobQueueTaskScheduler設置了一些和集羣有關的常量,獲取jobQueue。jobQueueJobInProgressListener是它對JobTracker註冊的jobInProgressListener。而後獲取還須要加載的map和reduce任務的數量。保存在int remainingReduceLoad 和 int remainingMapLoad 中。以及是否要擴充slot等。ide
int numLocalMaps = 0;oop
int numNonLocalMaps = 0;性能
if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; }
繼續向下看:this
// Try to schedule a node-local or rack-local Map task t = job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
看到這塊註釋,翻譯過來分配一個本地或者距離較近的一個MapTask.net
經過這個突破口,看看這個obtaionNewLocalMapTask方法, 可是先看一下getNumberOfUniqueHosts()方法,這個方法很簡單,返回向jobtracker請求UniqueHosts的size,咱們暫且不去care它,接着走進obtainNewLocalMapTask。
public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) throws IOException { if (!tasksInited.get()) { LOG.info("Cannot create task split for " + profile.getJobID()); return null; } //maxLevel = 2 , 指的是NetworkTopology.DEFAULT_HOST_LEVEL int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, status.mapProgress()); if (target == -1) { return null; } Task result = maps[target].getTaskToRun(tts.getTrackerName()); if (result != null) { addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); } return result; }
咱們發現最重要的是這句
int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel,
status.mapProgress());
private synchronized int findNewMapTask(final TaskTrackerStatus tts, final int clusterSize, final int numUniqueHosts, final int maxCacheLevel, final double avgProgress) { if (numMapTasks == 0) { LOG.info("No maps to schedule for " + profile.getJobID()); return -1; } String taskTracker = tts.getTrackerName(); TaskInProgress tip = null; // // Update the last-known clusterSize // this.clusterSize = clusterSize; if (!shouldRunOnTaskTracker(taskTracker)) { return -1; } // Check to ensure this TaskTracker has enough resources to // run tasks from this job long outSize = resourceEstimator.getEstimatedMapOutputSize(); long availSpace = tts.getResourceStatus().getAvailableSpace(); if(availSpace < outSize) { LOG.warn("No room for map task. Node " + tts.getHost() + " has " + availSpace + " bytes free; but we expect map to take " + outSize); return -1; //see if a different TIP might work better. } // For scheduling a map task, we have two caches and a list (optional) // I) one for non-running task // II) one for running task (this is for handling speculation) // III) a list of TIPs that have empty locations (e.g., dummy splits), // the list is empty if all TIPs have associated locations // First a look up is done on the non-running cache and on a miss, a look // up is done on the running cache. The order for lookup within the cache: // 1. from local node to root [bottom up] // 2. breadth wise for all the parent nodes at max level // We fall to linear scan of the list (III above) if we have misses in the // above caches Node node = jobtracker.getNode(tts.getHost()); // // I) Non-running TIP : // // 1. check from local node to the root [bottom up cache lookup] // i.e if the cache is available and the host has been resolved // (node!=null) if (node != null) { Node key = node; int level = 0; // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is // called to schedule any task (local, rack-local, off-switch or speculative) // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is // (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative // tasks int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel); for (level = 0;level < maxLevelToSchedule; ++level) { List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key); if (cacheForLevel != null) { tip = findTaskFromList(cacheForLevel, tts, numUniqueHosts,level == 0); if (tip != null) { // Add to running cache scheduleMap(tip); // remove the cache if its empty if (cacheForLevel.size() == 0) { nonRunningMapCache.remove(key); } return tip.getIdWithinJob(); } } key = key.getParent(); } // Check if we need to only schedule a local task (node-local/rack-local) if (level == maxCacheLevel) { return -1; } } //2. Search breadth-wise across parents at max level for non-running // TIP if // - cache exists and there is a cache miss // - node information for the tracker is missing (tracker's topology // info not obtained yet) // collection of node at max level in the cache structure Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel(); // get the node parent at max level Node nodeParentAtMaxLevel = (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1); for (Node parent : nodesAtMaxLevel) { // skip the parent that has already been scanned if (parent == nodeParentAtMaxLevel) { continue; } List<TaskInProgress> cache = nonRunningMapCache.get(parent); if (cache != null) { tip = findTaskFromList(cache, tts, numUniqueHosts, false); if (tip != null) { // Add to the running cache scheduleMap(tip); // remove the cache if empty if (cache.size() == 0) { nonRunningMapCache.remove(parent); } LOG.info("Choosing a non-local task " + tip.getTIPId()); return tip.getIdWithinJob(); } } } // 3. Search non-local tips for a new task tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false); if (tip != null) { // Add to the running list scheduleMap(tip); LOG.info("Choosing a non-local task " + tip.getTIPId()); return tip.getIdWithinJob(); } // // II) Running TIP : // if (hasSpeculativeMaps) { long currentTime = System.currentTimeMillis(); // 1. Check bottom up for speculative tasks from the running cache if (node != null) { Node key = node; for (int level = 0; level < maxLevel; ++level) { Set<TaskInProgress> cacheForLevel = runningMapCache.get(key); if (cacheForLevel != null) { tip = findSpeculativeTask(cacheForLevel, tts, avgProgress, currentTime, level == 0); if (tip != null) { if (cacheForLevel.size() == 0) { runningMapCache.remove(key); } return tip.getIdWithinJob(); } } key = key.getParent(); } } // 2. Check breadth-wise for speculative tasks for (Node parent : nodesAtMaxLevel) { // ignore the parent which is already scanned if (parent == nodeParentAtMaxLevel) { continue; } Set<TaskInProgress> cache = runningMapCache.get(parent); if (cache != null) { tip = findSpeculativeTask(cache, tts, avgProgress, currentTime, false); if (tip != null) { // remove empty cache entries if (cache.size() == 0) { runningMapCache.remove(parent); } LOG.info("Choosing a non-local task " + tip.getTIPId() + " for speculation"); return tip.getIdWithinJob(); } } } // 3. Check non-local tips for speculation tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, currentTime, false); if (tip != null) { LOG.info("Choosing a non-local task " + tip.getTIPId() + " for speculation"); return tip.getIdWithinJob(); } } return -1; }
findNewMapTask有一個taskTrackerstatus的變量,這個變量是幹嗎的呢?
其實,assginTask是tasktracker每過10秒(默認)向jobtracker發送一次心跳,在這個過程當中,他須要將它的狀態傳遞給jobtracker, 這個變量就是jobtracker須要知道的。好了,下面看看這個方法的具體實現。
獲取tip檢查是否能夠跑或者高效的跑,不然返回-1。而後它會從最近的level中尋找task,通常順序爲local -> rack-local -> off-switch ->speculative,閱讀以後會發現 map任務的獲取是和提交任務時的Node相關的。所以,下面還須要閱讀提交做業的相關代碼。
至此,本地化過程的難點轉移到了提交任務那面了。
下面介於篇幅關係,我就不貼代碼了,直接口述。
在提交做業的時候,會創建一個新的JobInProgress,這段很容易找到再也不多說,而後調度器會有一個線程去調用JobInProgress的initTasks方法,這個方法會去調用readSplitFile,這個SplitFile是怎麼生成的等到下篇在說。總之是一個分割任務的文件,而後會創建RawSplits數組調用crerateCache方法來創建nonRunningMapCache,首先將Splits[i].getlocations爲0的加入到nonLocationMaps。而後創建新的Node。它須要調用jobTracker的resolveAndAddToTopology方法,此方法調用dnsToSwitchMapping.resolve方法。dnsTo..由jobTrakcer在初始化的時候由反響映射創建了ScriptBasedMapping ,他的resolve方法首先會調用runResolveCommand方法,這個方法負責運行一個腳本,是從"topology.script.file.name"屬性中獲得的。這個腳本用來返回一個IP地址對應的機架信息,這個徹底由用戶本身手寫,因此通常這個機架感知功能是默認不用的。而後用返回信息的第一個參數做爲參數,傳入NodeBase.normalize。這裏沒腳本的話,返回的是NetworkTopology.DEFAULT_RACK,最後調用addHostToNodeMapping添加host,node到hostnameToNodeMap,再返回到createCache方法,它獲得了機架信息之後,遞歸的將TaskInProgress添加到node中,到此,map tasks就初始化完成了,而後jobtrakcer.initJob將創建cleanip和setup的map和reduce任務的TIP,而後返回。
後面的敘述不知道你們可否看懂,總結一下就是在做業初始化的時候,文件信息和位置信息已經記錄下來,在調度做業的時候,直接給tasktracker最近的分片就好了,