"數據去重"主要是爲了掌握和利用並行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及數據去重。 java
MaprReduce去重流程以下圖所示: linux
數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。在MapReduce流程中,map的輸出<key,value>通過shuffle過程彙集成<key,value-list>後交給reduce。咱們天然而然會想到將同一個數據的全部記錄都交給一臺reduce機器,不管這個數據出現多少次,只要在最終結果中輸出一次就能夠了。具體就是reduce的輸入應該以數據做爲key,而對value-list則沒有要求(能夠設置爲空)。當reduce接收到一個<key,value-list>時就直接將輸入的key複製到輸出的key中,並將value設置成空值,而後輸出<key,value>。 apache
實驗環境 網絡
Linux Ubuntu 14.04 app
jdk-7u75-linux-x64 eclipse
hadoop-2.6.0-cdh5.4.5 函數
hadoop-2.6.0-eclipse-cdh5.4.5.jar oop
eclipse-java-juno-SR2-linux-gtk-x86_64 大數據
實驗內容 網站
現有一個某電商網站的數據文件,名爲buyer_favorite1,記錄了用戶收藏的商品以及收藏的日期,文件buyer_favorite1中包含(用戶id,商品id,收藏日期)三個字段,數據內容以"\t"分割,因爲數據很大,因此爲了方便統計咱們只截取它的一部分數據,內容以下:
-
用戶id 商品id 收藏日期
-
10181 1000481 2010-04-04 16:54:31
-
20001 1001597 2010-04-07 15:07:52
-
20001 1001560 2010-04-07 15:08:27
-
20042 1001368 2010-04-08 08:20:30
-
20067 1002061 2010-04-08 16:45:33
-
20056 1003289 2010-04-12 10:50:55
-
20056 1003290 2010-04-12 11:57:35
-
20056 1003292 2010-04-12 12:05:29
-
20054 1002420 2010-04-14 15:24:12
-
20055 1001679 2010-04-14 19:46:04
-
20054 1010675 2010-04-14 15:23:53
-
20054 1002429 2010-04-14 17:52:45
-
20076 1002427 2010-04-14 19:35:39
-
20054 1003326 2010-04-20 12:54:44
-
20056 1002420 2010-04-15 11:24:49
-
20064 1002422 2010-04-15 11:35:54
-
20056 1003066 2010-04-15 11:43:01
-
20056 1003055 2010-04-15 11:43:06
-
20056 1010183 2010-04-15 11:45:24
-
20056 1002422 2010-04-15 11:45:49
-
20056 1003100 2010-04-15 11:45:54
-
20056 1003094 2010-04-15 11:45:57
-
20056 1003064 2010-04-15 11:46:04
-
20056 1010178 2010-04-15 16:15:20
-
20076 1003101 2010-04-15 16:37:27
-
20076 1003103 2010-04-15 16:37:05
-
20076 1003100 2010-04-15 16:37:18
-
20076 1003066 2010-04-15 16:37:31
-
20054 1003103 2010-04-15 16:40:14
-
20054 1003100 2010-04-15 16:40:16
要求用Java編寫MapReduce程序,根據商品id進行去重,統計用戶收藏商品中都有哪些商品被收藏。結果數據以下:
-
商品id
-
1000481
-
1001368
-
1001560
-
1001597
-
1001679
-
1002061
-
1002420
-
1002422
-
1002427
-
1002429
-
1003055
-
1003064
-
1003066
-
1003094
-
1003100
-
1003101
-
1003103
-
1003289
-
1003290
-
1003292
-
1003326
-
1010178
-
1010183
-
1010675
實驗步驟
1.切換到/apps/hadoop/sbin目錄下,開啓Hadoop。
-
cd /apps/hadoop/sbin
-
./start-all.sh
2.在Linux本地新建/data/mapreduce2目錄。
-
mkdir -p /data/mapreduce2
3.切換到/data/mapreduce1目錄下,自行創建文本文件buyer_favorite1。
依然在/data/mapreduce1目錄下,使用wget命令,從
網絡下載hadoop2lib.tar.gz,下載項目用到的依賴包。
將hadoop2lib.tar.gz解壓到當前目錄下。
-
tar -xzvf hadoop2lib.tar.gz
4.首先在HDFS上新建/mymapreduce2/in目錄,而後將Linux本地/data/mapreduce2目錄下的buyer_favorite1文件導入到HDFS的/mymapreduce2/in目錄中。
view plain copy
-
hadoop fs -mkdir -p /mymapreduce2/in
-
hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in
5.新建Java Project項目,項目名爲mapreduce2。
在mapreduce2項目下新建包,包名爲mapreduce。
在mapreduce包下新建類,類名爲Filter。
6.添加項目所需依賴的jar包
右鍵項目,新建一個文件夾,命名爲:hadoop2lib,用於存放項目所需的jar包。
將/data/mapreduce2目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce2項目的hadoop2lib目錄下。
選中全部項目hadoop2lib目錄下全部jar包,並添加到Build Path中。
7.編寫程序代碼,並描述其思路
數據去重的目的是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。咱們天然想到將相同key值的全部value記錄交到一臺reduce機器,讓其不管這個數據出現多少次,最終結果只輸出一次。具體就是reduce的輸出應該以數據做爲key,而對value-list沒有要求,當reduce接收到一個時,就直接將key複製到輸出的key中,將value設置爲空。
Map代碼
-
public static class Map extends Mapper<Object , Text , Text , NullWritable>
-
//map將輸入中的value複製到輸出數據的key上,並直接輸出
-
{
-
private static Text newKey=new Text(); //從輸入中獲得的每行的數據的類型
-
public void map(Object key,Text value,Context context) throws IOException, InterruptedException
-
//實現map函數
-
{ //獲取並輸出每一次的處理過程
-
String line=value.toString();
-
System.out.println(line);
-
String arr[]=line.split("\t");
-
newKey.set(arr[1]);
-
context.write(newKey, NullWritable.get());
-
System.out.println(newKey);
-
}
-
}
map階段採用Hadoop的默認的做業輸入方式,把輸入的value用split()方法截取,截取出的商品id字段設置爲key,設置value爲空,而後直接輸出<key,value>。
reduce端代碼
-
public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{
-
public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException
-
//實現reduce函數
-
{
-
context.write(key,NullWritable.get()); //獲取並輸出每一次的處理過程
-
}
-
}
map輸出的<key,value>鍵值對通過shuffle過程,聚成<key,value-list>後,會交給reduce函數。reduce函數,無論每一個key 有多少個value,它直接將輸入的賦值給輸出的key,將輸出的value設置爲空,而後輸出<key,value>就能夠了。
完整代碼
-
package mapreduce;
-
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
public class Filter{
-
public static class Map extends Mapper<Object , Text , Text , NullWritable>{
-
private static Text newKey=new Text();
-
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
-
String line=value.toString();
-
System.out.println(line);
-
String arr[]=line.split("\t");
-
newKey.set(arr[1]);
-
context.write(newKey, NullWritable.get());
-
System.out.println(newKey);
-
}
-
}
-
public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{
-
public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{
-
context.write(key,NullWritable.get());
-
}
-
}
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
-
Configuration conf=new Configuration();
-
System.out.println("start");
-
Job job =new Job(conf,"filter");
-
job.setJarByClass(Filter.class);
-
job.setMapperClass(Map.class);
-
job.setReducerClass(Reduce.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(NullWritable.class);
-
job.setInputFormatClass(TextInputFormat.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");
-
Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");
-
FileInputFormat.addInputPath(job,in);
-
FileOutputFormat.setOutputPath(job,out);
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
8.在Filter類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。
9.待執行完畢後,進入命令模式下,在HDFS中/mymapreduce2/out查看實驗結果。
-
hadoop fs -ls /mymapreduce2/out
-
hadoop fs -cat /mymapreduce2/out/part-r-00000