程序使用的測試文本數據:java
Dear River Dear River Bear Spark Car Dear Car Bear Car Dear Car River Car Spark Spark Dear Spark
首先是自定義的Maper類代碼apache
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //fields:表明着文本一行的的數據: dear bear river String[] words = value.toString().split("\t"); for (String word : words) { // 每一個單詞出現1次,做爲中間結果輸出 context.write(new Text(word), new IntWritable(1)); } } }
這個Map類是一個泛型類型,它有四個形參類型,分別指定map()函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。LongWritable
:輸入鍵類型,Text
:輸入值類型,Text
:輸出鍵類型,IntWritable
:輸出值類型.
String[] words = value.toString().split("\t");
,words
的值爲Dear River Bear River
輸入鍵key是一個長整數偏移量,用來尋找第一行的數據和下一行的數據,輸入值是一行文本Dear River Bear River
,輸出鍵是單詞Bear
,輸出值是整數1
。
Hadoop自己提供了一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。這些類型都在org.apache.hadoop.io
包中。這裏使用LongWritable
類型(至關於Java的Long
類型)、Text
類型(至關於Java中的String類型)和IntWritable
類型(至關於Java的Integer
類型)。
map()
方法的參數是輸入鍵和輸入值。以本程序爲例,輸入鍵LongWritable key
是一個偏移量,輸入值Text value
是Dear Car Bear Car
,咱們首先將包含有一行輸入的Text
值轉換成Java的String
類型,以後使用substring()
方法提取咱們感興趣的列。map()
方法還提供了Context
實例用於輸出內容的寫入。服務器
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { /* (River, 1) (River, 1) (River, 1) (Spark , 1) (Spark , 1) (Spark , 1) (Spark , 1) key: River value: List(1, 1, 1) key: Spark value: List(1, 1, 1,1) */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : values) { sum += count.get(); } context.write(key, new IntWritable(sum));// 輸出最終結果 }; }
Reduce任務最初按照分區號從Map端抓取數據爲:
(River, 1)
(River, 1)
(River, 1)
(spark, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
通過處理後獲得的結果爲:
key: hello value: List(1, 1, 1)
key: spark value: List(1, 1, 1,1)
因此reduce()函數的形參 Iterable<IntWritable> values
接收到的值爲List(1, 1, 1)
和List(1, 1, 1,1)
網絡
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountMain { //若在IDEA中本地執行MR程序,須要將mapred-site.xml中的mapreduce.framework.name值修改爲local public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length != 2 || args == null) { System.out.println("please input Path!"); System.exit(0); } //System.setProperty("HADOOP_USER_NAME","hadoop2.7"); Configuration configuration = new Configuration(); //configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar"); //調用getInstance方法,生成job實例 Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName()); // 打jar包 job.setJarByClass(WordCountMain.class); // 經過job設置輸入/輸出格式 // MR的默認輸入格式是TextInputFormat,因此下兩行能夠註釋掉 // job.setInputFormatClass(TextInputFormat.class); // job.setOutputFormatClass(TextOutputFormat.class); // 設置輸入/輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 設置處理Map/Reduce階段的類 job.setMapperClass(WordCountMap.class); //map combine減小網路傳出量 job.setCombinerClass(WordCountReduce.class); job.setReducerClass(WordCountReduce.class); //若是map、reduce的輸出的kv對類型一致,直接設置reduce的輸出的kv對就行;若是不同,須要分別設置map, reduce的 輸出的kv類型 //job.setMapOutputKeyClass(.class) // job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(IntWritable.class); // 設置reduce task最終輸出key/value的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 提交做業 job.waitForCompletion(true); } }
首先更改mapred-site.xml文件配置
將mapreduce.framework.name的值設置爲local
而後本地運行:
查看結果:app
首先打包
更改配置文件,改爲yarn模式
添加本地jar包位置:maven
Configuration configuration = new Configuration(); configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target");
設置容許跨平臺遠程調用:ide
configuration.set("mapreduce.app-submission.cross-platform","true");
修改輸入參數:
運行結果:函數
將maven項目打包,在服務器端用命令運行mr程序oop
hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar com.kaikeba.hadoop.wordcount.WordCountMain /tttt.txt /wordcount11