JobTracker同時具有了資源管理和做業控制兩個功能,制約了hadoop集羣擴展性前端
資源利用率低,mr1採用了基於槽位的資源分配模型,槽位slot是一種細粒度的資源劃分單位,一個任務task不會用完槽位對應的資源,其餘任務也沒法使用這些空閒資源。,hadoop將槽位分爲map slit和 reduce slot,不容許他們之間共享資源。java
負責資源管理和做業控制:linux
統一資源管理和調度的平臺典型表明shi yarn (yet another Resource Negotiator)apache
yarn實際上採用的是拉式通訊模型。後端
對於maptask 它的生命週期爲scheduled->assigned->completedapi
對於redice task,它的生命週期爲pending->scheduled-assigned->completed,數組
Yarn上運行mapreduce須要解決兩個關鍵問題,如何肯定reduce task 啓動時機以及如何完成shuffle功能。緩存
mapreduce通訊協議類關係圖安全
JobSubmissionProtocol是Cient與HJobTracker之間的通訊協議,經過該協議查看做業運行狀態。框架
/**
* jobName 做業id,client能夠爲做業得到惟一的id
* jobsubmitdir爲做業文件所在的目錄,hdfs上的一個目錄,ts是該做業分哦誒到的祕鑰或者是安全令牌
*
* @author user
*
*/
public interface JobClient {
// 做業提交
public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;
// 做業控制
// 修改做業優先級setJobPriority函數;殺死一個做業killJob,殺死一個任務killTask
// 查看系統狀態和做業運行狀態
// 當前集羣狀態slot總數,全部正在運行的task數目
public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
// 得到某個做業的運行狀態
public JobStatus getJobStatus(JobID jobid) throws IOException;
//得到全部做業運行狀態
public JobStatus[] getAllJobs() throws IOException;
}
是taskTracker和JobTracker之間的通訊協議,向jobTracker彙報所在節點的資源使用狀況和任務的運行狀況,接收並執行jobTracker發送的命令。
heartbeat週期性地被調用,造成了jobTracker和TaskTracker之間的心跳
//輸入TaskTrackerStatus封裝所在節點資源使用狀況和任務的運行狀況
//輸出HeartbeatResponse包含一個TaskTrakcerAtion類型的數組,包含了jobtracker向taskTracker傳達的各類命令
HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact,boolean acceptNewTasks, boolean acceptNewTasks, short responseId) throws IOException;
TaskUmbilicalProtocol 通訊協議
Task和taskTracker之間的通訊協議,經過該協議彙報本身的運行狀態或者出錯信息。
//週期性調用方法
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext jbmContext) throws Exception;
boolean ping(TaskAttemptID taskId, JvmContext jbmContext) throws IOException;
//按需調用方法
//task初始化,收到啓動命令,LaunchTaskAction,子程序去調用getTask()方法領取對應的task.
//task運行中 reportDiagnosticInfo、fsError/fatalError分別彙報出現的Exception/FSEror/Throwble異常和錯誤,對於ReduceTask提供shuffleError彙報shuffle階段出現的 錯誤
//reprotNextRecordRange getMapCompletionEvent從TaskTracker得到一已經完成的map task列表
//task運行完成 commitPending,canCommit ,done
hadoop白名單和黑名單, bin/hadoop mradmin –refreshNodes
步驟:
1 用戶提交做業
2 將做業配置信息,將運行須要的文件上傳到jobTracker文件系統中
3JobClient調用RPC接口向JobTracker提交做業
4收到做業後後告知TakScheduler,對做業進行初始化、
DisbutedCache負責在hdfs文件系統中文件的上傳和下載。
jobClient中文件的split:
InputSplit org.apache.hadoop.maperduce.split
JobSplit ,JobSplitWriter
SplitMetaInfoReader
做業提交到JobTracker
JobClient調用rpc方法submitJob將祖業提交到JobTracker端,JobTracker的submitJon中,會有如下操做:
1爲每一個做業建立jobInProgress對象
2 檢查用戶是否有指定隊列的做業提交權限
3 做業配置的內存使用量是否合理 map task,reduce task使用的內存配置
4.通知TaskScheduler初始化做業,按必定的策略去初始化做業
// 爲每一個做業建立jobInProgress對象,在做業運行時一直存在,跟蹤正在運行做業的運行狀態和進度
private ConcurrentHashMap<String, Object> jobs;
private Schedulable taskScheduler;
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws Exception {
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for(JobInProgressListener listener:jobInProgressListeners){
listener.jobAdded(job);
}
}
}
}
public synchronized void start() throws IOException{
super.start();
//do otherthing
}
public static JobTracker startTracker(JobConf conf,String identifier) throws Exception{
//do something
result=new JobTracker(conf,indetifier);
result.taskScheduler.setTaskTackerManager(result);
//do something
}
taskScheduler調度器遠程調用JobTracker中的initJob()對新做業進行初始化,主要工做是構造mao task和reduce task 並對他們進行初始化。
TaskInProgress對象
setupTask
map task
reduce task: map task 達到必定的數目纔開始reduce task
cleanup task 刪除運行過程當中的一些臨時目錄(好比說_temporay目錄),一旦該任務運行成功後,做業由Running變爲succeeded狀態
每個做業運行時都會佔用一個slot
Hadoop DistributedCache分發文檔到taskTracker節點org.apache.hadoop.filecache
void addCacheArchive(URI uri, Configuration conf);
void setCacheArchives(URI[] archives, Configuration conf);
void addCacheFile(URI uri, Configuration conf);
void setCacheFile(URI[] files, Configuration conf);
其中的一些方法
小結:做業的提交和初始化,涉及到三個重要的組件,JobClinet,jobTracker,TaskScheduler
JobInProgress TaskInProgress,完成後表示整個做業就完成了。
JObTarcker啓動過程:
ACLsManager類,權限管理類 隊列權限 和做業權限
其中的各類線程
expireTracjersThread 清理
retireJonsThread
expireLaunchingTaskThread
completedJonsStireThread
JobTracker爲關鍵事件記錄日誌,包括做業提交,做業建立,做業開始運行,做業運行完成,做業運行失敗,做業被殺死,經過日誌恢復這些做業的運行狀態。
JoobTracker和TaskTracker之間是拉模型,
心跳:判斷taskTracker是否活着
及時讓JObTracker得到各個節點上的資源使用狀況和任務運行狀態
爲TaskTracjer分配任務
jobTracker會賦予每一個做業一個惟一的ID,id由三部分組成:做業前綴字符串,JobTracker啓動時間和做業提交順序,各部分經過+_+鏈接起來組成一個完整的做業ID :job ,201208071506,009(jobTracker運行以來的第9個做業)。
jobinprogress 做業靜態信息,已經肯定好的屬性信息,
做業動態信息
taskinprogress
taskAttempt
jobTracker容錯機制
IFile存儲格式:支持行壓縮的存儲格式
排序:map task 和reduce task都會有排序
map task 中會將結果暫時放到緩存區中,是環形的,當緩存區使用率達到必定的閾值後,再對緩存區的數據進行一次排序,將有序的數據以IFile文件的形式寫到磁盤,完成後,會將全部文件進行一次合併,成爲一個有序的大文件。
Reduce task 從遠程每一個maptask拷貝相應的數據,先放到內存,達到閾值後再合併生成更大的文件,若是內存中文件大小和數目超過必定的閾值,就會將數據寫到磁盤,當全部數據拷貝完成後,對全部數據進行一次合併。
java和hadoop中計數器的實現
基於枚舉類型和計數器類型的計數器api
public abstract void incrCounter(Enum<?> key,long amount);
public abstract void incrCounter(String group,String counter,long amount);
HadoopPipes::TaskContext::Counter*mapCounter;//定義
mapCOunter=context.getCounter(「counterGroup」,」mapCounter」)//註冊
cotnext.incrementCounter(mapCounter,1);//使用
單向緩存區,雙向緩存區,環形緩存區
key是排序的關鍵字,一般須要交給RawConparator排序,要求排序關鍵字在內存咋紅必須連續存儲
經過內存複製解決不連續的問題,複製到前端,複製到後端,可能key或者value太大,以致於整個緩存區都不能容納它,拋出異常,並將該記錄單獨輸出到一個文件中
是有SpillThread線程完成。是kvbuffer的消費者
spillLock.lock();
while(true){
spollDone.signal();
while(kvstart=kvend){
spillReady.await();
}
spillLock.unlock();
sortAndSpill();
spillLock.lock();
if(bufend<bufindex&&bufindex<bufstart){
bufvoid=kvbuffer.length;
}
vstart=kvend;
bufstart=bufend;
}
spiilLock.unlock();
當全部數據完成拷貝後,再對全部數據進行一次排序,並將key相同的記錄分組依次交給reduce()程序處理。
從幾個角度來看:
管理員角度:
硬件
linux操做系統參數調優
JVM參數調優
從用戶角度進行調優
合理使用DIstributedCache
當應用程序須要使用外部文件時,獲得外部文件的方法有兩種,一種是與jar包一塊兒放到客戶端,看成業提交時傳到hdfs的某個目錄下,而後經過DistributedCache分發到各個節點上,另外一個方法是將外部文件直接放到hdfs上,第二種方法更高效。
跳過壞記錄
提升做業優先級