原文地址: itweknow.cn/detail?id=6… ,歡迎你們訪問。java
MapReduce是一種編程模型,"Map(映射)"和"Reduce(歸約)",是它們的主要思想,咱們經過Map函數來分佈式處理輸入數據,而後經過Reduce彙總結果並輸出。其實這個概念有點相似於咱們Java8中的StreamApi,有興趣的同窗也能夠去看看。
MapReduce任務過程分爲兩個處理階段,map階段和reduce階段。每一個階段都以鍵-值對做爲輸入輸出,鍵和值的類型由咱們本身指定。一般狀況map的輸入內容鍵是LongWritable類型,爲某一行起始位置相對於文件起始位置的偏移量;值是Text類型,爲該行的文本內容。linux
- 一個maven項目。
- 一臺運行着hadoop的linux機器或者虛擬機,固然了hadoop集羣也能夠,若是你尚未的話能夠戳這裏。
咱們編寫一個MapReduce程序的通常步驟是:(1)map程序。(2)reduce程序。(3)程序驅動。下面咱們就根據這個順序來寫一個簡單的示例,這個例子是用來統計文件中每一個字符出現的次數並輸出。git
咱們先來解決一下依賴問題,在pom.xml
中添加以下內容。github
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
複製代碼
咱們繼承Mapper
類並重寫了其map方法。Map階段輸入的數據是從hdfs中拿到的原數據,輸入的key爲某一行起始位置相對於文件起始位置的偏移量,value爲該行的文本。輸出的內容一樣也爲鍵-值對,這個時候輸出數據的鍵值對的類型能夠本身指定,在本例中key是Text類型的,value是LongWritable類型的。輸出的結果將會被髮送到reduce函數進一步處理。apache
public class CharCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 將這一行文本轉爲字符數組
char[] chars = value.toString().toCharArray();
for (char c : chars) {
// 某個字符出現一次,便輸出其出現1次。
context.write(new Text(c + ""), new LongWritable(1));
}
}
}
複製代碼
咱們繼承Reducer
類並重寫了其reduce方法。在本例中Reduce階段的輸入是Map階段的輸出,輸出的結果能夠做爲最終的輸出結果。相信你也注意到了,reduce方法的第二個參數是一個Iterable,MapReduce會將map階段中相同字符的輸出彙總到一塊兒做爲reduce的輸入。編程
public class CharCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
複製代碼
到目前爲止,咱們已經有了map程序和reduce程序,咱們還須要一個驅動程序來運行整個做業。能夠看到咱們在這裏初始化了一個Job對象。Job對象指定整個MapReduce做業的執行規範。咱們用它來控制整個做業的運做,在這裏咱們指定了jar包位置還有咱們的Map
程序、Reduce
程序、Map
程序的輸出類型、整個做業的輸出類型還有輸入輸出文件的地址。數組
public class CharCountDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// Hadoop會自動根據驅動程序的類路徑來掃描該做業的Jar包。
job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
// 指定mapper
job.setMapperClass(CharCountMapper.class);
// 指定reducer
job.setReducerClass(CharCountReducer.class);
// map程序的輸出鍵-值對類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 輸出鍵-值對類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 輸入文件的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 輸入文件路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
複製代碼
你會發現咱們初始化了一個空的Configuration,可是並無進行任何的配置,其實當咱們將其運行在一個運行着hadoop的機器上時,它會默認使用咱們機器上的配置。在後續的文章中我也會寫一下如何在程序中進行配置。bash
打包做業,咱們須要將咱們的MapReduce程序打成jar包。app
mvn package -Dmaven.test.skip=true
複製代碼
生成的jar包咱們能夠在target目錄下找到。maven
將jar包複製到hadoop機器上。
在HDFS上準備好要統計的文件,我準備的文件在HDFS上的/mr/input/
目錄下,內容以下。
hello hadoop hdfs.I am coming.
複製代碼
執行jar
hadoop jar mr-test-1.0-SNAPSHOT.jar cn.itweknow.mr.CharCountDriver /mr/input/ /mr/output/out.txt
複製代碼
查看結果
先查看輸出目錄,結果以下,最終輸出的結果就存放在/mr/output/part-r-00000
文件中。
root@test:~# hadoop fs -ls /mr/output
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-12-24 10:33 /mr/output/_SUCCESS
-rw-r--r-- 1 root supergroup 68 2018-12-24 10:33 /mr/output/part-r-00000
複製代碼
查看結果文件的具體內容:
root@test:~# hadoop fs -cat /mr/output/part-r-00000
4
. 2
I 1
a 2
c 1
d 2
e 1
f 1
g 1
h 3
i 1
l 2
m 2
n 1
o 4
p 1
s 1
複製代碼
最後,送上本文的源碼地址,戳這裏哦。