應用開發 java
主要知識點以下:node
Configuration類(支持overwrite,variable $)linux
測試(mock單元測試,本地測試,集羣測試)web
Tool, ToolRunner正則表達式
集羣測試(package, 啓動job, Job web UI for namenode and jobtracker)數據庫
運程調試器(keep.failed.task.files = true, 使用ISolationRunner)apache
做業調優(HPROF)數組
MapReduce工做流 (oozie)緩存
1. 在本地運行測試數據tomcat
public class MaxTemperatureDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Job job = new Job(getConf(), 「compute max temperature」);
job.setJarByClass();
job.setMapperClass();
job.setReducerClass();
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.addOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true);
}
public static void main(String[] args) {
int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
System.exit(exitCode);
}
}
編譯上面的代碼,在根節點處運行hadoop命令(事先將hadoop進程在本地啓動):
hadoop MaxTemperature –conf conf/hadoop-local.xml input/ncdc max-temp
2. 集羣上運行
使用jar命令將class文件打包,而後使用jar命令上傳並啓動任務(事先將hadoop在集羣中啓動):
%hadoop jar job.jar MaxTempratureDriver –conf conf/hadoop-cluster.xml input output
3. Hadoop守護進程的地址和端口
RPC
namenode RPC地址和端口 hdfs://localhost:8020 (fs.default.name)
jobtracker RPC地址和端口 localhost:8021 (mapred.job.tracker)
datanode TCP/IP服務器(塊傳輸) 50010 (dfs.datanode.address)
datanode RPC 地址和端口 localhost:50020 (dfs.datanode.ipc.address)
tasktracker RPC 地址和端口 (mapred.task.tracker.report.address)
HTTP
jobtracker 50030 (mapred.job.tracker.http.address)
tasktracker 50060 (mapred.task.tracker.http.address)
namenode 50070 (dfs.http.address)
datanode 50075 (dfs.datanode.http.address)
secondary 50090 (dfs.secondary.http.address)
4. 做業調試(計數器和狀態)
在map/reduce程序中能夠經過計數器和狀態來記錄數據中的一些狀態,能夠經過webUI或腳本指令來查看運行後的計數器或狀態。
context.setStatus(「」);
context.incrCounter(String group, String counter, int num);
命令行查詢計數器:
%hadoop job –counter job_201111160811_0003 ‘MaxTemperatureMaper$Temperature’ ENUM
遠程調試器
在集羣上運行做業很難調試,可是能夠配置Hadoop保留做業運行期間產生的全部中間值,以便稍後在調試器上從新運行這些出錯的任務。
1) 設置屬性保留中間數據 keep.failed.task.files = true
2) 運行做業,在web界面上查看故障節點和task_attempt_ID;
3) 經過上面的ID來查找保存的中間數據文件。mapred.local.dir定義了本地緩存目錄,在指定的一個或多個目錄下尋找對應的job_id下的task_temp_id目錄,下面存放着job.xml,map輸入的序列化文件,map輸出備份(在output目錄下),和work目錄(task_attempt的工做目錄)。
4) 在腳本控制檯cd到上面的work目錄,設置運程調試器屬性並啓動hadoop進入debug模式:
%export HADOOP_OPTS=」-agentlib:jdwp=transfport=dt_socket,server=y,suspend=y,address=8787」
%hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
5) 在運程客戶端啓動Java IDE如Eclipse遠程鏈接上面主機的8787端口,在map/reduce源代碼中設置斷點等待。
上述調試技術不僅適用於失敗的任務,還能夠保留成功完成的任務數據來調試內部邏輯。這是,可將屬性keep.task.files.pattern設置爲一個正則表達式(與保留的任務ID匹配)。
其它一些調試的技巧:
在linux下dump Java thread stack trace
若是是在控制檯中運行,則直接ctrl+\
若是是在後臺運行,能夠先找到運行java的pid,而後kill -QUIT PID,會將thread stack內容輸出到該java進程的標準輸出流裏,例如tomcat就會寫在catalina.out裏。
jstack[-l]pid
若是java程序崩潰生成core文件,jstack工具能夠用來得到core文件的javastack和nativestack的信息,從而能夠輕鬆地知道java程序是如何崩潰和在程序何處發生問題。另外,jstack工具還能夠附屬到正在運行的java程序中,看到當時運行的java程序的javastack和nativestack的信息,若是如今運行的java程序呈現hung的狀態,jstack是很是有用的。
5 做業調優
哪些因素影響做業的運行效率?
mapper的數量:儘可能將輸入數據切分紅數據塊的整數倍。若有太多小文件,則考慮CombineFileInputFormat;
reducer的數量:爲了達到最高性能,集羣中reducer數應該略小於reducer的任務槽數。
combiner: 充分使用合併函數減小map和reduce之間傳遞的數據量,combiner在map後運行;
中間值的壓縮:對map輸出值進行壓縮減小到reduce前的傳遞量(conf.setCompressMapOutput(true)和setMapOutputCompressorClass(GzipCodec.class));
自定義序列:若是使用自定義的Writable對象或自定義的comparator,則必須確保已實現RawComparator
調整shuffle:MapReduce的shuffle過程能夠對一些內存管理的參數進行調整,以彌補性能不足;
另外一個有用的方法是啓用JDK的HPROF分析來獲取程序的CPU和堆棧使用狀況。
conf.setProfileEnabled(true); // 「mapred.task.profile」
conf.setProfileParams(「-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s」); // 「mapred.task.profile.params」
conf.setProfileTaskRange(true, 「0-2」); // 第一個參數表示map,false則分析reduce;第二個參數任務ID範圍
將上述程序加入驅動程序後從新運行,分析結果將輸出到做業日誌的末尾。
知識點小結:
shuffle影響性能的因素
1 Map –>buffer –> partition, sort, spill to disk (輸出緩衝區,溢出寫磁盤比例,運行combiner最小溢出寫文件數3, task tracker工做線程數)
2 Reduce
copy (5 threads) –> memory (buffer size) –> disk (threhold) –> merge –> reduce
1 剖析MapReduce做業運行機制
1.1 做業的提交
客戶端經過JobClient.runJob()來提交一個做業到jobtracker,JobClient程序邏輯以下:
a) 向Jobtracker請求一個新的job id (JobTracker.getNewJobId());
b) 檢查做業的輸出說明,如已存在拋錯誤給客戶端;計算做業的輸入分片;
c) 將運行做業所須要的資源(包括做業jar文件,配置文件和計算所得的輸入分片)複製到jobtracker的文件系統中以job id命名的目錄下。做業jar副本較多(mapred.submit.replication = 10);
d) 告知jobtracker做業準備執行 (submit job)。
1.2 做業的初始化
job tracker接收到對其submitJob()方法的調用後,將其放入內部隊列,交由job scheduler進行調度,並對其進行初始化,包括建立一個正在運行做業的對象(封裝任務和記錄信息)。
爲了建立任務運行列表,job scheduler首先從共享文件系統中獲取JobClient已計算好的輸入分片信息,而後爲每一個分片建立一個map任務;建立的reduce任務數量由JobConf的mapred.reduce.task屬性決定,schedule建立相應數量的reduce任務。任務此時被執行ID。
1.3 任務的分配
jobtacker應該先選擇哪一個job來運行?這個由job scheduler來決定,下面會詳細講到。
jobtracker如何選擇tasktracker來運行選中做業的任務呢?
每一個tasktracker按期發送心跳給jobtracker,告知本身還活着,是否能夠接受新的任務。jobtracker以此來決定將任務分配給誰(仍然使用心跳的返回值與tasktracker通訊)。每一個tasktracker會有固定數量的任務槽來處理map和reduce(好比2,表示tasktracker能夠同時運行兩個map和reduce),由機器內核的數量和內存大小來決定。job tracker會先將tasktracker的map槽填滿,而後分配reduce任務到tasktracker。
jobtracker選擇哪一個tasktracker來運行map任務須要考慮網絡位置,它會選擇一個離輸入分片較近的tasktracker,優先級是數據本地化(data-local)–>機架本地化(rack-local)。
對於reduce任務,沒有什麼標準來選擇哪一個tasktracker,由於沒法考慮數據的本地化。map的輸出始終是須要通過整理(切分排序合併)後經過網絡傳輸到reduce的,可能多個map的輸出會切分出一部分送給一個reduce,因此reduce任務沒有必要選擇和map相同或最近的機器上。
1.4 任務的執行
1. tasktracker分配到一個任務後,首先從HDFS中把做業的jar文件複製到tasktracker所在的本地文件系統(jar本地化用來啓動JVM)。同時將應用程序所須要的所有文件從分佈式緩存複製到本地磁盤。
2. 接下來tasktracker爲任務新建一個本地工做目錄work,並把jar文件的內容解壓到這個文件夾下。
3. tasktracker新建一個taskRunner實例來運行該任務。TaskRunner啓動一個新的JVM來運行每一個任務,以便客戶的map/reduce不會影響tasktracker守護進程。但在不一樣任務之間重用JVM仍是可能的。子進程經過umbilical接口(?什麼含義,暫時未知)與父進程進行通訊。任務的子進程每隔幾秒便告知父進程的進度,直到任務完成。
Streaming和Pipes是用來運行其它語言編寫的map和reduce。Streaming任務特指任務使用標準輸入輸出steaming與進程通訊,能夠是任何語言編寫的。pipes特指C++語言編寫的任務,其經過socket來通訊(persistent socket connection)。
1.5 進度和狀態的更新
一個做業和每一個任務都有一個狀態信息,包括:做業或任務的運行狀態(running, successful, failed),map和reduce的進度,計數器值,狀態消息或描述。
這些信息經過必定的時間間隔由child JVM –> task tracker –> job tracker匯聚。job tracker將產生一個代表全部運行做業及其任務狀態的全局試圖。你能夠經過Web UI查看。同時JobClient經過每秒查詢jobtracker來得到最新狀態。
1.6 做業的完成
1.7 做業的失敗
2. 做業的調度
默認調度器 – 基於隊列的FIFO調度器
公平調度器(Fair Scheduler)- 每一個用戶都有本身的做業池,用map和reduce的任務槽數來定製做業池的最小容量,也能夠設置每一個池的權重。Fair Scheduler支持搶佔,若是一個池在特定的一段時間內未獲得公平的資源共享,它會停止運行池獲得過多資源的任務,以便把任務槽讓給運行資源不足的池。啓動步驟:
1) 拷貝contrib/fairscheduler下的jar複製到lib下;
2) mapred.jobtracker.taskScheduler = org.apache.hadoop.mapred.FairScheduler
3) 重啓節點hadoop
能力調度器(Capacity Scheduler)-
3. shuffle和排序
shuffle特指map輸出後到reduce運行前獲得輸入的整個過程,它是MapReduce的心臟,屬於不斷被優化和改進的代碼庫的一部分,下面主要針對0.20版本。
Map端
1)Map輸出首先放在內存緩衝區(io.sort.mb屬性定義,默認100MB);
2)守護進程會將緩衝區的數據按照目標reducer劃分紅不一樣的分區(partition),同時按鍵進行內排序;若是客戶端定義了combiner,則combiner會在排序後運行,繼續壓縮緩存區的數據;
3)緩衝區上定義了一個閾值(io.sort.spill.percent,默認爲0.8),當存儲內容達到這個值時,緩衝區的值會被寫到本地文件中(mapred.local.dir定義,能夠是一個或多個目錄);這種文件會有多個,每一個的內容都是按照reducer分區且局部排序的。這個過程簡稱spill to disk;
4)Map輸出完畢前,這些中間的輸出文件會合併成一個已分區且已排序的輸出文件中,合併會分屢次,每次合併的中間文件個數有io.sort.factor來定義,默認是10;這個過程也會伴隨着combiner的運行,min.num.spills.for.combine定義了運行combiner以前溢出寫的次數;
5)寫磁盤時能夠壓縮文件。mapred.compress.map.output設置爲true,mapred.map.output.compression.codec指定壓縮實現類;
map任務完成後,會通知父tasktracker狀態已更新,而後tasktracker經過心跳通知jobtracker。下面的reduce所在的tasktracker有一個線程按期詢問jobtracker以便得到map輸出的位置,直到它得到全部輸出的位置。
Reduce端
1)每一個map任務的完成時間可能不一樣,但只要有一個任務完成,reduce任務得知後就開始複製對應它的輸出,複製線程數由mapred.reduce.parallel.copies定義,默認爲5;
2)若是map輸出至關小,則不用複製到文件中,而是reduce tasktracker的內存中。緩衝區大小由mapred.job.shuffle.input.buffer.percent定義用於此用途的堆空間的百分比,默認0.7;一旦內存緩衝區達到閾值大小(由mapred.iob.shuffle.merge.percent,默認值爲0.66)或達到reduce輸出閾值(mapred.inmem.merge.threshold,默認值爲1000),則合併後溢出寫到磁盤中;
3)隨着磁盤上副本的增多,後臺線程會將它們合併爲更大的排好序的文件。爲了合併,壓縮的map輸出必須在內存中被解壓縮;
4) 複製完全部的map輸出後,reduce任務進入合併階段(sort phase,合併多個文件,並按鍵排序)。io.sort.factor定義了每次合併數,默認爲10,即每10個map輸出合併一次。會有不少個合併後的中間文件。
5)最後直接把中間文件數據輸入給reduce函數,對已排序輸出中的每一個鍵都要調用reduce函數,此階段的輸出直接寫到HDFS中。
配置的調優
總原則:給shuffle過程儘可能多提供內存空間,但也要確保map函數和reduce函數能獲得足夠的內存。
運行map和reduce任務的JVM內存大小有mapred.child.java.opts屬性設置。
在map端,避免屢次溢出寫磁盤來得到最佳性能。計數器spilled.records計算在做業運行整個階段中溢出寫磁盤的記錄數,大則代表寫磁盤太頻繁;
在reduce端,中間數據所有駐留在內存中就能獲得最佳性能。若是reduce函數的內存需求不大,那麼把mapred.inmem.merg.threshold設置爲0,把mapred.job.reduce.input.buffer.percent設置爲1會帶來性能的提高。
4. 任務的執行
Hadoop發現一個任務運行比預期慢的時候,它會盡可能檢測,並啓動另外一個相同的任務做爲備份,即「推測執行」(speculative execution)。
推測執行是一種優化措施,並不能使做業運行更可靠。默認啓用,但能夠單獨爲map/reduce任務設置,mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution。開啓此功能會減小整個吞吐量,在集羣中傾向於關閉此選項,而讓用戶根據個別做業須要開啓該功能。
Hadoop爲每一個任務啓動一個新JVM須要耗時1秒,對於大量超短任務若是重用JVM會提高性能。當啓用JVM重用後,JVM不會同時運行多個任務,而是順序執行。tasktracker能夠一次啓動多個JVM而後同時運行,接着重用這些JVM。控制任務重用JVM的屬性是mapred.job.reuse.jvm.num.tasks,它指定給定做業每一個JVM運行的任務的最大數,默認爲1,即無重用;-1表示無限制即該做業的全部的任務都是有一個JVM。
在map/reduce程序中,能夠經過某些環境屬性(Configuration)得知做業和任務的信息。
mapred.job.id 做業ID,如job_201104121233_0001
mapred.tip.id 任務ID,如task_201104121233_0001_m_000003
mapred.task.id 任務嘗試ID,如attempt_201104121233_0001_m_000003_0
mapred.task.partition 做業中任務的ID,如3
mapred.task.is.map 此任務是否爲map任務,如true
1. MapReduce的類型
map(K1, V1) –> list (K2, V2) // 對輸入數據進行抽取過濾排序等操做
combine(K2, list(V2)) –> list(K2, V2) // 爲了減小reduce的輸入,須要在map端對輸出進行預處理,相似reduce。不是全部的reduce都在部分數據集上有效,好比求平均值就不能簡單用於combine
partition(K2, V2) –> integer //將中間鍵值對劃分到一個reduce分區,返回分區索引號。分區內的鍵會排序,相同的鍵的全部值會合成一個組(list(V2))
reduce(K2, list(V2)) –> list(K3, V3) //每一個reduce會處理具備某些特性的鍵,每一個鍵上都有值的序列,是經過對全部map輸出的值進行統計得來的;當得到一個分區後,tasktracker會對每條記錄調用reduce。
默認的map和reduce函數是IdentityMapper和IdentityReducer,均是泛型類型,簡單的將全部輸入寫到輸出中。默認的 partitioner是HashPartitioner,對天天記錄的鍵進行哈希操做以決定該記錄屬於那個分區讓reduce處理。
輸入數據的類型有輸入格式(InputFormat類)進行設置,其它的類型經過JobConf上的方法顯示設置。這裏顯式設置中間和最終輸出類型的緣由是由於Java語言的泛型實現是type erasure。另外若是K2和K3是相同類型,就不須要調用setMapOutputKeyClass(),由於它將調用setOutputKeyClass()來設置。
2. 輸入格式
2.1輸入分片與記錄
一個輸入分片(split)是由單個map處理的輸入塊(分片個數即map所需的tasktracker個數),每一個分片包含若干記錄(key+value),map函數依次處理每條記錄。輸入分片表示爲InputSplit接口,其包含一個以字節爲單位的長度和一組存儲位置,分片不包含數據自己,而是指向數據的引用。
InputSplit是由InputFormat建立的,通常無需應用開發人員處理。InputFormat負責產生輸入分片並將它們分割成記錄。
1) JobClient調用InputFormat.getSplites()方法,傳入預期的map任務數(只是一個參考值);
2)InputFormat計算好分片數後,客戶端將它們發送到jobtracker,jobtracker便使用其存儲位置信息來調度map任務從而在tasktracker上處理這些分片數據。
3)在tasktracker上,map任務把輸入分片傳給InputFormat的getRecordReader()方法來得到這個分片的RecordReader;RecordReader基本上就是記錄上的迭代器,map任務用一個RecordReader來生成記錄的鍵值對,而後在傳給map函數。
2.2 FileInputFormat
輸入路徑可由多個函數FileInputFormat.addInputPath()指定,還能夠利用FileInputFormat.setInputPathFilter()設置過濾器。輸入分片的大小有上個屬性控制:分片最小字節數,分片最大字節數和HDFS數據塊字節數。
mapred.min.split.size, mapred.max.split.size, dfs.block.size
計算公式是:
max(minSplitSize, min(maxSplitSize, blockSize))
沒有特殊需求,應該儘可能讓分片大小和數據塊大小一致。若是HDFS中存在大批量的小文件,則須要使用CombineFileInputFormat將多個文件打包到一個分片中,以便mapper能夠處理更多的數據。一個能夠減小大量小文件的方法(適合於小文件在本地文件系統,在上傳至HDFS以前將它們合併成大文件)是使用SequenceFile將小文件合併成一個或多個大文件,能夠將文件名做爲鍵,文件內容做爲值。
有時候不但願輸入文件被切分,只需覆蓋InputFormat的isSplitable()方法返回false便可。
有時候map程序想知道正在處理的分片信息,能夠經過Configuration中的屬性獲得,包括map.input.file(正在處理的輸入文件的路徑),map.input.start(分片開始處的字節偏移量), map.input.length(分片的字節長度)。
有時候map想訪問一個文件的全部內容,須要一個RecordReader來讀取文件內容做爲record的值。可行的方法是實現一個FileInputFormat的子類,將文件標記爲不可切分,同時指定一個特定的RecordReader;該RecordReader只是在第一次next()時返回文件的內容。
2.3 文本輸入
TextInputFormat是默認的InputFormat。每條記錄是一行輸入。鍵是LongWritable類型,存儲該行在整個文件中的字節偏移量;值是這行的內容,不包括任何行終止符(換行符和回車符),Text類型。因爲一行的長度不定,因此極易出現split分片會跨越HDFS的數據塊。
KeyValueTextInputFormat將文件的每一行看做一個鍵值對,使用某個分界符進行分隔,好比製表符。Hadoop默認輸出的TextOutputFormat格式即鍵值對爲一行組成一個文件,處理這類文件就可使用鍵值文本輸入格式。
NLineInputFormat能夠保證map收到固定行數的輸入分片,鍵是文件中行的字節偏移量,值是行內容。默認爲1,即一行爲一個分片,送給每一個map。
2.4 二進制輸入
SequenceFileInputFormat存儲二進制的鍵值對的序列。順序文件SequenceFile是可分割的,也支持壓縮,很符合MapReduce數據的格式。
2.5 多種輸入
Hadoop也支持在一個做業中對不一樣的數據集進行鏈接(join),即定義多個不一樣的數據輸入源,每一個源對應不一樣的目錄、輸入格式和Map函數。
MultipleInputs.addInputpath(conf, inputPath, TextInputFormat.class, MaxTemperatureMapper.class);
2.6 數據庫輸入和輸出
DBInputFormat用於使用JDBC從關係數據庫中讀取數據,但只適合少許的數據集。若是須要與來自HDFS的大數據集鏈接,要使用MultipleInputs。
在關係數據庫和HDFS之間移動數據的另外一個方法是Sqoop。
HBase和HDFS之間移動數據使用TableInputFormat和TableOutputFormat。
3. 輸出格式
TextOutputFormat是默認的輸出格式,它把每條記錄寫爲文本行,鍵和值能夠是任意類型。
SequenceFileOutputFormat將輸出寫入一個順序文件,是二進制格式。MapFileOutputFormat把MapFile做爲輸出,鍵必須順序添加,因此必須確保reducer輸出的鍵已經排好序。
FileOutputFormat及其子類產生的文件放在輸出目錄下,每一個reducer一個文件而且文件由分區號命名,如part-00000,part-00001等。有時候須要對文件名進行控制,或讓每一個reduce輸出多個文件,則可以使用MultipleOutputFormat和MultipleOutputs類。
MultipleFileOuputFormat能夠將數據寫到多個文件,關鍵是如何控制輸出文件的命名。它有兩個子類:MultipleTextOutputFormat和MultipleSequenceFileOutputFormat。在使用多文件輸出時,只需實現它們任何一個的子類,並覆蓋generateFileNameForKeyValue()返回輸出文件名。
MultipleOutputs類不一樣的是,能夠爲不一樣的輸出產生不一樣的類型。
MultipleOutputs.addMultiNameOutput(conf, 「name」, TextOutputFormat.class, KeyClass, valueClass);
新版本Hadoop中上述兩個多輸出類也合併。
FileOutputFormat的子類會產生輸出文件,即便文件是空的。可使用LazyOutputFormat來去除空文件。
這章主要總結MapReduce的高級特性,包括計數器,數據集的排序和鏈接。
1. 計數器
計數器是一種收集做業統計信息的有效手段,因爲質量控制或應用統計。計數器還可輔助診斷系統故障。
Hadoop爲每一個做業維護若干內置計數器,以描述該做業的各項指標。計數器由關聯任務維護,並按期(3秒)傳到tasktracker,再由tasktracker傳給jobtracker(5秒,心跳)。一個任務的計數器值每次都是完整傳輸的,而非增量值。
MapReduce容許用戶編寫程序定義計數器,通常是由一個Java枚舉(enum)類型定義。枚舉類型的名稱即計數器組名稱,枚舉類型的字段即計數器名稱。計數器在做業實例級別是全局的,MapReduce框架會跨全部的map和reduce來統計這些計數器,並在做業結束時產生一個最終的結果。
enum Temperature {
MISSING, MAlFORMED
}
…
context.incrCounter(Temperature.MISSING, 1);
MapReduce同時支持非枚舉類型的動態計數器。
context.incrContext(String group, String counter, int amount);
計數器能夠經過不少方式獲取,Web界面和命令行(hadoop job -counter指令)以外,用戶能夠用Java API獲取計數器的值。
RunningJob job = jobClient.getJob(JobID.forName(id));
Counters counters = job.getCounters();
long missing = counters.getCounter(MaxTemperatue.Temperature.MISSING);
2. 排序
排序是MapReduce的核心技術,儘管應用程序自己不須要對數據排序,但可使用MapReduce的排序功能來組織數據。默認狀況下,MapReduce根據輸入記錄的鍵對數據排序。鍵的排列順序是由RawComparator控制的,規則以下:
1)若屬性mapred.output.key.comparator.class已設置,則使用該類的實例;
2)不然鍵必須是WritableComparable的子類,並使用針對該鍵類的已登記的comparator;
3)若是尚未已登記的comparator,則使用RawComparator將字節流反序列化爲一個對象,再由WritableComparable的compareTo()方法進行操做。
全排序
如何用Hadoop產生一個鍵全局排序的文件?(最好的回答是使用Pig或Hive,二者都可使用一條指令進行排序)
大體方法是,想辦法建立一系列排好序的文件,並且這些文件直接也是排序的,比方說第一個文件的值都不第二個文件的值小,則簡單的拼裝這些文件就能夠獲得全局排序的結果。問題是如何劃分這些文件,並把原始文件的值放入這些排序的文件中?可使用map的partition來將某一範圍的鍵放入對於的reduce,每一個reduce的輸入能夠保證已排序(局部排序),默認直接輸出到part-000×,那全部這些輸出組合成一個文件就是全局排序的。爲了獲得合適的範圍,須要對全部輸入數據進行統計,實際作法是經過抽樣,Hadoop提供InputSampler和IntervalSampler。使用抽樣函數事先對input數據進行抽樣,獲得抽樣範圍,而後將範圍寫入分佈式緩存,供集羣上其它任務使用。
DistributedCache.addCacheFile(cacheFile, conf);
DistributedCache.createSymlink(conf);
輔助排序
MapReduce框架在記錄達到reducer以前按鍵對記錄排序,但鍵所對應的值並無排序。大多狀況下不需考慮值在reduce函數中的出現順序,可是,有時也須要經過對鍵進行排序和分組等以實現對值的排序。
例子:設計一個MapReduce程序以計算每一年最高氣溫。
1)使用組合鍵IntPair,將年份和睦溫都做爲鍵;
2)按照年份來分區和分組,但排序須要按照年份升序和睦溫降序。
conf.setPartitionerClass();
conf.setOutputKeyComparatorClass();
conf.setOutputValueGroupingComparator();
3 鏈接
MapReduce能執行大型數據集間的「鏈接」操做。
Map端鏈接指在數據到達map函數以前就執行鏈接操做。爲達到此目的,各map的輸入數據必須先分區而且以特定方式排序。各個數據集被劃分紅相同數量的分區,而且均按相同的鍵(鏈接鍵)排序。同一鍵的全部記錄均會放在同一分區之中。
map鏈接操做能夠鏈接多個做業的輸出,只要這些做業的reduce數量相同,鍵相同,而且輸出文件是不可切分的(如小於HDFS塊大小,或gzip壓縮)。利用org.apache.mapred.join包中的CompositeInputFormat類來運行一個map端鏈接,其輸入源和鏈接類型(內鏈接或外鏈接)能夠經過一個鏈接表達式進行配置。
Reduce鏈接不要求數據集符合特定結構,所以比Map鏈接更爲經常使用。可是,因爲數據集均通過mapReduce的shuffle過程,因此reduce端鏈接的效率每每更低一些。基本思路是mapper爲各個記錄標記源,而且使用鏈接鍵做爲map輸出鍵,使鍵相同的記錄放在同一個reducer中。
1)可使用MultipleInputs來解析和標註各個源;
2)先將某一個數據源傳輸到reduce。舉天氣數據爲例,氣象站信息(氣象站id和名字)以氣象站ID+「0」爲組合鍵,名字爲值,可是按照ID來分區和分組;氣象站天氣狀況(氣象站id,時間和睦溫)以氣象站ID+「1」爲組合鍵,氣溫爲值,可是按照ID來分區和分組。兩組數據通過不一樣的map以後,具備相同的ID的記錄被合併做爲一個記錄輸入reduce程序,值列表中的第一個是氣象站名稱,其他的記錄都是溫度信息。reduce程序只須要取出一個值,並將其做爲後續每條輸出記錄的一部分寫到輸出文件便可。
conf.setPartitionerClass();
conf.setOutputValueGroupingComparator(Textpair.FirstComparator.class);
4 邊數據分佈(side data)
邊數據是做業所需的額外的只讀數據,已輔助處理主數據集。面臨的挑戰是如何讓全部的map和reduce都能方便高效地使用邊數據。
1)若是僅需向任務傳遞少許元數據,則能夠經過Configuration來設置每一個job的屬性,則map/reduce能夠覆蓋configure()方法來獲取這些元數據值。若是你設置的值是複雜對象,則須要處理序列化工做。在幾百個做業同在一個系統中運行的狀況下,這種方法會增多內存開銷,並且元數據信息在全部節點都緩存,即便在不須要它的jobtracker和tasktracker上。
2)針對小數據量邊數據的經常使用辦法是將在map/reduce數據緩存在內存中,並經過重用JVM使tasktracker上同一個做業的後續任務共享這些數據。
3)分佈式緩存 (-files, -archives)
a)啓動做業時,使用files或archives傳入元數據文件路徑,
%hadoop jar job.jar MaxTempratureSample –file input/metadata/stations-fixed-width.txt input/all output
b)當tasktracker得到任務後,首先將jobtracker中的上述文件複製到本地磁盤,具體在${mapred.local.dir}/taskTracker/archive,緩存的容量是有限的,默認10GB,能夠經過local.cache.size來設置。
c)在map/reduce程序中,直接讀取「stations-fixed-width.txt」文件。同時能夠經過JobConf.getLocalCacheFiles()和JobConf.getLocalCacheArchives()來獲取本地文件路徑的數組。
5 MapReduce類庫
Hadoop還提供了一個MapReduce類庫,方便完成經常使用的功能。
ChainMapper, ChainReducer 在一個MapReduce中運行多個mapper或reducer。(M+RM*)
IntSumReducer, LongSumReducer 對各鍵的全部整數值進行求和操做的reducer
TokenCounterMapper 輸出各單詞及其出現的次數
RegexMapper 檢查輸入值是否匹配某正則表達式,輸出匹配字符串和計數器值