Hadoop: the data-locality advantage

Hadoop gets its best performance when it runs a map task on the node that already stores the input data (the data in HDFS), because the task then has no need to consume the cluster's most precious resource: network bandwidth.

Data locality is at the core of how Hadoop processes data; it is this advantage that delivers the best performance.

So when does this data-locality advantage actually come into play? [Note: the Hadoop version discussed here is quite old; from 2.x onward scheduling is handled by YARN, but the walkthrough below is still useful as a reference.]

1. Does it happen for reduce tasks? No, it is the map tasks: one input split corresponds to one map task. Moving the computation is cheaper than moving the data, so moving the computation to the data is the better solution, and that is exactly what the data-locality advantage is.

2. At the time I read through some of the source code and a number of blog posts; one post in particular explains this very clearly: https://blog.csdn.net/dboy1/article/details/6256765

  @Override
  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException {
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
    //
    // Get map + reduce counts for the current tracker.
    //
    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
    final int trackerRunningMaps = taskTracker.countMapTasks();
    final int trackerRunningReduces = taskTracker.countReduceTasks();
    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();
    //
    // Compute (running + pending) map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }
    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }
        
    //
    // In the below steps, we allocate first map tasks (if appropriate),
    // and then reduce tasks if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their 
    // predecessors are serviced, too.
    //
    //
    // We assign tasks to the current taskTracker if the given machine 
    // has a workload that's less than the maximum load of that kind of
    // task.
    // However, if the cluster is close to getting loaded i.e. we don't
    // have enough _padding_ for speculative executions etc., we only 
    // schedule the "highest priority" task i.e. the task from the job 
    // with the highest priority.
    //
    
    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    // This caps trackerCurrentMapCapacity at trackerMapCapacity, so availableMapSlots
    // below reflects only this tracker's proportional share of the cluster map load.
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding = 
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }
    
    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        // This is where FIFO scheduling shows up: jobs are considered in arrival order,
        // so the job that entered the queue first is serviced first.
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }
          Task t = null;
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            
            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }
           
            // Try all jobs again for the next Map task 
            break;
          }
          
          // Try to schedule an off-switch or speculative Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());
          
          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;
            
            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();
    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    // (in other words, at most one reduce task is handed out per heartbeat)
    final int trackerCurrentReduceCapacity = 
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
               trackerReduceCapacity);
    // This keeps availableReduceSlots in line with this tracker's share of the
    // cluster-wide reduce load, capped at one per heartbeat by the min(..., 1) below.
    final int availableReduceSlots = 
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus, 
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }
          Task t = 
            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            // break right away: only one reduce task is assigned per heartbeat
            break;
          }
          
          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }
    
    if (LOG.isDebugEnabled()) {
      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + 
                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + 
                trackerCurrentReduceCapacity + "," + trackerRunningReduces + 
                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + 
                ", " + (assignedTasks.size()-assignedMaps) + "]");
    }
    return assignedTasks;
  }

    The method mainly has JobQueueTaskScheduler read a few cluster-related figures and fetch the jobQueue; jobQueueJobInProgressListener is the JobInProgressListener that the scheduler registered with the JobTracker. It then works out how many map and reduce tasks still need to be scheduled, keeping them in remainingMapLoad and remainingReduceLoad, and checks whether some slot padding has to be left free for speculative execution and task failures (exceededPadding).
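As a quick worked example of that capacity math (the numbers below are made up, purely to trace the arithmetic used in assignTasks):

public class CapacityMath {
  public static void main(String[] args) {
    // Hypothetical figures, only to illustrate the formulas above.
    int clusterMapCapacity = 100;  // total map slots in the cluster
    int remainingMapLoad   = 40;   // pending + running maps across all running jobs
    int trackerMapCapacity = 4;    // map slots on this tasktracker
    int trackerRunningMaps = 1;    // maps already running on it

    double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;    // 0.4
    // The tracker is only allowed to use its proportional share of its own slots:
    int trackerCurrentMapCapacity =
        Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
                 trackerMapCapacity);                                         // ceil(1.6) = 2
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;   // 2 - 1 = 1
    System.out.println("availableMapSlots = " + availableMapSlots);
  }
}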

Two counters are then set up to record how many local and non-local map tasks get assigned, and inside the scheduling loop any job that is not in the RUNNING state is skipped:

    int numLocalMaps = 0;
    int numNonLocalMaps = 0;

    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
      continue;
    }

Reading further down:

// Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());

The comment above tells us this call tries to schedule a node-local or rack-local map task.

Using this as our entry point, let's look at obtainNewLocalMapTask. But first a word about getNumberOfUniqueHosts(): it is very simple, it just returns the number of unique hosts the JobTracker knows about, so we can ignore it for now and step straight into obtainNewLocalMapTask.

public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
                                                     int clusterSize, 
                                                     int numUniqueHosts)
  throws IOException {
    if (!tasksInited.get()) {
      LOG.info("Cannot create task split for " + profile.getJobID());
      return null;
    }
    // maxLevel = 2, i.e. NetworkTopology.DEFAULT_HOST_LEVEL (host level + rack level)
    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
                                status.mapProgress());
    if (target == -1) {
      return null;
    }
    Task result = maps[target].getTaskToRun(tts.getTrackerName());
    if (result != null) {
      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
    }
    return result;
  }

The most important line here is:

    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
                                status.mapProgress());

private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                          final int clusterSize,
                                          final int numUniqueHosts,
                                          final int maxCacheLevel,
                                          final double avgProgress) {
    if (numMapTasks == 0) {
      LOG.info("No maps to schedule for " + profile.getJobID());
      return -1;
    }
    String taskTracker = tts.getTrackerName();
    TaskInProgress tip = null;
    
    //
    // Update the last-known clusterSize
    //
    this.clusterSize = clusterSize;
    if (!shouldRunOnTaskTracker(taskTracker)) {
      return -1;
    }
    // Check to ensure this TaskTracker has enough resources to 
    // run tasks from this job
    long outSize = resourceEstimator.getEstimatedMapOutputSize();
    long availSpace = tts.getResourceStatus().getAvailableSpace();
    if(availSpace < outSize) {
      LOG.warn("No room for map task. Node " + tts.getHost() + 
               " has " + availSpace + 
               " bytes free; but we expect map to take " + outSize);
      return -1; //see if a different TIP might work better. 
    }
    
    
    // For scheduling a map task, we have two caches and a list (optional)
    //  I)   one for non-running task
    //  II)  one for running task (this is for handling speculation)
    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
    //       the list is empty if all TIPs have associated locations
    // First a look up is done on the non-running cache and on a miss, a look 
    // up is done on the running cache. The order for lookup within the cache:
    //   1. from local node to root [bottom up]
    //   2. breadth wise for all the parent nodes at max level
    // We fall to linear scan of the list (III above) if we have misses in the 
    // above caches
    Node node = jobtracker.getNode(tts.getHost());
    
    //
    // I) Non-running TIP :
    // 
    // 1. check from local node to the root [bottom up cache lookup]
    //    i.e if the cache is available and the host has been resolved
    //    (node!=null)
    if (node != null) {
      Node key = node;
      int level = 0;
      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
      // called to schedule any task (local, rack-local, off-switch or speculative)
      // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
      //  (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
      // tasks
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findTaskFromList(cacheForLevel, tts, 
              numUniqueHosts,level == 0);
          if (tip != null) {
            // Add to running cache
            scheduleMap(tip);
            // remove the cache if its empty
            if (cacheForLevel.size() == 0) {
              nonRunningMapCache.remove(key);
            }
            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }
      
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }
    //2. Search breadth-wise across parents at max level for non-running 
    //   TIP if
    //     - cache exists and there is a cache miss 
    //     - node information for the tracker is missing (tracker's topology
    //       info not obtained yet)
    // collection of node at max level in the cache structure
    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
    // get the node parent at max level
    Node nodeParentAtMaxLevel = 
      (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
    
    for (Node parent : nodesAtMaxLevel) {
      // skip the parent that has already been scanned
      if (parent == nodeParentAtMaxLevel) {
        continue;
      }
      List<TaskInProgress> cache = nonRunningMapCache.get(parent);
      if (cache != null) {
        tip = findTaskFromList(cache, tts, numUniqueHosts, false);
        if (tip != null) {
          // Add to the running cache
          scheduleMap(tip);
          // remove the cache if empty
          if (cache.size() == 0) {
            nonRunningMapCache.remove(parent);
          }
          LOG.info("Choosing a non-local task " + tip.getTIPId());
          return tip.getIdWithinJob();
        }
      }
    }
    // 3. Search non-local tips for a new task
    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running list
      scheduleMap(tip);
      LOG.info("Choosing a non-local task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }
    //
    // II) Running TIP :
    // 
 
    if (hasSpeculativeMaps) {
      long currentTime = System.currentTimeMillis();
      // 1. Check bottom up for speculative tasks from the running cache
      if (node != null) {
        Node key = node;
        for (int level = 0; level < maxLevel; ++level) {
          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
          if (cacheForLevel != null) {
            tip = findSpeculativeTask(cacheForLevel, tts, 
                                      avgProgress, currentTime, level == 0);
            if (tip != null) {
              if (cacheForLevel.size() == 0) {
                runningMapCache.remove(key);
              }
              return tip.getIdWithinJob();
            }
          }
          key = key.getParent();
        }
      }
      // 2. Check breadth-wise for speculative tasks
      
      for (Node parent : nodesAtMaxLevel) {
        // ignore the parent which is already scanned
        if (parent == nodeParentAtMaxLevel) {
          continue;
        }
        Set<TaskInProgress> cache = runningMapCache.get(parent);
        if (cache != null) {
          tip = findSpeculativeTask(cache, tts, avgProgress, 
                                    currentTime, false);
          if (tip != null) {
            // remove empty cache entries
            if (cache.size() == 0) {
              runningMapCache.remove(parent);
            }
            LOG.info("Choosing a non-local task " + tip.getTIPId() 
                     + " for speculation");
            return tip.getIdWithinJob();
          }
        }
      }
      // 3. Check non-local tips for speculation
      tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
                                currentTime, false);
      if (tip != null) {
        LOG.info("Choosing a non-local task " + tip.getTIPId() 
                 + " for speculation");
        return tip.getIdWithinJob();
      }
    }
    
    return -1;
  }

findNewMapTask takes a TaskTrackerStatus parameter (tts); what is that for?

In fact assignTasks is driven by the heartbeat that each tasktracker sends to the jobtracker every 10 seconds by default; as part of that heartbeat the tasktracker has to report its status to the jobtracker, and that status is exactly this parameter. With that out of the way, let's look at what the method actually does.

It first gets a TIP and checks whether the task can run on this tracker (and run efficiently); otherwise it returns -1. It then searches for a task starting from the nearest level and working outward, generally in the order local -> rack-local -> off-switch -> speculative. Reading the code shows that which map task can be found here depends on the Nodes that were set up when the job was submitted, so next we also need to read the job-submission code.
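To make that search order concrete, here is a stripped-down, hypothetical sketch. SplitInfo and the linear scan are stand-ins of my own: the real code walks the per-node nonRunningMapCache bottom-up instead of scanning a list, and it also has a second pass over running tasks for speculation, which is omitted here.

import java.util.*;

public class LocalitySketch {
  // Stand-in for a pending map task's location metadata.
  static class SplitInfo {
    final int id;
    final Set<String> hosts;   // hosts that store the split's block replicas
    final Set<String> racks;   // racks those hosts live in
    SplitInfo(int id, Set<String> hosts, Set<String> racks) {
      this.id = id; this.hosts = hosts; this.racks = racks;
    }
  }

  static SplitInfo pickMap(String trackerHost, String trackerRack,
                           List<SplitInfo> pending) {
    // 1. node-local: a split whose data lives on this very host
    for (SplitInfo s : pending) if (s.hosts.contains(trackerHost)) return s;
    // 2. rack-local: a split whose data lives somewhere in the same rack
    for (SplitInfo s : pending) if (s.racks.contains(trackerRack)) return s;
    // 3. off-switch: anything left, including splits with no location info
    return pending.isEmpty() ? null : pending.get(0);
  }

  public static void main(String[] args) {
    List<SplitInfo> pending = new ArrayList<SplitInfo>();
    pending.add(new SplitInfo(0, new HashSet<String>(Arrays.asList("host-b")),
                                 new HashSet<String>(Arrays.asList("/rack-2"))));
    pending.add(new SplitInfo(1, new HashSet<String>(Arrays.asList("host-a")),
                                 new HashSet<String>(Arrays.asList("/rack-1"))));
    SplitInfo chosen = pickMap("host-a", "/rack-1", pending);
    System.out.println("chosen split id = " + chosen.id);  // 1: node-local wins
  }
}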

So at this point, the hard part of the locality story has moved over to job submission.

To keep this post from getting too long, I won't paste that code below; I'll just describe it.

When a job is submitted, a new JobInProgress is created (this part is easy to find, so I won't dwell on it). A scheduler thread then calls JobInProgress.initTasks, which calls readSplitFile; how that split file is generated will be covered in the next post. In short, it is a file describing how the input is split. From it the RawSplit array is built and createCache is called to build nonRunningMapCache: splits whose getLocations() is empty are first added to nonLocalMaps, and new Nodes are created for the rest.

Creating those Nodes goes through the JobTracker's resolveAndAddToTopology method, which calls dnsToSwitchMapping.resolve. dnsToSwitchMapping is a ScriptBasedMapping that the JobTracker created via reflection when it initialized. Its resolve method first calls runResolveCommand, which runs the script named by the "topology.script.file.name" property; the script is supposed to return the rack information for an IP address and is written entirely by the user, so this rack-awareness feature is generally not used by default. The first token of the script's output is then passed to NodeBase.normalize; with no script configured, NetworkTopology.DEFAULT_RACK is returned instead. Finally addHostToNodeMapping adds the host/node pair to hostnameToNodeMap.

Back in createCache, with the rack information in hand, the TaskInProgress objects are recursively added under the nodes, and at that point the map tasks are initialized. jobtracker.initJob then creates the cleanup and setup map and reduce TIPs and returns.
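As a rough, hypothetical sketch of what that rack-resolution step amounts to (this is not the real ScriptBasedMapping, just an illustration of the mapping it produces): each hostname or IP is mapped to a rack path, and any host the script cannot resolve, or every host when no script is configured, falls back to NetworkTopology.DEFAULT_RACK ("/default-rack").

import java.util.*;

public class SimpleRackResolver {
  private final Map<String, String> hostToRack;

  public SimpleRackResolver(Map<String, String> hostToRack) {
    this.hostToRack = hostToRack;
  }

  // Maps each host/IP to a rack path, mirroring what the topology script does.
  public List<String> resolve(List<String> names) {
    List<String> racks = new ArrayList<String>(names.size());
    for (String name : names) {
      // Unknown hosts fall back to the default rack, like the no-script case.
      racks.add(hostToRack.containsKey(name) ? hostToRack.get(name) : "/default-rack");
    }
    return racks;
  }

  public static void main(String[] args) {
    Map<String, String> table = new HashMap<String, String>();
    table.put("10.0.1.17", "/rack-1");
    SimpleRackResolver resolver = new SimpleRackResolver(table);
    System.out.println(resolver.resolve(Arrays.asList("10.0.1.17", "10.0.9.99")));
    // -> [/rack-1, /default-rack]
  }
}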

In case the walk-through above is hard to follow, here is the summary: at job-initialization time the input files' split and location information has already been recorded, so when the job is scheduled, the tasktracker is simply handed the split that is closest to it.
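As a small sketch of where that location information ultimately comes from, using the old org.apache.hadoop.mapred API that the code above belongs to (the input path /data/input is a placeholder; you would need a configured filesystem for this to actually run): each InputSplit reports the hosts holding its block replicas, and those hosts are exactly where the scheduler would like to place the corresponding map task.

import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class SplitLocations {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Placeholder path; point this at a real HDFS (or local) directory.
    FileInputFormat.setInputPaths(conf, new Path("/data/input"));
    // One map task is created per split; the "1" is only a hint for the minimum.
    InputSplit[] splits = conf.getInputFormat().getSplits(conf, 1);
    for (InputSplit split : splits) {
      // getLocations() lists the hosts storing the split's data -- the nodes
      // where running the map task avoids any network transfer.
      System.out.println(split + " -> " + Arrays.toString(split.getLocations()));
    }
  }
}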
