- 編寫可擴展、分佈式的數據密集型程序和基礎知識
- 理解Hadoop和MapReduce
- 編寫和運行一個基本的MapReduce程序
一、什麼是Hadoop
Hadoop是一個開源的框架,可編寫和運行分佈式應用處理大規模數據。
Hadoop不同凡響之處在於如下幾點:
- 方便——Hadoop運行在由通常商用機器構成的大型集羣上,或者雲計算服務之上;
- 健壯——Hadoop致力於在通常商用硬件上運行,其架構假設硬件會頻繁地出現失效;
- 可擴展——Hadoop經過增長集羣節點,能夠線性地擴展以處理更大的數據集;
- 簡單——Hadoop運行用戶快速編寫出高效的並行代碼。
二、瞭解分佈式系統和Hadoop
理解分佈式系統(向外擴展)和大型單機服務器(向上擴展)之間的對比,考慮現有I/O技術的性價比。
理解Hadoop和其餘分佈式架構(SETI@home)的區別:
Hadoop設計理念是代碼向數據遷移,而SETI@home設計理念是數據遷移。
要運行的程序在規模上比數據小几個數量級,更容易移動;此外,在網絡上移動數據要比在其上加載代碼更花時間,不如讓數據不動而將可執行代碼移動到數據所在機器上去。
三、比較SQL數據庫和Hadoop
SQL(結構化查詢語言)是針對結構化數據設計的,而Hadoop最初的許多應用針對的是文本這種非結構化數據。讓咱們從特定的視角將Hadoop與典型SQL數據庫作更詳細的比較:
- 用向外擴展代替向上擴展——擴展商用關係型數據庫的代價會更加昂貴的
- 用鍵/值對代替關係表——Hadoop使用鍵/值對做爲基本數據單元,可足夠靈活地處理較少結構化的數據類型
- 用函數式編程(MapReduce)代替聲明式查詢(SQL)——在MapReduce中,實際的數據處理步驟是由你指定的,很相似於SQL引擎的一個執行計劃
- 用離線處理代替在線處理——Hadoop是專爲離線處理和大規模數據分析而設計的,並不適合那種對幾個記錄隨機讀寫的在線事務處理模式
四、理解MapReduce
MapReduce是一種數據處理模型,最大的優勢是容易擴展到多個計算節點上處理數據;
在MapReduce模型中,數據處理原語被稱爲mapper和reducer;
分解一個數據處理應用爲mapper和reducer有時是繁瑣的,可是一旦一MapReduce的形式寫好了一個應用程序,僅需修改配置就能夠將它擴展到集羣中幾百、幾千,甚至幾萬臺機器上運行。
[動手擴展一個簡單程序]
少許文檔處理方式:對於每一個文檔,使用分詞過程逐個提取單詞;對於每一個單詞,在多重集合wordcount中的相應項上加1;最後display()函數打印出wordcount中的全部條目。
大量文檔處理方式:將工做分佈到多臺機器上,每臺機器處理這些文檔的不一樣部分,當全部機器都完成時,第二個處理階段將合併這些結果。
一些細節可能會妨礙程序按預期工做,如文檔讀取過量致使中央存儲服務器的帶寬性能跟不上、多重集合wordcount條目過多超過計算機的內存容量。此外,第二階段只有一個計算機處理wordcount任務,容易出現瓶頸,因此能夠採用分佈的方式運轉,以某種方式將其分割到多臺計算機上,使之可以獨立運行,即須要在第一階段後將wordcount分區,使得第二階段的每臺計算機僅需處理一個分區。
爲了使它工做在一個分佈式計算機集羣上,須要添加如下功能:
- 存儲文件到許多計算機上(第一階段)
- 編寫一個基於磁盤的散列表,使得處理不受內存容量限制
- 劃分來自第一階段的中間數據(即wordcount)
- 洗牌這些分區到第二階段中合適的計算機上
MapReduce程序執行分爲兩個主要階段,爲mapping和reducing,每一個階段均定義爲一個數據處理函數,分別稱爲mapper和reducer。在mapping階段,MapReduce獲取輸入數據並將數據單元裝入mapper;在reduce階段,reducer處理來自mapper的全部輸出,並給出最終結果。簡而言之,mapper意味着將輸入進行過濾與轉換,使reducer能夠完成聚合。
另外,爲了擴展分佈式的單詞統計程序,不得不編寫了partitioning和shuffling函數。
在MapReduce框架中編寫應用程序就是定製化mapper和reducer的過程,如下是完整的數據流:
- 應用的輸入必須組織爲一個鍵/值對的列表list(<k1,v1>);
- 含有鍵/值對的列表被拆分,進而經過調用mapper的map函數對每一個單獨的鍵/值對<k1,v1>進行處理;
- 全部mapper的輸出被聚合到一個包含<k2,v2>對的巨大列表中;
- 每一個reducer分別處理每一個被聚合起來的<k2,list(v2)>,並輸出<k3,v3>。
五、用Hadoop統計單詞——運行第一個程序
- Linux操做系統
- JDK1.6以上運行環境
- Hadoop操做環境
Usage:hadoop [—config configdir] COMMAND
這裏COMMAND爲下列其中一個:
namenode -format 格式化DFS文件系統
secondarynamenode 運行DFS的第二個namenode
namenode 運行DFS的namenode
datanode 運行一個DFS的datanode
dfsadmin 運行一個DFS的admin客戶端
fsck 運行一個DFS文件系統的檢查工具
fs 運行一個普通的文件系統用戶客戶端
balancer 運行一個集羣負載均衡工具
jobtracker 運行MapReduce的jobtracker節點
pipes 運行一個pipes做業
tasktracker 運行一個MapReduce的tasktracker節點
job 處理MapReduce做業
version 打印版本
jar <jar> 運行一個jar文件
distcp <srcurl> <desturl> 遞歸地複製文件或者目錄
archive -archiveName NAME <src>* <dest> 生成一個Hadoop檔案
daemonlog 獲取或設置每一個daemon的log級別
CLASSNAME 運行名爲CLASSNAME的類大多數命令會在使用w/o參數
時打出幫助信息。
運行單詞統計示例程序的命令形式以下:
hadoop jar hadoop-*-examples.jar wordcount [-m <maps>] [-r reduces] input output
編譯修改後的單詞統計程序的命令形式以下:
javac -classpath hadoop-*-core.jar -d playground/classes playground/src/WordCount.java
jar -cvf playground/src/wordcount.jar -C playground/classes/
運行修改後的單詞統計程序的命令形式以下:
hadoop jar playground/wordcount.jar org.apache.hadoop.examples.WordCount input output
代碼清單 WordCount.java
1 public class WordCount {
2
3 public static class TokenizerMapper
4 extends Mapper<Object, Text, Text, IntWritable>{
5
6 private final static IntWritable one = new IntWritable(1);
7 private Text word = new Text();
8
9 public void map(Object key, Text value, Context context
10 ) throws IOException, InterruptedException {
11 StringTokenizer itr = new StringTokenizer(value.toString()); //(1)使用空格進行分詞
12 while (itr.hasMoreTokens()) {
13 word.set(itr.nextToken()); //(2)把Token放入Text對象中
14 context.write(word, one);
15 }
16 }
17 }
18
19 public static class IntSumReducer
20 extends Reducer<Text,IntWritable,Text,IntWritable> {
21 private IntWritable result = new IntWritable();
22
23 public void reduce(Text key, Iterable<IntWritable> values,
24 Context context
25 ) throws IOException, InterruptedException {
26 int sum = 0;
27 for (IntWritable val : values) {
28 sum += val.get();
29 }
30 result.set(sum);
31 context.write(key, result); //(3)輸出每一個Token的統計結果
32 }
33 }
34
35 public static void main(String[] args) throws Exception {
36 Configuration conf = new Configuration();
37 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
38 if (otherArgs.length < 2) {
39 System.err.println("Usage: wordcount <in> [<in>...] <out>");
40 System.exit(2);
41 }
42 Job job = new Job(conf, "word count");
43 job.setJarByClass(WordCount.class);
44 job.setMapperClass(TokenizerMapper.class);
45 job.setCombinerClass(IntSumReducer.class);
46 job.setReducerClass(IntSumReducer.class);
47 job.setOutputKeyClass(Text.class);
48 job.setOutputValueClass(IntWritable.class);
49 for (int i = 0; i < otherArgs.length - 1; ++i) {
50 FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
51 }
52 FileOutputFormat.setOutputPath(job,
53 new Path(otherArgs[otherArgs.length - 1]));
54 System.exit(job.waitForCompletion(true) ? 0 : 1);
55 }
56 }
在(1)的位置上wordcount以默認配置使用了Java的StringTokenizer,這裏僅基於空格來分詞。爲了在分詞過程當中忽略標準的標點符號,將它們加入到stringTokenizer的定界符列表中:
StringTokenizer itr = new StringTokenizer(value.toString(),」 \t\n\r\f,.:;?![]’");
由於但願單詞統計忽略大小寫,把它們轉換爲Text對象前先將全部的單詞都變成小寫:
word.set(itr.nextToken().toLowerCase());
但願僅僅顯示出現次數大於4次的單詞:
if (sum > 4) context.write(key, result);
六、hadoop歷史
創始人:Doug Cutting
2004年左右——Google發表了兩篇論文來論述Google文件系統(GFS)和MapReduce框架。
2006年1月——雅虎聘用Doug,讓他和一個專項團隊一塊兒改進Hadoop,並將其做爲一個開源項目。
[轉載請註明] http://www.cnblogs.com/zhengrunjian/