hadoop mapreduce過程分析學習

mapreduce框架學習

第一代mapreduce侷限性

擴展性差:

JobTracker同時具有了資源管理和做業控制兩個功能,制約了hadoop集羣擴展性前端

資源利用率低,mr1採用了基於槽位的資源分配模型,槽位slot是一種細粒度的資源劃分單位,一個任務task不會用完槽位對應的資源,其餘任務也沒法使用這些空閒資源。,hadoop將槽位分爲map slit和 reduce slot,不容許他們之間共享資源。java

不支持多種計算框架,包括內存計算框架,流式框架,迭代計算框架

 

 

JobTracker不夠靈活,

 

負責資源管理和做業控制:linux

 

統一資源管理和調度的平臺典型表明shi yarn (yet another Resource Negotiator)apache

 

RM和AM,

 

yarn實際上採用的是拉式通訊模型。後端

對於maptask 它的生命週期爲scheduled->assigned->completedapi

對於redice task,它的生命週期爲pending->scheduled-assigned->completed,數組

 

Yarn上運行mapreduce須要解決兩個關鍵問題,如何肯定reduce task 啓動時機以及如何完成shuffle功能。緩存

 

 

 

 

 

 

 

 

 

 

mapreduce通訊協議類關係圖安全

 

 

 

JobSubmissionProtocol是Cient與HJobTracker之間的通訊協議,經過該協議查看做業運行狀態。框架

/**

 * jobName 做業id,client能夠爲做業得到惟一的id

 * jobsubmitdir爲做業文件所在的目錄,hdfs上的一個目錄,ts是該做業分哦誒到的祕鑰或者是安全令牌

 *

 * @author user

 *

 */

public interface JobClient {

         // 做業提交

         public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;

 

         // 做業控制

         // 修改做業優先級setJobPriority函數;殺死一個做業killJob,殺死一個任務killTask

         // 查看系統狀態和做業運行狀態

         // 當前集羣狀態slot總數,全部正在運行的task數目

         public ClusterStatus getClusterStatus(boolean detailed) throws IOException;

 

         // 得到某個做業的運行狀態

         public JobStatus getJobStatus(JobID jobid) throws IOException;

         //得到全部做業運行狀態

         public JobStatus[] getAllJobs() throws IOException;

 

}

 

InterTarckerProtocol通訊協議

是taskTracker和JobTracker之間的通訊協議,向jobTracker彙報所在節點的資源使用狀況和任務的運行狀況,接收並執行jobTracker發送的命令。

heartbeat週期性地被調用,造成了jobTracker和TaskTracker之間的心跳

//輸入TaskTrackerStatus封裝所在節點資源使用狀況和任務的運行狀況

         //輸出HeartbeatResponse包含一個TaskTrakcerAtion類型的數組,包含了jobtracker向taskTracker傳達的各類命令

         HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact,boolean acceptNewTasks, boolean acceptNewTasks, short responseId) throws IOException;

 

TaskUmbilicalProtocol 通訊協議

Task和taskTracker之間的通訊協議,經過該協議彙報本身的運行狀態或者出錯信息。

 

//週期性調用方法

         boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext jbmContext) throws Exception;

 

         boolean ping(TaskAttemptID taskId, JvmContext jbmContext) throws IOException;

         //按需調用方法

        

         //task初始化,收到啓動命令,LaunchTaskAction,子程序去調用getTask()方法領取對應的task.

        

        

         //task運行中 reportDiagnosticInfo、fsError/fatalError分別彙報出現的Exception/FSEror/Throwble異常和錯誤,對於ReduceTask提供shuffleError彙報shuffle階段出現的 錯誤

         //reprotNextRecordRange getMapCompletionEvent從TaskTracker得到一已經完成的map task列表

         //task運行完成 commitPending,canCommit ,done

hadoop白名單和黑名單, bin/hadoop mradmin –refreshNodes

 

 

 

 

做業提交和初始化

步驟:

1 用戶提交做業

2 將做業配置信息,將運行須要的文件上傳到jobTracker文件系統中

3JobClient調用RPC接口向JobTracker提交做業

4收到做業後後告知TakScheduler,對做業進行初始化、

 

 

 

DisbutedCache負責在hdfs文件系統中文件的上傳和下載。

 

jobClient中文件的split:

InputSplit org.apache.hadoop.maperduce.split

 JobSplit ,JobSplitWriter

 SplitMetaInfoReader

 

做業提交到JobTracker

 

 

JobClient調用rpc方法submitJob將祖業提交到JobTracker端,JobTracker的submitJon中,會有如下操做:

1爲每一個做業建立jobInProgress對象

2 檢查用戶是否有指定隊列的做業提交權限

3 做業配置的內存使用量是否合理  map task,reduce task使用的內存配置

4.通知TaskScheduler初始化做業,按必定的策略去初始化做業

 

 

// 爲每一個做業建立jobInProgress對象,在做業運行時一直存在,跟蹤正在運行做業的運行狀態和進度

         private ConcurrentHashMap<String, Object> jobs;

         private Schedulable taskScheduler;

 

         private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws Exception {

                   synchronized (jobs) {

                            synchronized (taskScheduler) {

                                     jobs.put(job.getProfile().getJobID(), job);

                                     for(JobInProgressListener listener:jobInProgressListeners){

                                               listener.jobAdded(job);

                                     }

                            }

                   }

         }

         public synchronized void start() throws IOException{

                   super.start();

                   //do otherthing

                  

         }

         public static JobTracker startTracker(JobConf conf,String identifier) throws Exception{

                   //do something

                   result=new JobTracker(conf,indetifier);

                   result.taskScheduler.setTaskTackerManager(result);

                   //do something

         }

 

