mapReduce編程模型的總結:java
MapReduce的開發一共有八個步驟其中map階段分爲2個步驟,shuffle階段4個步驟,reduce階段分爲2個步驟node
第一步:設置inputFormat類,將咱們的數據切分紅key,value對,輸入到第二步apache
第二步:自定義map邏輯,處理咱們第一步的輸入數據,而後轉換成新的key,value對進行輸出編程
第三步:對輸出的key,value對進行分區。相同key的數據發送到同一個reduce裏面去,相同key合併,value造成一個集合windows
第四步:對不一樣分區的數據按照相同的key進行排序網絡
第五步:對分組後的數據進行規約(combine操做),下降數據的網絡拷貝(可選步驟)app
第六步:對排序後的額數據進行分組,分組的過程當中,將相同key的value放到一個集合當中maven
第七步:對多個map的任務進行合併,排序,寫reduce函數本身的邏輯,對輸入的key,value對進行處理,轉換成新的key,value對進行輸出ide
第八步:設置outputformat將輸出的key,value對數據進行保存到文件中函數
hadoop沒有沿用java當中基本的數據類型,而是本身進行封裝了一套數據類型,其本身封裝的類型與java的類型對應以下
Java類型 |
Hadoop Writable類型 |
Boolean |
BooleanWritable |
Byte |
ByteWritable |
Int |
IntWritable |
Float |
FloatWritable |
Long |
LongWritable |
Double |
DoubleWritable |
String |
Text |
Map |
MapWritable |
Array |
ArrayWritable |
byte[] |
BytesWritable |
一、添加須要的依賴包(pom.xml文件):添加如下內容
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>7.0.0</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> <!-- <verbal>true</verbal>--> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build>
二、建立如下三個文件
三、編寫MyMapper類
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 自定義mapper類須要繼承Mapper,有四個泛型, * keyin: k1 行偏移量 Long * valuein: v1 一行文本內容 String * keyout: k2 每個單詞 String * valueout : v2 1 int * 在hadoop當中沒有沿用Java的一些基本類型,使用本身封裝了一套基本類型 * long ==>LongWritable * String ==> Text * int ==> IntWritable * */ public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> { /** * 繼承mapper以後,覆寫map方法,每次讀取一行數據,都會來調用一下map方法 * @param key:對應k1 * @param value:對應v1 * @param context 上下文對象。承上啓下,承接上面步驟發過來的數據,經過context將數據發送到下面的步驟裏面去 * @throws IOException * @throws InterruptedException * k1 v1 * 0;hello,world * * k2 v2 * hello 1 * world 1 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //獲取咱們的一行數據 String line = value.toString(); String[] split = line.split(","); Text text = new Text(); IntWritable intWritable = new IntWritable(1); for (String word : split) { //將每一個單詞出現都記作1次 //key2 Text類型 //v2 IntWritable類型 text.set(word); //將咱們的key2 v2寫出去到下游 context.write(text,intWritable); } } }
四、編寫MyReduce類
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //第三步:分區 相同key的數據發送到同一個reduce裏面去,相同key合併,value造成一個集合 /** * 繼承Reducer類以後,覆寫reduce方法 * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int result = 0; for (IntWritable value : values) { //將咱們的結果進行累加 result += value.get(); } //繼續輸出咱們的數據 IntWritable intWritable = new IntWritable(result); //將咱們的數據輸出 context.write(key,intWritable); } }
五、編寫WordCount類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* 這個類做爲mr程序的入口類,這裏面寫main方法 */ public class WordCount extends Configured implements Tool { /** * 實現Tool接口以後,須要實現一個run方法, * 這個run方法用於組裝咱們的程序的邏輯,其實就是組裝八個步驟 * * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //獲取Job對象,組裝咱們的八個步驟,每個步驟都是一個class類 Configuration conf = super.getConf(); Job job = Job.getInstance(conf, "mrdemo1"); //實際工做當中,程序運行完成以後通常都是打包到集羣上面去運行,打成一個jar包 //若是要打包到集羣上面去運行,必須添加如下設置 job.setJarByClass(WordCount.class); //第一步:讀取文件,解析成key,value對,k1:行偏移量 v1:一行文本內容 job.setInputFormatClass(TextInputFormat.class); //指定咱們去哪個路徑讀取文件 TextInputFormat.addInputPath(job, new Path("file:///E:\\BigDataCode\\Java\\data\\wc\\input")); //第二步:自定義map邏輯,接受k1 v1 轉換成爲新的k2 v2輸出 job.setMapperClass(MyMapper.class); //設置map階段輸出的key,value的類型,其實就是k2 v2的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //第三步到六步:分區,排序,規約,分組都省略 //第七步:自定義reduce邏輯 job.setReducerClass(MyReducer.class); //設置key3 value3的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //第八步:輸出k3 v3 進行保存 job.setOutputFormatClass(TextOutputFormat.class); //必定要注意,輸出路徑是須要不存在的,若是存在就報錯 TextOutputFormat.setOutputPath(job, new Path("file:///E:\\BigDataCode\\Java\\data\\wc\\out_result")); //提交job任務 boolean b = job.waitForCompletion(true); return b ? 0 : 1; /*** * 第一步:讀取文件,解析成key,value對,k1 v1 * 第二步:自定義map邏輯,接受k1 v1 轉換成爲新的k2 v2輸出 * 第三步:分區。相同key的數據發送到同一個reduce裏面去,key合併,value造成一個集合 * 第四步:排序 對key2進行排序。字典順序排序 * 第五步:規約 combiner過程 調優步驟 可選 * 第六步:分組 * 第七步:自定義reduce邏輯接受k2 v2 轉換成爲新的k3 v3輸出 * 第八步:輸出k3 v3 進行保存 * * */ } /* 做爲程序的入口類 */ public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.set("hello", "world"); //提交run方法以後,獲得一個程序的退出狀態碼 int run = ToolRunner.run(configuration, new WordCount(), args); //根據咱們 程序的退出狀態碼,退出整個進程 System.exit(run); } }
一、將WordCount類中的路徑更改爲集羣上的路徑
二、打 jar 包
三、拷貝jar包到集羣上
四、運行jar包,執行如下語句(後面那個com.yyy.wordcount.WordCount路徑根據本身的設置:能夠在本身IDEA中的文件上右鍵->Copy Reference)
yarn jar yarn-1.0-SNAPSHOT.jar com.yyy.wordcount.WordCount
五、查看運行 http://node01:8088/cluster
六、查看運行結果
能夠下載下來看一下對不對。
而後結束。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。