Hadoop MapReduce 是一個分佈式計算框架,用於編寫批處理應用程序。編寫好的程序能夠提交到 Hadoop 集羣上用於並行處理大規模的數據集。html
MapReduce 做業經過將輸入的數據集拆分爲獨立的塊,這些塊由 map
以並行的方式處理,框架對 map
的輸出進行排序,而後輸入到 reduce
中。MapReduce 框架專門用於 <key,value>
鍵值對處理,它將做業的輸入視爲一組 <key,value>
對,並生成一組 <key,value>
對做爲輸出。輸出和輸出的 key
和 value
都必須實現Writable 接口。java
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
這裏以詞頻統計爲例進行說明,MapReduce 處理的流程以下:git
input : 讀取文本文件;github
splitting : 將文件按照行進行拆分,此時獲得的 K1
行數,V1
表示對應行的文本內容;shell
List(K2,V2)
,其中 K2
表明每個單詞,因爲是作詞頻統計,因此 V2
的值爲 1,表明出現 1 次;Mapping
操做多是在不一樣的機器上並行處理的,因此須要經過 shuffling
將相同 key
值的數據分發到同一個節點上去合併,這樣才能統計出最終的結果,此時獲得 K2
爲每個單詞,List(V2)
爲可迭代集合,V2
就是 Mapping 中的 V2;Reducing : 這裏的案例是統計單詞出現的總次數,因此 Reducing
對 List(V2)
進行歸約求和操做,最終輸出。apache
MapReduce 編程模型中 splitting
和 shuffing
操做都是由框架實現的,須要咱們本身編程實現的只有 mapping
和 reducing
,這也就是 MapReduce 這個稱呼的來源。編程
InputFormat
將輸出文件拆分爲多個 InputSplit
,並由 RecordReaders
將 InputSplit
轉換爲標準的<key,value>鍵值對,做爲 map 的輸出。這一步的意義在於只有先進行邏輯拆分並轉爲標準的鍵值對格式後,才能爲多個 map
提供輸入,以便進行並行處理。api
combiner
是 map
運算後的可選操做,它其實是一個本地化的 reduce
操做,它主要是在 map
計算出中間文件後作一個簡單的合併重複 key
值的操做。這裏以詞頻統計爲例:服務器
map
在遇到一個 hadoop 的單詞時就會記錄爲 1,可是這篇文章裏 hadoop 可能會出現 n 屢次,那麼 map
輸出文件冗餘就會不少,所以在 reduce
計算前對相同的 key 作一個合併操做,那麼須要傳輸的數據量就會減小,傳輸效率就能夠獲得提高。app
但並不是全部場景都適合使用 combiner
,使用它的原則是 combiner
的輸出不會影響到 reduce
計算的最終輸入,例如:求總數,最大值,最小值時均可以使用 combiner
,可是作平均值計算則不能使用 combiner
。
不使用 combiner 的狀況:
使用 combiner 的狀況:
能夠看到使用 combiner 的時候,須要傳輸到 reducer 中的數據由 12keys,下降到 10keys。下降的幅度取決於你 keys 的重複率,下文詞頻統計案例會演示用 combiner 下降數百倍的傳輸量。
partitioner
能夠理解成分類器,將 map
的輸出按照 key 值的不一樣分別分給對應的 reducer
,支持自定義實現,下文案例會給出演示。
這裏給出一個經典的詞頻統計的案例:統計以下樣本數據中每一個單詞出現的次數。
Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive
爲方便你們開發,我在項目源碼中放置了一個工具類 WordCountDataUtils
,用於模擬產生詞頻統計的樣本,生成的文件支持輸出到本地或者直接寫到 HDFS 上。
項目完整源碼下載地址:hadoop-word-count
想要進行 MapReduce 編程,須要導入 hadoop-client
依賴:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency>
將每行數據按照指定分隔符進行拆分。這裏須要注意在 MapReduce 中必須使用 Hadoop 定義的類型,由於 Hadoop 預約義的類型都是可序列化,可比較的,全部類型均實現了 WritableComparable
接口。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }
WordCountMapper
對應下圖的 Mapping 操做:
WordCountMapper
繼承自 Mappe
類,這是一個泛型類,定義以下:
WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { ...... }
mapping
輸入 key 的類型,即每行的偏移量 (每行第一個字符在整個文本中的位置),Long
類型,對應 Hadoop 中的 LongWritable
類型;mapping
輸入 value 的類型,即每行數據;String
類型,對應 Hadoop 中 Text
類型;mapping
輸出的 key 的類型,即每一個單詞;String
類型,對應 Hadoop 中 Text
類型;mapping
輸出 value 的類型,即每一個單詞出現的次數;這裏用 int
類型,對應 IntWritable
類型。在 Reduce 中進行單詞出現次數的統計:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
以下圖,shuffling
的輸出是 reduce 的輸入。這裏的 key 是每一個單詞,values 是一個可迭代的數據類型,相似 (1,1,1,...)
。
組裝 MapReduce 做業,並提交到服務器運行,代碼以下:
/** * 組裝做業 並提交到集羣運行 */ public class WordCountApp { // 這裏爲了直觀顯示參數 使用了硬編碼,實際開發中能夠經過外部傳參 private static final String HDFS_URL = "hdfs://192.168.0.107:8020"; private static final String HADOOP_USER_NAME = "root"; public static void main(String[] args) throws Exception { // 文件輸入路徑和輸出路徑由外部傳參指定 if (args.length < 2) { System.out.println("Input and output paths are necessary!"); return; } // 須要指明 hadoop 用戶名,不然在 HDFS 上建立目錄時可能會拋出權限不足的異常 System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME); Configuration configuration = new Configuration(); // 指明 HDFS 的地址 configuration.set("fs.defaultFS", HDFS_URL); // 建立一個 Job Job job = Job.getInstance(configuration); // 設置運行的主類 job.setJarByClass(WordCountApp.class); // 設置 Mapper 和 Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 設置 Mapper 輸出 key 和 value 的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置 Reducer 輸出 key 和 value 的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 若是輸出目錄已經存在,則必須先刪除,不然重複運行程序時會拋出異常 FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME); Path outputPath = new Path(args[1]); if (fileSystem.exists(outputPath)) { fileSystem.delete(outputPath, true); } // 設置做業輸入文件和輸出文件的路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); // 將做業提交到羣集並等待它完成,參數設置爲 true 表明打印顯示對應的進度 boolean result = job.waitForCompletion(true); // 關閉以前建立的 fileSystem fileSystem.close(); // 根據做業結果,終止當前運行的 Java 虛擬機,退出程序 System.exit(result ? 0 : -1); } }
須要注意的是:若是不設置 Mapper
操做的輸出類型,則程序默認它和 Reducer
操做輸出的類型相同。
在實際開發中,能夠在本機配置 hadoop 開發環境,直接在 IDE 中啓動進行測試。這裏主要介紹一下打包提交到服務器運行。因爲本項目沒有使用除 Hadoop 外的第三方依賴,直接打包便可:
# mvn clean package
使用如下命令提交做業:
hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \ com.heibaiying.WordCountApp \ /wordcount/input.txt /wordcount/output/WordCountApp
做業完成後查看 HDFS 上生成目錄:
# 查看目錄 hadoop fs -ls /wordcount/output/WordCountApp # 查看統計結果 hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
想要使用 combiner
功能只要在組裝做業時,添加下面一行代碼便可:
// 設置 Combiner job.setCombinerClass(WordCountReducer.class);
加入 combiner
後統計結果是不會有變化的,可是能夠從打印的日誌看出 combiner
的效果:
沒有加入 combiner
的打印日誌:
加入 combiner
後的打印日誌以下:
這裏咱們只有一個輸入文件而且小於 128M,因此只有一個 Map 進行處理。能夠看到通過 combiner 後,records 由 3519
下降爲 6
(樣本中單詞種類就只有 6 種),在這個用例中 combiner 就能極大地下降須要傳輸的數據量。
這裏假設有個需求:將不一樣單詞的統計結果輸出到不一樣文件。這種需求實際上比較常見,好比統計產品的銷量時,須要將結果按照產品種類進行拆分。要實現這個功能,就須要用到自定義 Partitioner
。
這裏先介紹下 MapReduce 默認的分類規則:在構建 job 時候,若是不指定,默認的使用的是 HashPartitioner
:對 key 值進行哈希散列並對 numReduceTasks
取餘。其實現以下:
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
這裏咱們繼承 Partitioner
自定義分類規則,這裏按照單詞進行分類:
public class CustomPartitioner extends Partitioner<Text, IntWritable> { public int getPartition(Text text, IntWritable intWritable, int numPartitions) { return WordCountDataUtils.WORD_LIST.indexOf(text.toString()); } }
在構建 job
時候指定使用咱們本身的分類規則,並設置 reduce
的個數:
// 設置自定義分區規則 job.setPartitionerClass(CustomPartitioner.class); // 設置 reduce 個數 job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());
執行結果以下,分別生成 6 個文件,每一個文件中爲對應單詞的統計結果:
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南