以前運行過了hadoop官方自帶的第一個例子wordcount,此次咱們本身手寫一個,這個至關因而編程語言中的helloworld同樣. 首先咱們瞭解一下咱們要寫的MapReduce是處理的哪一個部分,咱們知道hadoop處理文件是先將要處理的文件拆分紅不少個部分,分別處理完成,最後再將結果給匯聚起來, 造成最終的處理結果.(也就是分治法的思想)咱們接下來舉個單詞統計的例子,看看咱們寫的代碼是整個MapReduce過程當中的哪些部分.html
首先我們有這麼一個文件,文件內容以下:java
hello world hello java
hello hadoop
複製代碼
很簡單的一個文件就兩行.那麼hadoop是怎麼作單詞統計的呢?咱們用步驟來描述下:
第一步:讀取這個文件,按行來將這個文件每一行的單詞給拆分出來,而後造成不少key/value的結果,處理完就是這樣
<hello,1>
<world,1>
<hello,1>
<java,1>
<hello,1>
<hadoop,1>
第二步:排序
排序完會變成這樣的結果
<hadoop,1>
<hello,1>
<hello,1>
<hello,1>
<java,1>
<world,1>
第三步:合併
合併後的結果以下
<hadoop,1>
<hello,1,1,1>
<java,1>
<world,1>
第四步:匯聚結果
<hadoop,1>
<hello,3>
<java,1>
<world,1>apache
到第四步完成,單詞統計其實也就完成了.看完這個具體的實例,想必你們對mapreduce的處理過程有了一個比較清晰的理解. 而後咱們要知道第二步和第三步是hadoop框架幫助咱們完成的,咱們實際上須要寫代碼的地方是第一步和第四步. 第一步對應的就是Map的過程,第四步對應的是Reduce的過程.編程
如今咱們要作的就是完成第一步和第四步的代碼 1.建立項目 bash
3.引入包完成之後,咱們建立一個叫WordCount的java文件,而後開始敲代碼 這裏直接貼一下代碼,__要注意import的部分,是否是和我同樣?__由於好些個名字同樣的類,來自於不一樣的jar,容易弄錯.服務器
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
/** * @author wxwwt * @since 2019-09-15 */
public class WordCount {
/** * Object : 輸入文件的內容 * Text : 輸入的每一行的數據 * Text : 輸出的key的類型 * IntWritable : 輸出value的類型 */
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
context.write(new Text(itr.nextToken()), new IntWritable(1));
}
}
}
/** * Text : Mapper輸入的key * IntWritable : Mapper輸入的value * Text : Reducer輸出的key * IntWritable : Reducer輸出的value */
private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable item : values) {
count += item.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 建立配置
Configuration configuration = new Configuration();
// 設置hadoop的做業 jobName是WordCount
Job job = Job.getInstance(configuration, "WordCount");
// 設置jar
job.setJarByClass(WordCount.class);
// 設置Mapper的class
job.setMapperClass(WordCountMapper.class);
// 設置Reducer的class
job.setReducerClass(WordCountReducer.class);
// 設置輸出的key和value類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 設置輸入輸出路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 待job執行完 程序退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
複製代碼
Mapper程序:app
/** * Object : 輸入文件的內容 * Text : 輸入的每一行的數據 * Text : 輸出的key的類型 * IntWritable : 輸出value的類型 */
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
context.write(new Text(itr.nextToken()), new IntWritable(1));
}
}
}
複製代碼
context是全局的上下文,先使用了StringTokenizer將value(也就是每行的數據)按照空格分紅了不少份,StringTokenizer若是沒有傳入指定的分割符的話,默認會將 " \t\n\r\f" 空格製表符換行符等符號做爲分隔符,而後使用nextToken()來遍歷這個按照空格分割的字符串.context.write(new Text(itr.nextToken()), new IntWritable(1)); 的意思就是將key/value寫入到上下文中.
注:在hadoop編程中String是Text,Integer是IntWritable.這是hadoop本身封裝的類.記住就行了,使用起來和原來的類差很少
這裏就是寫入了key爲Text的單詞,和value爲Writable的1(統計數量).框架
Reduce程序:編程語言
/** * Text : Mapper輸入的key * IntWritable : Mapper輸入的value * Text : Reducer輸出的key * IntWritable : Reducer輸出的value */
private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable item : values) {
count += item.get();
}
context.write(key, new IntWritable(count));
}
}
複製代碼
reduce完成的是第四步的內容,咱們看看上面的實例過程就回知道此時的輸入參數大概是這樣
<hello,1,1,1>
因此這裏會有一個遍歷values的過程,就是將這三個1給累加起來了.ide
程序入口:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 建立配置
Configuration configuration = new Configuration();
// 設置hadoop的做業 jobName是WordCount
Job job = Job.getInstance(configuration, "WordCount");
// 設置jar
job.setJarByClass(WordCount.class);
// 設置Mapper的class
job.setMapperClass(WordCountMapper.class);
// 設置Reducer的class
job.setReducerClass(WordCountReducer.class);
// 設置輸出的key和value類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 設置輸入輸出路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 待job執行完 程序退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
複製代碼
程序入口這裏其實看註釋就已經比較清楚了,都是設置一些mapreduce須要的參數和路徑之類的, 照着寫就好了.這裏稍微要注意一點的就是
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
複製代碼
咱們回顧一下以前運行hadoop的第一個程序的時候,命令大概是 hadoop jar WordCount.jar /input/wordcount/file1 /output/wcoutput 後面的兩個參數就是文件的輸入路徑和輸出路徑,若是我們代碼修改了參數的位置或者有其餘參數的操做. 就要對應好args下標的位置.
4.指定jar包運行的入口 代碼完成後我們就能夠打包了 先選擇File -> Project Structure -> Artifacts -> + -> JAR -> From modules with dependencies
有可能直接運行
hadoop jar WordCount.jar /input/wordcount/file1 /output/wcoutput
複製代碼
會失敗,報一個異常:
Exception in thread "main" java.io.IOException: Mkdirs failed to create /XXX/XXX
at org.apache.hadoop.util.RunJar.ensureDirectory(RunJar.java:106)
at org.apache.hadoop.util.RunJar.main(RunJar.java:150)
複製代碼
相似上面這樣的.
這時候須要刪除掉jar包裏面的License文件夾和裏面的東西,能夠參考這個連接:stackoverflow 查看下jar中license的文件和文件夾 jar tvf XXX.jar | grep -i license 而後刪除掉 META-INF/LICENSE裏面的內容 zip -d XXX.jar META-INF/LICENSE
1.瞭解了mapReduce的運行步驟,這樣知道了咱們只須要寫map和reduce的過程,中間步驟hadoop框架已經作了處理,之後其餘的程序也能夠參考這個步驟來寫
2.hadoop中String是Text,Integer是IntWritable這個要記住,用錯了會報異常的
3.報 Mkdirs failed to create /XXX/XXX異常的時候先檢查是否是路徑有問題,沒有的話就刪除掉jar包中的META-INF/LICENSE
1.hadoop.apache.org/docs/stable…
2.stackoverflow.com/questions/1…