做業初始化過程詳解

taskScheduler調度器遠程調用JobTracker中的initJob()對新做業進行初始化,主要工做是構造mao task和reduce task 並對他們進行初始化。

 

TaskInProgress對象

setupTask

map task

reduce task: map task 達到必定的數目纔開始reduce task

cleanup task 刪除運行過程當中的一些臨時目錄(好比說_temporay目錄),一旦該任務運行成功後,做業由Running變爲succeeded狀態

每個做業運行時都會佔用一個slot

 

Hadoop DistributedCache分發文檔到taskTracker節點org.apache.hadoop.filecache

         void addCacheArchive(URI uri, Configuration conf);

 

         void setCacheArchives(URI[] archives, Configuration conf);

 

         void addCacheFile(URI uri, Configuration conf);

        

         void setCacheFile(URI[] files, Configuration conf);

其中的一些方法

 

 

小結:做業的提交和初始化,涉及到三個重要的組件,JobClinet,jobTracker,TaskScheduler

 

 

JobTracker內部實現

 

 

JobInProgress TaskInProgress,完成後表示整個做業就完成了。

JObTarcker啓動過程:

 

ACLsManager類,權限管理類 隊列權限 和做業權限

 

其中的各類線程

expireTracjersThread 清理

retireJonsThread

expireLaunchingTaskThread

completedJonsStireThread

JobTracker爲關鍵事件記錄日誌,包括做業提交,做業建立,做業開始運行,做業運行完成,做業運行失敗,做業被殺死,經過日誌恢復這些做業的運行狀態。

 

JoobTracker和TaskTracker之間是拉模型,

心跳:判斷taskTracker是否活着

及時讓JObTracker得到各個節點上的資源使用狀況和任務運行狀態

爲TaskTracjer分配任務

jobTracker會賦予每一個做業一個惟一的ID,id由三部分組成:做業前綴字符串,JobTracker啓動時間和做業提交順序,各部分經過+_+鏈接起來組成一個完整的做業ID :job ,201208071506,009(jobTracker運行以來的第9個做業)。

 

jobinprogress 做業靜態信息,已經肯定好的屬性信息,

做業動態信息

 

 taskinprogress

 

 taskAttempt

 

 

 

 

 

 

 

 

jobTracker容錯機制

 

 

Task運行過程分析

IFile存儲格式:支持行壓縮的存儲格式

排序:map task 和reduce task都會有排序

map task 中會將結果暫時放到緩存區中,是環形的,當緩存區使用率達到必定的閾值後,再對緩存區的數據進行一次排序,將有序的數據以IFile文件的形式寫到磁盤,完成後,會將全部文件進行一次合併,成爲一個有序的大文件。

Reduce task 從遠程每一個maptask拷貝相應的數據,先放到內存,達到閾值後再合併生成更大的文件,若是內存中文件大小和數目超過必定的閾值,就會將數據寫到磁盤,當全部數據拷貝完成後,對全部數據進行一次合併。

 

 

 

 

計數器

java和hadoop中計數器的實現

基於枚舉類型和計數器類型的計數器api

public abstract void incrCounter(Enum<?> key,long amount);

public abstract void incrCounter(String group,String counter,long amount);

HadoopPipes::TaskContext::Counter*mapCounter;//定義

mapCOunter=context.getCounter(「counterGroup」,」mapCounter」)//註冊

cotnext.incrementCounter(mapCounter,1);//使用

MapTask

單向緩存區,雙向緩存區,環形緩存區

key是排序的關鍵字,一般須要交給RawConparator排序,要求排序關鍵字在內存咋紅必須連續存儲

經過內存複製解決不連續的問題,複製到前端,複製到後端,可能key或者value太大,以致於整個緩存區都不能容納它,拋出異常,並將該記錄單獨輸出到一個文件中

Spill溢寫過程

是有SpillThread線程完成。是kvbuffer的消費者

 

spillLock.lock();

         while(true){

                   spollDone.signal();

                   while(kvstart=kvend){

                            spillReady.await();

                   }

                   spillLock.unlock();

                   sortAndSpill();

                   spillLock.lock();

                   if(bufend<bufindex&&bufindex<bufstart){

                            bufvoid=kvbuffer.length;

                   }

                   vstart=kvend;

                   bufstart=bufend;

                 

         }

         spiilLock.unlock();

 

當全部數據完成拷貝後,再對全部數據進行一次排序,並將key相同的記錄分組依次交給reduce()程序處理。

Hadoop性能調優

從幾個角度來看:

管理員角度:

硬件

linux操做系統參數調優

JVM參數調優

 

從用戶角度進行調優

合理使用DIstributedCache

當應用程序須要使用外部文件時,獲得外部文件的方法有兩種,一種是與jar包一塊兒放到客戶端,看成業提交時傳到hdfs的某個目錄下,而後經過DistributedCache分發到各個節點上,另外一個方法是將外部文件直接放到hdfs上,第二種方法更高效。

跳過壞記錄

提升做業優先級

 

 

HADOOP安全機制

sasl

相關文章
相關標籤/搜索