MapReduce高級特性

計數器

由於計數器的查看每每比查看集羣日誌來的方便快捷
因此有些狀況下計數器信息比集羣日誌更加有效java

用戶自定義的計數器

關於Hadoop的內置計數器的介紹能夠參考Hadoop權威指南第九章MapReduce Features中的Build-in Counts小節
這裏限於篇幅再也不說明node

MapReduce容許用戶在程序中使用枚舉或者字符串的格式類自定義計數器
一個做業能夠定義的計數器不限,使用枚舉類型時
枚舉類型的名稱即爲組名,枚舉類型的字段即爲計數器名
計數器是全局的,會跨越全部Mapper和Reducer進行使用,並在做業結束的時候產生一個結果web

例如,現有枚舉類型以下:apache

enum Temperature{
    MISSING,
    MALFORMAT
}

在MapReduce程序中能夠這樣來使用計數器:數組

context.getCounter(Temperature.MISSING).increment(1);
context.getCounter(Temperature.MALFORMAT).increment(1);

動態計數器

因爲枚舉類型在編譯的時候就肯定了全部字段,可是某些狀況下咱們可能要根據未知的名稱來命名計數器
這個時候就可使用動態計數器來實現:緩存

context.getCounter("計數器組名","計數器名").increment(1);

這裏的計數器名的得到方式能夠是任意的,例如動態獲取的字段值等
可是大部分狀況下,枚舉類型能夠足夠使用了,並且枚舉類型閱讀性較強,易於使用,並且是類型安全的
因此推薦儘量的使用枚舉類型安全

在代碼中獲取計數器的值

除了經過Web UI、CLI和-counter參數得到做業的計數器,用戶也能夠經過代碼在程序中獲取計數器的值:app

