原文地址:http://blog.csdn.net/andyelvis/article/details/7706205java
Configuration類是用來訪問hadoop的配置參數的。node
Configuration類首先會經過靜態代碼段加載hadoop的配置文件core-default.xml和和core-site.xml,相關代碼以下:算法
<span style="font-size:16px;">static{ //print deprecation warning if hadoop-site.xml is found in classpath ClassLoader cL = Thread.currentThread().getContextClassLoader(); if (cL == null) { cL = Configuration.class.getClassLoader(); } if(cL.getResource("hadoop-site.xml")!=null) { LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " + "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " + "mapred-site.xml and hdfs-site.xml to override properties of " + "core-default.xml, mapred-default.xml and hdfs-default.xml " + "respectively"); } addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml"); } </span>
defaultResources是一個ArrayList,用來保存默認的配置文件路徑。若是一個默認的配置文件路徑不在defaultResource裏面,就添加進去,這個邏輯是在api
addDefaultResource方法中實現的。數組
properties是一個Properties對象,保存從配置文件中解析出來的配置屬性,若是多個配置文件有相同的key,後者會覆蓋前者的值。緩存
JobConf類用來配置Map/Reduce做業信息的,繼承自Configuration類。服務器
JobConf類首先會經過靜態代碼段加載mapred-default.xml和mapred-site.xml配置屬性文件。tcp
DEFAULT_MAPRED_TASK_JAVA_OPTS=「-Xmx200m」,默認狀況下Map/Reduce任務的JAVA命令行選項指定的JAVA虛擬機最大內存是200M。分佈式
JobClient類是用戶與JobTracker交互的主要接口,經過它能夠提交jobs,追蹤job的進度,訪問task組件的日誌,查詢集羣的狀態信息等。ide
提交job是經過runJob方法實現的,相關代碼以下:
<span style="font-size:16px;">public static RunningJob runJob(JobConf job) throws IOException { JobClient jc = new JobClient(job); RunningJob rj = jc.submitJob(job); try { if (!jc.monitorAndPrintJob(job, rj)) { LOG.info("Job Failed: " + rj.getFailureInfo()); throw new IOException("Job failed!"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return rj; }</span>
首先建立一個JobClient對象,此對象在構造函數中會根據JobConf對象去鏈接JobTracker。
JobClient與JobTracker通訊是經過jobSubmitClient操做的,jobSubmitClient是JobSubmissionProtocol類型的動態代理類,是經過以下方法產生的:
<span style="font-size:16px;"> private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); }</span>
getProxy方法的關鍵是Invoker類,Invoker類實現了InvocationHandler接口,主要有兩個成員變量,remoteId是Client.ConnectionId類型,保存鏈接地址和用戶的ticket,客戶端鏈接服務器由<remoteAddress,protocol,ticket>惟一標識。從這裏咱們也能夠看到一些配置屬性值,默認的rpcTimeout是0,
ipc.client.connection.maxidletime客戶端鏈接的最大空閒時間是10s,
ipc.client.connect.max.retries客戶端同服務器創建鏈接時的最大重試次數是10,
ipc.client.tcpnodelay是否開啓Nagle算法(對TCP/IP進行擁塞控制),若是開啓,會減小延遲,可是會增長小數據報,默認是false。client是Client類,用於IPC通訊。client會經過ClientCache類來緩存,若是緩存中沒有,會新建一個Client,不然原client計數加1。Invoker類主要的方法是invoke方法,invoke方法的功能是調用client的方法而後返回結果。動態代理類代理的對象是Client對象。
submitJobInternal方法是真正用來提交job的,具體步驟以下:
一、初始化staging目錄,staging目錄根目錄是由mapreduce.jobtracker.staging.root.dir配置的,默認是/tmp/hadoop/mapred/staging,具體到某個用戶的staging目錄是$ROOT/userName/.staging。
二、從JobTracker那裏取得新的job id,job id從1開始遞增。
三、得到提交job的目錄submitJobDir=用戶的staging目錄/jobid,而且將這個目錄設置成mapreduce.job.dir的值。
四、copyAndConfigureFiles拷貝和初始化文件,首先從配置屬性mapred.submit.replication取得replication值,默認爲10。而後判斷submitJobDir目錄是否存在,若是存在拋異常;不然建立submitJobDir目錄;取得job的分佈式緩存文件路徑=submitJobDir/files;取得job的分佈式緩存存檔路徑=submitJobDir/archives;取得job的分佈式緩存libjars路徑=submitJobDir/libjars;若是命令行參數有tmpfiles,則將這些文件拷貝到分佈式緩存文件路徑下,同時將這個路徑加入到分佈式緩存中;若是命令行參數有tmpjars,則將這些文件拷貝到分佈式緩存libjars路徑下,同時將這個路徑加入到分佈式緩存中;若是命令行參數有tmparchives,則將這些文件拷貝到分佈式緩存存檔路徑下,同時將這個路徑加入到分佈式緩存中;根據mapred.jar屬性取得jar包的路徑,若是沒有指定job的名字,那麼將使用jar包的名字做爲job名字;取得job jar的存儲路徑=submitJobDir/job.jar;將用戶指定的jar包拷貝到job jar的存儲路徑;設置工做目錄,默認是配置屬性mapred.working.dir指定的值。
五、取得job配置文件的路徑submitJobFile=submitJobDir/job.xml;設置
mapreduce.job.submithostaddress爲本機ip地址,設置
mapreduce.job.submithost爲本機主機名。
六、爲job建立輸入分區,這是由writeSplits方法完成的。以old api爲例,首先調用InputFormat的getSplits方法獲得一個InputSplit分區數組,FileInputFormat類的getSplits方法實現過程以下:
經過listStatus方法取得輸入文件路徑列表,過濾掉_和.開頭的路徑以及根據設置的mapred.input.pathFilter.class過濾;
在JobConf中設置mapreduce.input.num.files爲輸入文件數;
計算出全部輸入文件的總大小totalSize,目標分區大小goalSize=totalSize/numSplits(由mapred.map.tasks配置,默認爲1),最小分區大小minSize=mapred.min.split.size配置和1之間的較大值,對於每個輸入文件,若是這個文件的長度不等於0而且是可切分的,計算分區大小splitSize=Math.max(minSize,Math.min(goalSize,blockSize)),blockSize爲HDFS存儲文件的塊大小,對於每個分區大小,計算對其貢獻最大的主機數組(根據機架以及塊的字節大小肯定),而後將這個分區加入到分區列表;而後根據分區大長度從大到小對分區列表進行排序;而後將分區列表寫入到分區文件,分區文件名=submitJobDir/job.split,分區文件的存儲格式:SPL字節信息,分區版本號,{InputSplit類名,InputSplit類信息}+;SplitMetaInfo數組記錄每一個分區信息在文件中的偏移,主機信息和長度;將分區Meta信息SplitMetaInfo數組寫入到文件submitJobDir/job.splitmetainfo。
七、JobConf設置mapred.map.tasks爲分區數。
八、根據mapred.job.queue.name得到job提交的隊列的名字,默認是default,而後根據這個隊列名字得到訪問控制列表。
九、將從新配置過的JobConf寫入到submitJobDir/job.xml文件。
十、將jobid,submitJobDir信息傳給JobTracker正式提交job,並經過NetworkedJob對象跟蹤job的狀態。
monitorAndPrintJob方法監控job的運行而且實時打印job的狀態。