Mapreduce實例——去重

 

"數據去重"主要是爲了掌握和利用並行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及數據去重。 java

MaprReduce去重流程以下圖所示: linux

數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。在MapReduce流程中,map的輸出<key,value>通過shuffle過程彙集成<key,value-list>後交給reduce。咱們天然而然會想到將同一個數據的全部記錄都交給一臺reduce機器,不管這個數據出現多少次,只要在最終結果中輸出一次就能夠了。具體就是reduce的輸入應該以數據做爲key,而對value-list則沒有要求(能夠設置爲空)。當reduce接收到一個<key,value-list>時就直接將輸入的key複製到輸出的key中,並將value設置成空值,而後輸出<key,value>。 apache

實驗環境 網絡

Linux Ubuntu 14.04 app

jdk-7u75-linux-x64 eclipse

hadoop-2.6.0-cdh5.4.5 函數

hadoop-2.6.0-eclipse-cdh5.4.5.jar oop

eclipse-java-juno-SR2-linux-gtk-x86_64 大數據

實驗內容 網站

現有一個某電商網站的數據文件,名爲buyer_favorite1,記錄了用戶收藏的商品以及收藏的日期,文件buyer_favorite1中包含(用戶id,商品id,收藏日期)三個字段,數據內容以"\t"分割,因爲數據很大,因此爲了方便統計咱們只截取它的一部分數據,內容以下:

  1. 用戶id   商品id    收藏日期  
  2. 10181   1000481   2010-04-04 16:54:31  
  3. 20001   1001597   2010-04-07 15:07:52  
  4. 20001   1001560   2010-04-07 15:08:27  
  5. 20042   1001368   2010-04-08 08:20:30  
  6. 20067   1002061   2010-04-08 16:45:33  
  7. 20056   1003289   2010-04-12 10:50:55  
  8. 20056   1003290   2010-04-12 11:57:35  
  9. 20056   1003292   2010-04-12 12:05:29  
  10. 20054   1002420   2010-04-14 15:24:12  
  11. 20055   1001679   2010-04-14 19:46:04  
  12. 20054   1010675   2010-04-14 15:23:53  
  13. 20054   1002429   2010-04-14 17:52:45  
  14. 20076   1002427   2010-04-14 19:35:39  
  15. 20054   1003326   2010-04-20 12:54:44  
  16. 20056   1002420   2010-04-15 11:24:49  
  17. 20064   1002422   2010-04-15 11:35:54  
  18. 20056   1003066   2010-04-15 11:43:01  
  19. 20056   1003055   2010-04-15 11:43:06  
  20. 20056   1010183   2010-04-15 11:45:24  
  21. 20056   1002422   2010-04-15 11:45:49  
  22. 20056   1003100   2010-04-15 11:45:54  
  23. 20056   1003094   2010-04-15 11:45:57  
  24. 20056   1003064   2010-04-15 11:46:04  
  25. 20056   1010178   2010-04-15 16:15:20  
  26. 20076   1003101   2010-04-15 16:37:27  
  27. 20076   1003103   2010-04-15 16:37:05  
  28. 20076   1003100   2010-04-15 16:37:18  
  29. 20076   1003066   2010-04-15 16:37:31  
  30. 20054   1003103   2010-04-15 16:40:14  
  31. 20054   1003100   2010-04-15 16:40:16  

要求用Java編寫MapReduce程序,根據商品id進行去重,統計用戶收藏商品中都有哪些商品被收藏。結果數據以下:

  1. 商品id  
  2. 1000481  
  3. 1001368  
  4. 1001560  
  5. 1001597  
  6. 1001679  
  7. 1002061  
  8. 1002420  
  9. 1002422  
  10. 1002427  
  11. 1002429  
  12. 1003055  
  13. 1003064  
  14. 1003066  
  15. 1003094  
  16. 1003100  
  17. 1003101  
  18. 1003103  
  19. 1003289  
  20. 1003290  
  21. 1003292  
  22. 1003326  
  23. 1010178  
  24. 1010183  
  25. 1010675  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啓Hadoop。

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.在Linux本地新建/data/mapreduce2目錄。

  1. mkdir -p /data/mapreduce2  

3.切換到/data/mapreduce1目錄下,自行創建文本文件buyer_favorite1。

依然在/data/mapreduce1目錄下,使用wget命令,從

網絡下載hadoop2lib.tar.gz,下載項目用到的依賴包。

將hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar -xzvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce2/in目錄,而後將Linux本地/data/mapreduce2目錄下的buyer_favorite1文件導入到HDFS的/mymapreduce2/in目錄中。

view plain copy

  1. hadoop fs -mkdir -p /mymapreduce2/in  
  2. hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in  

5.新建Java Project項目,項目名爲mapreduce2。

在mapreduce2項目下新建包,包名爲mapreduce。

在mapreduce包下新建類,類名爲Filter。

6.添加項目所需依賴的jar包

右鍵項目,新建一個文件夾,命名爲:hadoop2lib,用於存放項目所需的jar包。

將/data/mapreduce2目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce2項目的hadoop2lib目錄下。

選中全部項目hadoop2lib目錄下全部jar包,並添加到Build Path中。

7.編寫程序代碼,並描述其思路

數據去重的目的是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。咱們天然想到將相同key值的全部value記錄交到一臺reduce機器,讓其不管這個數據出現多少次,最終結果只輸出一次。具體就是reduce的輸出應該以數據做爲key,而對value-list沒有要求,當reduce接收到一個時,就直接將key複製到輸出的key中,將value設置爲空。

Map代碼

  1. public static class Map extends Mapper<Object , Text , Text , NullWritable>  
  2.     //map將輸入中的value複製到輸出數據的key上,並直接輸出  
  3.     {  
  4.     private static Text newKey=new Text();      //從輸入中獲得的每行的數據的類型  
  5.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException  
  6.     //實現map函數  
  7.     {             //獲取並輸出每一次的處理過程  
  8.     String line=value.toString();  
  9.     System.out.println(line);  
  10.     String arr[]=line.split("\t");  
  11.     newKey.set(arr[1]);  
  12.     context.write(newKey, NullWritable.get());  
  13.     System.out.println(newKey);  
  14.     }  
  15.     }  

map階段採用Hadoop的默認的做業輸入方式,把輸入的value用split()方法截取,截取出的商品id字段設置爲key,設置value爲空,而後直接輸出<key,value>。

reduce端代碼

  1. public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  2.         public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException  
  3.     //實現reduce函數  
  4.     {  
  5.     context.write(key,NullWritable.get());   //獲取並輸出每一次的處理過程  
  6.     }  
  7.     }  

map輸出的<key,value>鍵值對通過shuffle過程,聚成<key,value-list>後,會交給reduce函數。reduce函數,無論每一個key 有多少個value,它直接將輸入的賦值給輸出的key,將輸出的value設置爲空,而後輸出<key,value>就能夠了。

完整代碼

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Job;  
  9. import org.apache.hadoop.mapreduce.Mapper;  
  10. import org.apache.hadoop.mapreduce.Reducer;  
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  15. public class Filter{  
  16.     public static class Map extends Mapper<Object , Text , Text , NullWritable>{  
  17.     private static Text newKey=new Text();  
  18.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  19.     String line=value.toString();  
  20.     System.out.println(line);  
  21.     String arr[]=line.split("\t");  
  22.     newKey.set(arr[1]);  
  23.     context.write(newKey, NullWritable.get());  
  24.     System.out.println(newKey);  
  25.     }  
  26.     }  
  27.     public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  28.     public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{  
  29.         context.write(key,NullWritable.get());  
  30.         }  
  31.         }  
  32.         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  33.         Configuration conf=new Configuration();  
  34.         System.out.println("start");  
  35.         Job job =new Job(conf,"filter");  
  36.         job.setJarByClass(Filter.class);  
  37.         job.setMapperClass(Map.class);  
  38.         job.setReducerClass(Reduce.class);  
  39.         job.setOutputKeyClass(Text.class);  
  40.         job.setOutputValueClass(NullWritable.class);  
  41.         job.setInputFormatClass(TextInputFormat.class);  
  42.         job.setOutputFormatClass(TextOutputFormat.class);  
  43.         Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");  
  44.         Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");  
  45.         FileInputFormat.addInputPath(job,in);  
  46.         FileOutputFormat.setOutputPath(job,out);  
  47.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  48.         }  
  49.         }  

8.在Filter類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢後,進入命令模式下,在HDFS中/mymapreduce2/out查看實驗結果。

  1. hadoop fs -ls /mymapreduce2/out  
  2. hadoop fs -cat /mymapreduce2/out/part-r-00000  

相關文章
相關標籤/搜索