MapReduce 是一種運行於成百上千臺機器上的處理數據的框架,目前被google,Hadoop等多家公司或社區普遍使用。這種計算框架是很是強大,但它沒有提供一個處理所謂「big data」的通用,廣泛的情形,因此它能很好的解決一些問題,在處理某些問題上也存在挑戰。這本書教給你在什麼問題上適合使用MapReduce和怎樣高效的使用它。html
初次接觸時,不少人沒有意識到MapReduce是一種計算框架而不只僅是一個工具。你必須找到合適的map和reduce框架做爲合適的解決方案,而這種map和reduce可能並不適用於其它情形。MapReduce算不上是特徵,更確切的說是一種規則。java
這使得問題解決起來更容易也更難了。它明確告訴你能作什麼不能作什麼,比起一般使用的方法,它提供的可選方式也少了。同時,理解怎樣用規則解決問題須要靈活的頭腦和思惟的轉變。python
開始學習MapReduce很是像遞歸的學習:在找出問題的遞歸情形上存在挑戰,當你一旦發現,問題也就變得清晰,簡明,優雅。在不少狀況下,你必須考慮到被MapReduce job,特別是內部集羣網絡使用的的系統資源的利用率。這裏要權衡的是:指定的MapReduce框架是否有能力在分佈式計算中處理數據,而不去考慮併發,魯棒性,數據規模及其它的挑戰。使用獨特的系統,獨特的解決問題的方式,便引入了設計模式。c++
什麼是MapReduce的設計模式?它是一種使用MapReduce解決常規數據處理問題的模板。模式不會限定於特定的領域,好比:文本處理或圖形分析。它只是一種處理問題的通用手段。web
使用設計模式就是使用被久經考驗並證實是好的設計原則來更好的構建你的軟件。算法
因爲一些緣由,設計一款好的軟件存在挑戰性,想在MapReduce裏實現一種好的設計也是類似的。就像好的程序猿可以把創做出很差的軟件歸因於糟糕的設計,好的程序猿也能創做出差的MapReduce算法。使用MapReduce時,咱們不只要保證代碼的整潔和可維護性,還要保證分佈於成百上千各節點上所要計算的TB甚至PB數量級數據的job的性能。除此以外,這個job還可能存在與共享集羣上其餘job存在競爭。這使得使用MapReduce時選擇一種正確的設計變得極其重要,用的好的話,在性能上能得到幾個數量級的提升。在咱們深刻討論設計模式以前,先說說怎樣和爲何設計模式和MapReduce一塊兒使用會頗有意義,還有須要具有的知識及從哪得到。sql
衆多的設計模式已經讓開發者們悠哉悠哉了不少年。他們是通用,可複用的解決問題的工具,因此開發者能夠在短期內想出克服障礙的方法並進行下面的開發。它們也是有經驗的開發者們用一種簡潔的方式把他們的知識傳遞給下一代的手段。數據庫
在軟件工程設計模式領域,一個重要的里程碑就是這本書:Design Patterns: Elements of Reusable Object-Oriented Software, by Gamma etal. (Addison-Wesley Professional, 1995)。衆人所知的「Gang of Four」book。在這本很是流行的書裏,沒有一個模式是新的,而且已經使用了好多年。它仍然很是有影響力的緣由是:做者花時間證實了最重要的設計模式在面向對象編程中廣泛適用。自從這本書1994年發行至今,不少對設計感興趣的人讓這些模式口口相傳,或出如今各類學術會議,期刊,互聯網。apache
設計模式已經經得起時間考驗並顯示了正確的抽象度:不肯定性,即有太多的的模式要記憶而且很難適用一個問題。不通用性,大量工做涌入一個模式去運行。這種抽象度也帶來一個重要的好處:提供給開發者口頭和書面交流的通用語言。簡單的說「abstract factory」要比去解釋抽象工廠是什麼是什麼要容易的多。而且,看到別人寫的實現抽象工廠的代碼時,也就大致知道要完成什麼樣的工做。編程
MapReduce設計模式在局部的問題和狀況下也適用於這樣的規則。它提供通用的框架用於解決數據計算問題,但不須要指定問題所在的領域。有經驗的MapReduce開發者能把解決通用問題的知識傳遞給初學者。這點是極其重要的,由於MapReduce是一種新興的技術而且被快速的被接受,天天都有不少程序猿加入社區。MapReduce設計模式也提供了一種團隊合做解決MapReduce問題時的公用語言。設想一種情形,開發者說應該使用「reduce-side join」代替「map-side replicated join」比解釋它們各自底層的實現機制更簡明。
如今MapReduce世界的狀態跟1994年前面向對象世界很類似。模式已經經過博客,站點好比StackOverflow,深刻其它書籍中,深刻全球先進的技術開發團隊,組織中。寫這本書的目的不是提供如今尚未人見過的開創性的MapReduce問題的處理方式,而是收集了各自領域有經驗的開發者分享的模式。
註釋:目前提供的設計模式,真實的MapReduce範例方面的經驗在使用的時候仍然須要深刻理解。當你用這本書或其餘地方看到的模式解決一個新的問題的時候,密切注意」Applicability」部分。
對大多數部分,書中的模式是平臺無關性的。MapReduce被Google以一種沒有任何實際源代碼的範例發佈,無論做爲獨立系統(例如:Hadoop,Disco,Amazon Elastic MapReduce)仍是查詢語言系統(例如:MongoDB, Greenplum DB, Aster Data),已經被實現了好幾回。爲了讓設計模式更傾向於通用,咱們站在hadoop角度寫這本書。不少模式能夠應用於其餘系統,好比MoongoDB,由於他們概念上的架構是一致的。可是,很對技術細節上因爲實現的不一樣而不一樣。「Gang of Four「的設計模式使用c++寫的,但開發者更傾向於但願用比較流行的語言好比ruby,python。書中的模式對各類系統是可用的,而不只僅對hadoop自己。你能夠用書中的例子代碼做爲本身開發的嚮導。
爲何說寫本MapReduce設計模式的書是個很好的想法呢?確切的一點,社區的強勢和範例的廣泛使用度已經達到了能夠寫一些綜合的開發者能夠共享的設計模式的程度。幾年前,當hadoop還在搖籃裏的時候,還沒被人們理解它可以勝任什麼。可是MapReduce被採納的速度是值得注意的。它來自於2004年google一篇有趣的論文,並被迅速接受,在2012年成爲業界分佈式數據處理的標準。
MapReduce具體的起源是有爭議的,可是咱們最早想起的是2004年下載的那篇Jeffrey Dean and SanjayGhemawat寫的論文:「MapReduce: Simplified Data Processing on LargeClusters」。這篇文章描述了google如何分割,處理,整合他們使人難以置信的大數據集。
隨後,開源軟件先驅Doug Cutting,開始作MapReduce的實現用於解決可擴展數據,去構建開源搜索引擎Nutch。隨後yahoo也開始投入研究,Hadoop分割出來成爲apache頂級項目。現在,不少我的和組織都在爲hadoop項目作貢獻。每一次新的發行版都有新的功能和性能上的提高。
幾個其餘開源項目都是在hadoop基礎之上建設的,而且數量還在增加。許多比較流行的,好比:pig,hive,hbase,Mahout,zookeeper。Doug Cutting和其餘hadoop專家已經屢次提到,hadoop將會成爲分佈式應用程序運行的分佈式操做系統的核心。在這本書裏,咱們將會用最少通用的規範解釋hadoop ecosystem和java MapReduce的例子。在一些章節模式類似的部分,咱們將會概述pig和hive sql的對比。
這部分的重點是快速複習hadoop中的MapReduce,由於書中的代碼例子是基於hadoop的。初學者能夠進一步參考的資源有Tom White’s Hadoop:The Definitive Guide或者Apache Hadoop website。這些書能夠幫你搭建一個能夠運行書中例子的完整的環境。
Hadoop MapReduce jobs能夠切分紅一系列運行於分佈式集羣中的map和reduce任務,每一個任務只運行所有數據的一個指定的子集,以此達到整個集羣的負載平衡。Map任務一般加載,解析,轉換,過濾數據,每一個reduce處理map輸出的一個子集。Reduce任務會去map任務端copy中間數據來完成分組,聚合。用這樣一種簡明直接的範式,從簡單的數值聚合到複雜的join操做和笛卡爾操做,解決這麼普遍的問題真的使人難以置信。
mapReduce 的輸入是hdfs上存儲的一系列文件集。在hadoop中,這些文件被一種定義瞭如何分割一個文件成分片的input format來分割,一個分片是一個文件基於字節的能夠被一個map任務加載的一個塊。
每一個map任務被分爲如下階段:record reader,mapper,combiner,partitioner。Map任務的輸出叫中間數據,包括keys和values,發送到reduce端。Reduce任務分爲如下階段:shuffle,sort,reduce,output format。運行map任務的節點會盡可能選擇數據所在的節點。這種狀況下,不會出現網絡傳輸,在本地節點就能夠完成計算。
Record reader會把根據input fromat生成輸入分片翻譯成records。Record reader的目的是把數據解析成記錄,而不是解析數據自己。它把數據以鍵值對的形式傳遞給mapper。一般狀況下鍵是偏移量,值是這條記錄的整個字節塊。自定義record reader 超出本書的範圍。咱們假設你有了處理數據適合的record reader。
Map階段,會對每一個從record reader處理完的鍵值對執行用戶代碼,這些鍵值對又叫中間鍵值對。鍵和值的選擇不是任意的,而且對MapReduce job的成功很是重要。鍵會用來分組,值是reducer端用來分析的數據。這本書會在設計模式方面提供大量的細節去解釋鍵值對的選擇。設計模式之間一個主要的區別是鍵值對的語義。
Combiner 是一個map階段分組數據,可選的,局部reducer。它根據用戶提供的方法在一個mapper範圍內根據中間鍵去聚合值。例如:數的總和是各個部分數量的和,你能夠先計算中間的數目,最後再把全部中間數目加起來。不少狀況下,這樣能減小數據的網絡傳輸量。發送(hello world,1)三次很顯然要比發送(hello world,3)須要更多的網絡傳輸字節量。Combiners能夠被普遍的模式替換。不少hadoop開發者忽視combiner,但能得到更好的性能。咱們須要指出的是哪種模式用combiner有好處,哪種不能用combiner。Combiner不會保證總會執行,因此它是一個總體邏輯。
Partitioner會獲取從mapper(或combiner)來的鍵值對,並分割成分片,每一個reducer一個分片。默認用哈希值,典型使用md5sum。而後partitioner根據reduce的個數執行取餘運算:key.hashCode() % (number of reducers)。這樣能隨即均勻的根據key分發數據到reduce,但仍然要保證不一樣mapper的相同key要到同一個reduce。Partitioner也能夠自定義,使用更高級的樣式,例如排序。然而,更改partitioner不多用。Partitioner的每一個map的數據會寫到本地磁盤,並等待對應的reducer檢測,拿走數據。
Reduce任務開始於shuffle和sort階段。這一階段獲取partitioner的輸出文件,並下載到reduce運行的本地機器。這些分片數據會根據key合併,排序成一個大的數據文件。排序的目的是讓相同的key相鄰,方便在reduce階段值得迭代處理。這一階段不能自定義,由框架自動處理。須要作的只是key的選擇和能夠自定義個用於分組的比較器。
Reduce 任務會把分組的數據做爲輸入並對每一個key組執行reduce方法代碼。方法會傳遞key和能夠相關的全部值得迭代集合。不少的處理會在這個方法裏執行,也就會有不少的模式。一旦reduce方法完成,會發送0或多個鍵值對到output format。跟map同樣,不一樣的reduce依據不一樣的邏輯情形而不一樣。
Output format會把reduce階段的輸出鍵值對根據record writer寫到文件裏。默認用tab分割鍵值對,用換行分割不一樣行。這裏也能夠自定義爲更豐富的輸出格式,最後,數據被寫到hdfs,自定義 output format也超出本書範圍。
如今你已經清楚了整個mapreduce的過程,下面看一個簡單的例子。單詞計數程序是mapreduce界權威的例子。簡單直接代表mapreduce處理的高效。不少人抱怨單詞計數例子被過分使用,希望書中後面的部分能彌補這一點。
在這個例子中,咱們使用StackOverflow上用戶提交的word count。拿出文本域的一部分處理,咱們就能夠看到每一個單詞出現的次數。其中一條記錄是這樣的:
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?"
CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
Id是8189677,郵編6881722,用戶編碼831878。郵編和用戶編碼能夠做爲其它數據部分的外鍵。後面的join模式中會講怎樣作數據集的join。
下面一段代碼是一段框架代碼,使用它能夠編寫不一樣的mapreduce job並運行。這段代碼是通用的,被譽爲「boiler plate」。你會看到咱們所講的大部分模式都用這套框架。
這段代碼來自hadoop中「word count」例子:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import com.sql.bigdata.hadoop.common.MRDPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.lang.StringEscapeUtils;
public class CommentWordCount {
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
...
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
...
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: CommentWordCount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "StackOverflow Comment Word Count");
job.setJarByClass(CommentWordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
框架的目的是編排job。Main方法的前幾行用於解析命令行參數。而後用計算類,輸入輸出路徑設置job。大體如此。僅僅保證類名正確和輸出key,value types匹配reduce(此處原文是map,本人認爲有誤)的輸出類型。
你將看到不一樣模式之間在job.setCombinerClass上會有不一樣,在一些狀況下,combiner因爲reduce的特殊性而不能使用。還有的狀況下,combiner類不一樣於reduce類。在「word count」中,combiner的使用簡單,高效。
下面是解析,準備數據的mapper代碼。先清除掉單引號,非字符用空格代替等文本清洗後,文本被分割成單詞列表,中間key就是單詞自己,value爲1。這意味着每一個單詞都會看到一次。若是一行裏某單詞出現兩次,咱們也會在單詞列表中看到這個鍵值對「單詞:1「兩次。最後,全部的數據都會加到每一個單詞的全局計數字段裏去。
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // Parse the input string into a nice map Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString()); // Grab the "Text" field, since that is what we are counting over String txt = parsed.get("Text"); // .get will return null if the key is not there if (txt == null) { // skip this record return; } // Unescape the HTML because the data is escaped. txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase()); // Remove some annoying punctuation txt = txt.replaceAll("'", "");// remove single quotes (e.g., can't) txt = txt.replaceAll("[^a-zA-Z]", " "); // replace the rest with a space // Tokenize the string by splitting it up on whitespace into // something we can iterate over, // then send the tokens away StringTokenizer itr = new StringTokenizer(txt); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
第一個方法, MRDPUtils.transformXmlToMap,是一個解析數據時出於友好提示的打印幫助信息的方法。你會看到咱們的例子常常用到。它主要獲取一行xml格式的記錄(有着透明的格式)並根據屬性把值放入map中。
下面,要把注意力放到wordcountMapper類,代碼有一點複雜。大多數工做都在這裏作。第一個要注意的是父類的類型:
Mapper<Object,Text,Text, IntWritable>
它們分別是,輸入鍵,輸入值,輸出鍵,輸出值。咱們不關心輸入鍵,因此用Object。由於咱們一行一行以文本格式讀入數據,因此輸入值爲Text(hadoop中特殊的string類型)。輸出鍵和值是Text和IntWritable是由於咱們使用單詞做爲鍵,出現次數做爲值。
Notice:mapper的輸入key和value是由job的配置類FileInputFormat指定的。默認實現類是TextInputformat,字節偏移量做爲鍵,是LongWritable類型,文本值做爲值,是Text類型。使用不一樣的input format會是不一樣的鍵值類型。
直到代碼底部StringTokenizer的使用,咱們僅僅是清洗文本。咱們反轉義數據,是由於文本被存儲成轉義的方式以方便xml解析。下一步,咱們刪除雜散的標點符號以便統一認爲「hadoop!」」hadoop?」」hadoop」是相同的單詞。最後,給每一個單詞賦值1,表示出現次數。Mapreduce框架而後運行shuffle和sort,輸出的鍵值對給reduce任務。
最後看下reducer代碼,相對來講比較簡單。Reduce方法,每次以相同key爲一組來調用,在這裏是以每一個單詞爲一組。經過迭代數值類型的值得集合,來計算求和。最終的結果就是每一個單詞出現的次數。
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
跟mapper同樣,咱們經過父類模板指定了輸入輸出類型。輸入鍵值必須匹配map的輸出鍵值。Reduce的輸出鍵值必須匹配job配置的FileOutputFormat中的內容。在這裏,咱們使用默認的TextOutputFormat,能將任意兩個writable類型對象做爲輸出。
Reduce方法跟map的簽名不一樣,那就是,在全部值(同一key)上進行迭代而不是僅處理一個值。那是由於你能夠迭代同一個key的全部值。Reduce中的key是很是重要的,相比於map中的key來講。
傳遞給context.Write的數據會寫到輸出文件。每一個reduce一個文件,若是你想把它們合起來還必須進一步處理。
Hive和pig在hadoop生態系統中對MapReduce設計模式來說不是很須要。然而,咱們也要找機會解釋爲何設計模式仍然很重要。
Pig和hive是更高級抽象的MapReduce。它們提供的接口裏對map或reduce什麼都不作,可是系統會解析高級語言成爲mapreduce jobs。跟關係型數據庫中解析sql爲具體數據操做的行爲的執行計劃很類似,hive和pig翻譯它們的語句爲MapReduce操做。
正如在這本書裏相同的部分咱們將要看到的,pig和hiveql相比於原生java寫的hadoop程序來講,顯得更爲整潔。例如,用java解釋全局排序要寫幾頁的代碼,但用pig僅僅幾行代碼。
爲何在有可選擇的hive和pig時,咱們任然寫java mapreduce呢?爲何本書的做者花了不少時間解釋怎麼用上百行代碼實現一件事情,而相同的事情也能夠用幾行代碼解決?這裏有兩個主要的緣由。
第一,這樣能夠從更底層理解MapReduce的工做原理。若是開發者理解了pig是怎樣運行reduce端join,他將會作出更明智的選擇。不理解MapReduce原理而去使用pig和hive能夠致使危險的行爲。你能夠從更高一層的接口受益並不表明你能夠忽視某些細節。在大型MapReduce集羣上更應遵照必要的規則。
第二,到目前爲止,hive和pig尚未徹底實現應有的功能和成熟。很明顯還沒達到他們的所有潛能。如今,他們不能把用java寫的MapReduce徹底實現。這將隨着發行版本的不斷髮行而改變一些。好比說,在pig0.6版本,你的團隊50%的分析能用pig,而在0.9版本中,會達到90%。伴隨着每一版的發行,隨之而來的也有更高一層的抽象。有趣的事情是,像軟件工程這類事物的趨勢,剩下的10%問題不能用更高級的抽象解決會受到到多數人的批評和反對。這就像,java語言已經成爲最流行的工具語言,而一些人實際上還在用匯編語言。
當你可以用hive或pig寫MapReduce語言的時候,一些主要的高級抽象帶來的好處有,可讀性,可維護性,開發週期和自動優化。不多出現常見的性能問題是間接的嚴重的緣由致使的。這些分析成批運行並已經運行了幾分鐘,那麼多一兩分鐘又有什麼關係?有些時候,hive或pig中的執行計劃的優化比你寫的代碼要好的多。在處理小部分數據時,hive或pig會增長的額外的時間有點多,這時你應該寫java MapReduce。
Pig和hive比其它東西更能影響MapReduce的設計模式。他們出現的新特性能夠成爲MapReduce中的設計模式。一樣的,更多的設計模式也會因MapReduce而發展,不少流行的模式會成爲更高層次優秀的抽象。
Pig和hive有他們本身的模式,隨着專家們解決愈來愈多的問題,他們也會記錄它們。Hive得益因而用了幾十年的sql樣式,但不是全部的sql在hive中都適用,反過來也同樣。可能隨着這些平臺愈來愈流行,cookbook和design pattern類的書會寫它們。
摘錄地址:http://blog.csdn.net/cuirong1986/article/details/8443841