String jobId = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobId.forName(jobId));
if(job == null){
    System.err.println("No job whih ID %s found",jobId);
    return -1;
}
if(!job.isComplete()){
    System.err.println("Job %s is not complete",jobId);
    return -1;
}
Counters counters = job.getCounters();
//關鍵代碼
long missing = conters.findCounter(Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

排序

部分排序

部分排序是指在map階段,對每一個分區中的數據進行排序的過程dom

Hadoop提交做業自定義排序和分組中能夠看到
MapReduce中控制部分排序的方法不僅有一種,控制排序的順序以下:分佈式

1.若是設置了mapreduce.job.output.key.comparator.class屬性或者setComparatorClass()方法,則使用設置的類進行部分排序
2.不然,鍵必須是WritableComparable的子類,並使用針對該鍵類型的已經註冊的comparator
3.不然,使用RawComparator將字節流反序列化爲對象,並調用WritableComparable的comparaTo()方法

咱們在自定義數據類型的時候繼承自WritableComparable,並重寫了comparaTo方法,這裏的設置是最後纔會使用的
若是定義了RawComparator/WritableComparator的具體實現類,那麼將會優先使用這個設置,由於其能夠直接對比字節流數組

全排序

MapReduce Shuffle階段的排序只針對各個單獨的分區,也就是以前討論到的部分排序
對於每一個分區,其數據是有序的,可是從數據的整體來看,是無序的
如何讓MapReduce產生全局有序的數據呢?
最簡單的辦法是隻使用一個分區,可是這就喪失了MapReduce並行計算的特性,必須在單臺機器上處理全部數據

事實上,除了使用一個分區,還有另一種方式既能夠實現全局有序,也能夠充分利用到MapReduce的並行計算能力
可是這個方法須要作一些額外的工做

思考一下,在部分排序中,每一個分區內的數據都是有序的,可是從分區的角度看就是無序的了
若是咱們可以確保分區也是有序的呢?,例如分區1保存1-100的數據,分區2保存101-200的數據,一次類推
那麼從分區的角度看,各個分區之間是有序的,而分區內部的數據也是天然有序的
從而就作到了數據的全局有序

可是在這個過程當中須要注意一個狀況:如何確保每一個分區的數據量分配是均勻的?
由於在實際場景中,1-100中包含的數據可能有1000個,而101-200的數據只有50個,這就形成了數據傾斜的問題

爲了解決這個問題,咱們一般須要深刻的瞭解數據的組成特性
可是在海量數據的狀況下,不可能對所有數據進行檢查
這時咱們可使用採樣的方式來進行

採樣的核心思想是隻查看一小部分的鍵,得到鍵的近似分佈由此構建分區

Hadoop中已經內置了若干的採樣器,接口以下:

public interface Sampler<K,V>{
    K[] getSample(InputFormat<K,V> inf,Job job) throw IOException,InterruptedException;
}

可是一般不會直接使用這個getSample接口,而是由InputSampler的writePartitionFile方法調用
目的是建立一個SequenceFile來存儲定義分區的鍵

public static <K,V> writePartitionFile(Job job,Sampler<K,V> sampler) throw IOException,ClassNotFoundException,InterruptedException

該SequenceFile會被TotalOrderPartitioner使用來爲做業建立分區:

//設置分區類爲TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
//使用隨機採樣器,採樣率爲0.1,最大樣本數和最大分區數爲10000何10,任意一個條件知足以後即刻中止採樣
InputSampler.Sampler<IntWritable,Text> sampler = new InputSampler.RandomSampler<IntWritable,Text>(0.1,10000,10);
//使用該採樣器建立定義分區鍵的SequenceFile
InputSampler.writePartitionFile(job,sampler);
///得到該SequenceFile並加入分佈式緩存中共享
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI uri = new URI(partitionFile);
jov.addCacheFile(uri);

這個採樣器將會運行在客戶端,因此會從集羣上下載數據,須要注意下載的數據量不要太大否則運行時間好久
使用該方法還能夠自由的設置reducer的任務數,即分區數,經過mapreduce.job.reducers來設置最後須要產生多少個均勻的分區

RandomSampler是一種比較通用的採樣器,除了它,還有另一些例如:

  • SplitSampler:只採樣一個分片中的前n條記錄,沒有從所有分片中普遍採樣,因此不適合已經排好序的數據
  • IntervalSampler:以必定的間隔從分片中選擇鍵,所以很適合排過序的數據

二次排序

二次排序即爲對數據的值進行排序,其實在Hadoop I/O的序列化小節中
就已經討論過這個問題了,具體案例能夠參考:Hadoop提交做業自定義排序和分組

Join鏈接

使用MapReduce進行鏈接操做的方式和技巧取決於數據集的規模和結構
若是一個數據集很大,另一個很小,徹底可使用MapReduce中的DistributedCache
將小數據集分發到各個節點上

若是兩個數據集都很大,那麼又能夠分爲Map端的Join和Reduce端的Join

Map端的Join

Map端的Join操做會在數據到達map函數以前執行
爲了達到這個目的,Map端的輸入數據必須:

1.兩個數據集被劃分爲數量相同的分區
2.兩個數據集按照相同的鍵進行排序

因爲Map能夠設置以前執行的多個做業的輸出爲其輸入,按照以上條件
此時輸入數據應該知足:

1.兩個做業有相同的reduce數量
2.鍵是相同的且不可分割

知足Map端Join操做的要求以後,能夠利用org.apache.hadoop.mapreduce.join包中的ComsiteInputFormat類在map函數以前執行join操做

Reduce端的Join

比起Map端,Reduce端的Join對數據的要求沒有那麼高,利用Shuffle相同鍵的記錄會被輸入到同一個reducer(分區)中的特性
Reducer端能夠自然進行Join操做,可是因爲數據要通過Shuffle過程,因此效率每每比Map端的Join要低

並且在Reduce端的Join中,還能夠利用到以前討論的二次排序
有時候join鏈接須要一個數據集先於另外一個數據集到達reduce函數,這時候咱們能夠聽過二次排序對數據的值作一個標號
先要達到的數據標號設置爲0,另一個數據集設置爲1,而後根據這個標號進行排序就能夠實現讓想要的數據集先一步到達reduce

邊數據分佈

所謂的邊數據(Side Data)能夠理解爲MapReduce做業執行過程當中
全部任務都有可能要使用到的只讀的的數據,用以輔助處理主數據

使用JobConfiguration

Configuration類的各類setter方法能夠方便的設置一些鍵值對類型的數據
用戶能夠經過getConfiguration方法得到配置的信息

這種方式足以應對不少只須要設置一些屬性的場合
可是其缺點是:

  • 只適合相似屬性設置的小數據
  • 對於很複雜的對象,用戶須要本身設置序列化和反序列化
  • 每次讀取配置的時候全部設置都將讀取內存,無論有沒有用到

DistributedCache

分佈式緩存機制在做業運行以前將用戶設置的數據拷貝到各個節點中以供使用
緩存的容量大小默認爲10G,能夠經過yarn.nodemanager.localizer.cache.target-size-mb來配置(以字節爲單位)

具體的使用方式參考:MapReduce中的DistributedCache

做者:@小黑