What you will learn in this chapter: the WordCount example.
package com.zhaoyi.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * The 4 type parameters specify the input K-V types and the output K-V types.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. convert the Text to a Java String; this is one line of input.
        String line = value.toString();
        // 2. split the line by " "
        String[] words = line.split(" ");
        // 3. write the word-1 key-value pairs to the context.
        for (String word : words) {
            // use the word as key and the number 1 as value;
            // records are partitioned by key, so identical words end up in the same reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Our mapper is written by extending Hadoop's Mapper class. We can take a look at the Mapper source code:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
As you can see, it asks us to supply four type parameters, corresponding to the Mapper's input key-value types and its output key-value types.
Our processing logic is simple: we split each line into words on a single space. Strictly speaking, you would also need to handle cases such as multiple consecutive spaces; if you need that, it can be implemented here, as in the sketch below.
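For example, a minimal sketch of that variant would replace step 2 of the mapper with a regular-expression split (only this fragment changes; the rest of the mapper stays the same):

// Hypothetical variant of step 2: split on one or more whitespace characters
// so that runs of spaces or tabs do not produce empty words.
String trimmed = line.trim();
String[] words = trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");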
Reducer類和Mapper的模式大體相同,他也須要指定四個形參類型,輸入的key-val類型對應Mapper的輸出key-val類型。而輸出則是Text、IntWritable類型。至於爲何不用咱們java本身的封裝類型,咱們之後會提到,如今有個大體印象便可。app
package com.zhaoyi.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The input K-V types are the mapper's output K-V types. The result we want is word-count,
 * so the output K-V types are Text-IntWritable.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        // 1. sum up the counts for this key
        for (IntWritable value : values) {
            count += value.get();
        }
        // 2. write out the total for this key
        context.write(key, new IntWritable(count));
    }
}
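As a brief, optional illustration of the earlier point about wrapper types (a standalone sketch, not part of the job; the class name WritableDemo is made up): Hadoop types such as IntWritable carry their own compact binary serialization through write() and readFields(), which is why they are used for keys and values instead of plain Java types.

package com.zhaoyi.wordcount;

import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // serialize an IntWritable to raw bytes using its own write() method
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new IntWritable(42).write(new DataOutputStream(bytes));

        // deserialize it back with readFields()
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored.get()); // prints 42
    }
}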
The driver class is responsible for loading the Mapper and Reducer and running the job.
package com.zhaoyi.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 0. check the arguments
        if (args.length != 2) {
            System.out.println("Please enter the parameter: data input and output paths...");
            System.exit(-1);
        }

        // 1. create the job from the configuration
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. set the driver class
        job.setJarByClass(WordCountDriver.class);

        // 3. specify the mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. set the output key-value types (reducer output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. set the input and output data paths, taken from the program arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. submit the job and wait for it to finish
        // job.submit();
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
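One detail worth noting: the driver only declares the reducer's output types, which works here because the mapper's output types (Text, IntWritable) happen to be the same. If they differed, the map output types would also have to be declared explicitly, roughly like this (a sketch of the two extra calls, not needed in this example):

// Only required when the map output K-V types differ from the reducer output K-V types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);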
1. Enter our project directory and package it with Maven:
cd word-count
mvn install
After the build completes, you will find a wordcount-1.0-SNAPSHOT.jar file in the build output directory (by default, target/). Copy it to the user's home directory on our Hadoop server.
We have now uploaded a file to the /input directory (an HDFS directory). Its content is shown below; this file will be the input for our analysis:
this is a test
just a test
Alice was beginning to get very tired of sitting by her sister on the bank and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, `and what is the use of a book,' thought Alice `without pictures or conversation?' So she was considering in her own mind
Next, run our job directly:
[root@h133 ~]# hadoop jar wordcount-1.0-SNAPSHOT.jar com.zhaoyi.wordcount.WordCountDriver /input /output
...
19/01/07 10:21:20 INFO client.RMProxy: Connecting to ResourceManager at h134/192.168.102.134:8032
19/01/07 10:21:22 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/01/07 10:21:23 INFO input.FileInputFormat: Total input paths to process : 1
19/01/07 10:21:25 INFO mapreduce.JobSubmitter: number of splits:1
19/01/07 10:21:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546821218045_0001
19/01/07 10:21:27 INFO impl.YarnClientImpl: Submitted application application_1546821218045_0001
19/01/07 10:21:27 INFO mapreduce.Job: The url to track the job: http://h134:8088/proxy/application_1546821218045_0001/
19/01/07 10:21:27 INFO mapreduce.Job: Running job: job_1546821218045_0001
...
com.zhaoyi.wordcount.WordCountDriver is the fully qualified name of our driver class; fill it in according to your actual environment. The two parameters that follow are the input path and the output path, respectively.
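Incidentally, the WARN line in the log above ("Implement the Tool interface and execute your application with ToolRunner...") can be addressed by letting ToolRunner drive the job. The following is only a sketch of what that variant might look like (the class name WordCountToolDriver is made up here; the job configuration itself is unchanged):

package com.zhaoyi.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountToolDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // same job setup as before, but the Configuration comes from ToolRunner,
        // so generic options such as -D key=value are parsed for us
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountToolDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountToolDriver(), args));
    }
}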
Wait for the execution to complete. While the job is running, you can also follow its progress through the web UI at http://resource_manager:8088.
Finally, we get the output we wanted:
[root@h133 ~]# hadoop fs -cat /output/part-r-00000
Alice	2
So	1
`and	1
`without	1
a	3
and	1
and	1
bank	1
beginning	1
book	1
book,'	1
but	1
by	1
considering	1
conversation?'	1
conversations	1
...