hadoop mapreduce實例

一、數據去重

   "數據去重"主要是爲了掌握和利用並行化思想來對數據進行有意義篩選統計大數據集上的數據種類個數從網站日誌中計算訪問地等這些看似龐雜的任務都會涉及數據去重。下面就進入這個實例的MapReduce程序設計。java

1.1 實例描述

  對數據文件中的數據進行去重。數據文件中的每行都是一個數據。算法

  樣例輸入以下所示:數據庫

     1)file1:apache

 

2012-3-1 a數組

2012-3-2 b數據結構

2012-3-3 capp

2012-3-4 d框架

2012-3-5 a函數

2012-3-6 boop

2012-3-7 c

2012-3-3 c

 

     2)file2:

 

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

 

     樣例輸出以下所示:

 

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d

 

1.2 設計思路

  數據去重最終目標是讓原始數據出現次數超過一次數據輸出文件出現一次咱們天然而然會想到將同一個數據的全部記錄都交給一臺reduce機器,不管這個數據出現多少次,只要在最終結果中輸出一次就能夠了。具體就是reduce的輸入應該以數據做爲key,而對value-list則沒有要求。當reduce接收到一個<key,value-list>時就直接將key複製到輸出的key中,並將value設置成空值

  在MapReduce流程中,map的輸出<key,value>通過shuffle過程彙集成<key,value-list>後會交給reduce。因此從設計好的reduce輸入能夠反推出map的輸出key應爲數據,value任意。繼續反推,map輸出數據的key爲數據,而在這個實例中每一個數據表明輸入文件中的一行內容,因此map階段要完成的任務就是在採用Hadoop默認的做業輸入方式以後,將value設置爲key,並直接輸出(輸出中的value任意)。map中的結果通過shuffle過程以後交給reduce。reduce階段不會管每一個key有多少個value,它直接將輸入的key複製爲輸出的key,並輸出就能夠了(輸出中的value被設置成空了)。

1.3 程序代碼

     程序代碼以下所示:

 

package com.hebut.mr;

 

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class Dedup {

 

    //map將輸入中的value複製到輸出數據的key上,並直接輸出

    public static class Map extends Mapper<Object,Text,Text,Text>{

        private static Text line=new Text();//每行數據

       

        //實現map函數

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            line=value;

            context.write(line, new Text(""));

        }

       

    }

   

    //reduce將輸入中的key複製到輸出數據的key上,並直接輸出

    public static class Reduce extends Reducer<Text,Text,Text,Text>{

        //實現reduce函數

        public void reduce(Text key,Iterable<Text> values,Context context)

                throws IOException,InterruptedException{

            context.write(key, new Text(""));

        }

       

    }

   

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

       

        String[] ioArgs=new String[]{"dedup_in","dedup_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Deduplication <in> <out>");

     System.exit(2);

     }

     

     Job job = new Job(conf, "Data Deduplication");

     job.setJarByClass(Dedup.class);

     

     //設置MapCombineReduce處理類

     job.setMapperClass(Map.class);

     job.setCombinerClass(Reduce.class);

     job.setReducerClass(Reduce.class);

     

     //設置輸出類型

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(Text.class);

     

     //設置輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

 

1.4 代碼結果

     1)準備測試數據

     經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"dedup_in"文件夾(備註:"dedup_out"不須要建立。)如圖1.4-1所示,已經成功建立。

         

圖1.4-1 建立"dedup_in"                                   圖1.4.2 上傳"file*.txt"

 

     而後在本地創建兩個txt文件,經過Eclipse上傳到"/user/hadoop/dedup_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件同樣。如圖1.4-2所示,成功上傳以後。

     從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的兩個文件。

 

 

    查看兩個文件的內容如圖1.4-3所示:

 

圖1.4-3 文件"file*.txt"內容

2)查看運行結果

     這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"dedup_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。

 

圖1.4-4 運行結果

 

    此時,你能夠對比一下和咱們以前預期的結果是否一致。

二、數據排序

  "數據排序"是許多實際任務執行時要完成的第一項工做,好比學生成績評比數據創建索引等。這個實例和數據去重相似,都是原始數據進行初步處理,爲進一步的數據操做打好基礎。下面進入這個示例。

2.1 實例描述

    對輸入文件中數據進行排序。輸入文件中的每行內容均爲一個數字即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個表明原始數據在原始數據集中的位次第二個表明原始數據

    樣例輸入

    1)file1:

 

2

32

654

32

15

756

65223

 

    2)file2:

 

5956

22

650

92

 

    3)file3:

 

26

54

6

 

    樣例輸出

 

1    2

2    6

3    15

4    22

5    26

6    32

7    32

8    54

9    92

10    650

11    654

12    756

13    5956

14    65223

 

2.2 設計思路

  這個實例僅僅要求對輸入數據進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程當中就有排序,是否能夠利用這個默認的排序,而不須要本身再實現具體的排序呢?答案是確定的。

  可是在使用以前首先須要瞭解它的默認排序規則。它是按照key值進行排序的,若是key爲封裝int的IntWritable類型,那麼MapReduce按照數字大小對key排序,若是key爲封裝爲String的Text類型,那麼MapReduce按照字典順序對字符串排序。

  瞭解了這個細節,咱們就知道應該使用封裝int的IntWritable型數據結構了。也就是在map中將讀入的數據轉化成IntWritable型,而後做爲key值輸出(value任意)。reduce拿到<key,value-list>以後,將輸入的key做爲value輸出,並根據value-list元素個數決定輸出的次數。輸出的key(即代碼中的linenum)是一個全局變量,它統計當前key的位次。須要注意的是這個程序中沒有配置Combiner,也就是在MapReduce過程當中不使用Combiner。這主要是由於使用map和reduce就已經可以完成任務了。

2.3 程序代碼

    程序代碼以下所示:

 

package com.hebut.mr;

 

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class Sort {

 

    //map將輸入中的value化成IntWritable類型,做爲輸出的key

    public static class Map extends

        Mapper<Object,Text,IntWritable,IntWritable>{

        private static IntWritable data=new IntWritable();

       

        //實現map函數

        public void map(Object key,Text value,Context context)

                throws IOException,InterruptedException{

            String line=value.toString();

            data.set(Integer.parseInt(line));

            context.write(data, new IntWritable(1));

        }

       

    }

   

    //reduce將輸入中的key複製到輸出數據的key上,

    //而後根據輸入的value-list中元素的個數決定key的輸出次數

    //用全局linenum來表明key的位次

    public static class Reduce extends

            Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{

       

        private static IntWritable linenum = new IntWritable(1);

       

        //實現reduce函數

        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)

                throws IOException,InterruptedException{

            for(IntWritable val:values){

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get()+1);

            }

           

        }

 

    }

   

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        //這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

       

        String[] ioArgs=new String[]{"sort_in","sort_out"};

     String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

     if (otherArgs.length != 2) {

     System.err.println("Usage: Data Sort <in> <out>");

         System.exit(2);

     }

     

     Job job = new Job(conf, "Data Sort");

     job.setJarByClass(Sort.class);

     

     //設置MapReduce處理類

     job.setMapperClass(Map.class);

     job.setReducerClass(Reduce.class);

     

     //設置輸出類型

     job.setOutputKeyClass(IntWritable.class);

     job.setOutputValueClass(IntWritable.class);

     

     //設置輸入和輸出目錄

     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

     System.exit(job.waitForCompletion(true) ? 0 : 1);

     }

}

 

2.4 代碼結果

1)準備測試數據

    經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"sort_in"文件夾(備註:"sort_out"不須要建立。)如圖2.4-1所示,已經成功建立。

              

圖2.4-1 建立"sort_in"                                                  圖2.4.2 上傳"file*.txt"

 

    而後在本地創建三個txt文件,經過Eclipse上傳到"/user/hadoop/sort_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件同樣。如圖2.4-2所示,成功上傳以後。

    從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的三個文件。

 

 

查看兩個文件的內容如圖2.4-3所示:

 

圖2.4-3 文件"file*.txt"內容

2)查看運行結果

    這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"sort_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。

 

圖2.4-4 運行結果

三、平均成績

    "平均成績"主要目的仍是在重溫經典"WordCount"例子,能夠說是在基礎上的微變化版,該實例主要就是實現一個計算學平生均成績的例子。

3.1 實例描述

  對輸入文件中數據進行就算學平生均成績。輸入文件中的每行內容均爲一個學生姓名和他相應的成績,若是有多門學科,則每門學科爲一個文件。要求在輸出中每行有兩個間隔的數據,其中,第一個表明學生的姓名第二個表明其平均成績

    樣本輸入

    1)math:

 

張三    88

李四    99

王五    66

趙六    77

 

    2)china:

 

張三    78

李四    89

王五    96

趙六    67

 

    3)english:

 

張三    80

李四    82

王五    84

趙六    86

 

    樣本輸出

 

張三    82

李四    90

王五    82

趙六    76

 

3.2 設計思路

    計算學平生均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程序的流程。程序包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。

    Map處理的是一個純文本文件,文件中存放的數據時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的數據是由InputFormat分解過的數據集,其中InputFormat的做用是將數據集切割成小數據集InputSplit,每個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現,並將一個InputSplit解析成<key,value>對提供給了map函數。InputFormat的默認值是TextInputFormat,它針對文本文件,按行將文本切割成InputSlit,並用LineRecordReader將InputSplit解析成<key,value>對,key是行在文本中的位置,value是文件中的一行。

    Map的結果會經過partion分發到Reducer,Reducer作完Reduce操做後,將經過以格式OutputFormat輸出。

    Mapper最終處理的結果對<key,value>,會送到Reducer中進行合併,合併的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是全部用戶定製Reducer類地基礎,它的輸入是key和這個key對應的全部value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到文件中。

3.3 程序代碼

    程序代碼以下所示:

 

package com.hebut.mr;

 

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class Score {

 

    public static class Map extends

            Mapper<LongWritable, Text, Text, IntWritable> {

 

        // 實現map函數

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            // 將輸入的純文本文件的數據轉化成String

            String line = value.toString();

 

            // 將輸入的數據首先按行進行分割

            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");

 

            // 分別對每一行進行處理

            while (tokenizerArticle.hasMoreElements()) {

                // 每行按空格劃分

                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());

 

                String strName = tokenizerLine.nextToken();// 學生姓名部分

                String strScore = tokenizerLine.nextToken();// 成績部分

 

                Text name = new Text(strName);

                int scoreInt = Integer.parseInt(strScore);

                // 輸出姓名和成績

                context.write(name, new IntWritable(scoreInt));

            }

        }

 

    }

 

    public static class Reduce extends

            Reducer<Text, IntWritable, Text, IntWritable> {

        // 實現reduce函數

        public void reduce(Text key, Iterable<IntWritable> values,

                Context context) throws IOException, InterruptedException {

 

            int sum = 0;

            int count = 0;

 

            Iterator<IntWritable> iterator = values.iterator();

            while (iterator.hasNext()) {

                sum += iterator.next().get();// 計算總分

                count++;// 統計總的科目數

            }

 

            int average = (int) sum / count;// 計算平均成績

            context.write(key, new IntWritable(average));

        }

 

    }

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        String[] ioArgs = new String[] { "score_in", "score_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Score Average <in> <out>");

            System.exit(2);

        }

 

        Job job = new Job(conf, "Score Average");

        job.setJarByClass(Score.class);

 

        // 設置MapCombineReduce處理類

        job.setMapperClass(Map.class);

        job.setCombinerClass(Reduce.class);

        job.setReducerClass(Reduce.class);

 

        // 設置輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

 

        // 將輸入的數據集分割成小數據塊splites,提供一個RecordReder的實現

        job.setInputFormatClass(TextInputFormat.class);

        // 提供一個RecordWriter的實現,負責數據輸出

        job.setOutputFormatClass(TextOutputFormat.class);

 

        // 設置輸入和輸出目錄

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

3.4 代碼結果

1)準備測試數據

    經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"score_in"文件夾(備註:"score_out"不須要建立。)如圖3.4-1所示,已經成功建立。

 

            

圖3.4-1 建立"score_in"                                                       圖3.4.2 上傳三門分數

 

    而後在本地創建三個txt文件,經過Eclipse上傳到"/user/hadoop/score_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件同樣。如圖3.4-2所示,成功上傳以後。

    備註:文本文件的編碼爲"UTF-8",默認爲"ANSI",能夠另存爲時選擇,否則中文會出現亂碼

    從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的三個文件。

 

 

查看三個文件的內容如圖3.4-3所示:

 

圖3.4.3 三門成績的內容

2)查看運行結果

    這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"score_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖3.4-4所示。

 

圖3.4-4 運行結果

四、單表關聯

    前面的實例都是在數據上進行一些簡單的處理,爲進一步的操做打基礎。"單表關聯"這個實例要求給出的數據尋找關心的數據,它是對原始數據所包含信息的挖掘。下面進入這個實例。

4.1 實例描述

    實例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。

    樣例輸入以下所示。

    file:

 

child        parent

Tom        Lucy

Tom        Jack

Jone        Lucy

Jone        Jack

Lucy        Mary

Lucy        Ben

Jack        Alice

Jack        Jesse

Terry        Alice

Terry        Jesse

Philip        Terry

Philip        Alma

Mark        Terry

Mark        Alma

 

    家族樹狀關係譜:

 

 image

圖4.2-1 家族譜

    樣例輸出以下所示。

    file:

 

grandchild        grandparent

Tom              Alice

Tom              Jesse

Jone              Alice

Jone              Jesse

Tom              Mary

Tom              Ben

Jone              Mary

Jone              Ben

Philip              Alice

Philip              Jesse

Mark              Alice

Mark              Jesse

 

4.2 設計思路

       分析這個實例,顯然須要進行單錶鏈接,鏈接的是左表parent列和右表child列,且左表右表同一個表

  鏈接結果除去鏈接的兩列就是所須要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現自鏈接其次就是鏈接列設置最後結果整理

      考慮到MapReduce的shuffle過程會將相同的key會鏈接在一塊兒,因此能夠將map結果的key設置成待鏈接,而後列中相同的值就天然會鏈接在一塊兒了。再與最開始的分析聯繫起來:

  要鏈接的是左表的parent列和右表的child列,且左表和右表是同一個表,因此在map階段讀入數據分割childparent以後,會將parent設置成keychild設置成value進行輸出,並做爲左表再將同一對childparent中的child設置成keyparent設置成value進行輸出,做爲右表。爲了區分輸出中的左右表,須要在輸出的value加上左右表信息,好比在value的String最開始處加上字符1表示左表,加上字符2表示右表。這樣在map的結果中就造成了左表和右表,而後在shuffle過程當中完成鏈接。reduce接收到鏈接的結果,其中每一個key的value-list就包含了"grandchild--grandparent"關係。取出每一個key的value-list進行解析,將左表中的child放入一個數組右表中的parent放入一個數組,而後對兩個數組求笛卡爾積就是最後的結果了。

4.3 程序代碼

    程序代碼以下所示。

 

package com.hebut.mr;

 

import java.io.IOException;

import java.util.*;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class STjoin {

 

    public static int time = 0;

 

    /*

     * map將輸出分割childparent,而後正序輸出一次做爲右表,

     * 反序輸出一次做爲左表,須要注意的是在輸出的value中必須

     * 加上左右表的區別標識。

     */

    public static class Map extends Mapper<Object, Text, Text, Text> {

 

        // 實現map函數

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String childname = new String();// 孩子名稱

            String parentname = new String();// 父母名稱

            String relationtype = new String();// 左右表標識

 

            // 輸入的一行預處理文本

            StringTokenizer itr=new StringTokenizer(value.toString());

            String[] values=new String[2];

            int i=0;

            while(itr.hasMoreTokens()){

                values[i]=itr.nextToken();

                i++;

            }

           

            if (values[0].compareTo("child") != 0) {

                childname = values[0];

                parentname = values[1];

 

                // 輸出左表

                relationtype = "1";

                context.write(new Text(values[1]), new Text(relationtype +

                        "+"+ childname + "+" + parentname));

 

                // 輸出右表

                relationtype = "2";

                context.write(new Text(values[0]), new Text(relationtype +

                        "+"+ childname + "+" + parentname));

            }

        }

 

    }

 

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

 

        // 實現reduce函數

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

 

            // 輸出表頭

            if (0 == time) {

                context.write(new Text("grandchild"), new Text("grandparent"));

                time++;

            }

 

            int grandchildnum = 0;

            String[] grandchild = new String[10];

            int grandparentnum = 0;

            String[] grandparent = new String[10];

 

            Iterator ite = values.iterator();

            while (ite.hasNext()) {

                String record = ite.next().toString();

                int len = record.length();

                int i = 2;

                if (0 == len) {

                    continue;

                }

 

                // 取得左右表標識

                char relationtype = record.charAt(0);

                // 定義孩子和父母變量

                String childname = new String();

                String parentname = new String();

 

                // 獲取value-listvaluechild

                while (record.charAt(i) != '+') {

                    childname += record.charAt(i);

                    i++;

                }

 

                i = i + 1;

 

                // 獲取value-listvalueparent

                while (i < len) {

                    parentname += record.charAt(i);

                    i++;

                }

 

                // 左表,取出child放入grandchildren

                if ('1' == relationtype) {

                    grandchild[grandchildnum] = childname;

                    grandchildnum++;

                }

 

                // 右表,取出parent放入grandparent

                if ('2' == relationtype) {

                    grandparent[grandparentnum] = parentname;

                    grandparentnum++;

                }

            }

 

            // grandchildgrandparent數組求笛卡爾兒積

            if (0 != grandchildnum && 0 != grandparentnum) {

                for (int m = 0; m < grandchildnum; m++) {

                    for (int n = 0; n < grandparentnum; n++) {

                        // 輸出結果

                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));

                    }

                }

            }

        }

    }

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        String[] ioArgs = new String[] { "STjoin_in", "STjoin_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Single Table Join <in> <out>");

            System.exit(2);

        }

 

        Job job = new Job(conf, "Single Table Join");

        job.setJarByClass(STjoin.class);

 

        // 設置MapReduce處理類

        job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);

 

        // 設置輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 設置輸入和輸出目錄

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

4.4 代碼結果

1)準備測試數據

    經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"STjoin_in"文件夾(備註:"STjoin_out"不須要建立。)如圖4.4-1所示,已經成功建立。

 

                  

圖4.4-1 建立"STjoin_in"                                       圖4.4.2 上傳"child-parent"表

 

    而後在本地創建一個txt文件,經過Eclipse上傳到"/user/hadoop/STjoin_in"文件夾中,一個txt文件的內容如"實例描述"那個文件同樣。如圖4.4-2所示,成功上傳以後。

    從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的文件,顯示其內容如圖4.4-3所示:

 

圖4.4-3 表"child-parent"內容

    2)運行詳解

    (1)Map處理:

    map函數輸出結果以下所示。

 

child        parent                àà                    忽略此行

Tom        Lucy                   àà                <Lucy,1+Tom+Lucy>

                                                    <Tom,2+Tom+Lucy >

Tom        Jack                    àà                <Jack,1+Tom+Jack>

                                                    <Tom,2+Tom+Jack>

Jone        Lucy                 àà                <Lucy,1+Jone+Lucy>

                                                    <Jone,2+Jone+Lucy>

Jone        Jack                    àà                <Jack,1+Jone+Jack>

                                                    <Jone,2+Jone+Jack>

Lucy        Mary                   àà                <Mary,1+Lucy+Mary>

                                                    <Lucy,2+Lucy+Mary>

Lucy        Ben                    àà                <Ben,1+Lucy+Ben>

                                                     <Lucy,2+Lucy+Ben>

Jack        Alice                    àà                <Alice,1+Jack+Alice>

                                                      <Jack,2+Jack+Alice>

Jack        Jesse                   àà                <Jesse,1+Jack+Jesse>

                                                      <Jack,2+Jack+Jesse>

Terry        Alice                   àà                <Alice,1+Terry+Alice>

                                                      <Terry,2+Terry+Alice>

Terry        Jesse                  àà                <Jesse,1+Terry+Jesse>

                                                      <Terry,2+Terry+Jesse>

Philip        Terry                  àà                <Terry,1+Philip+Terry>

                                                      <Philip,2+Philip+Terry>

Philip        Alma                   àà                <Alma,1+Philip+Alma>

                                                      <Philip,2+Philip+Alma>

Mark        Terry                   àà                <Terry,1+Mark+Terry>

                                                      <Mark,2+Mark+Terry>

Mark        Alma                 àà                <Alma,1+Mark+Alma>

                                                      <Mark,2+Mark+Alma>

 

    (2)Shuffle處理

    在shuffle過程當中完成鏈接。

 

map函數輸出

排序結果

shuffle鏈接

<Lucy1+Tom+Lucy>

<Tom2+Tom+Lucy>

<Jack1+Tom+Jack>

<Tom2+Tom+Jack>

<Lucy1+Jone+Lucy>

<Jone2+Jone+Lucy>

<Jack1+Jone+Jack>

<Jone2+Jone+Jack>

<Mary1+Lucy+Mary>

<Lucy2+Lucy+Mary>

<Ben1+Lucy+Ben>

<Lucy2+Lucy+Ben>

<Alice1+Jack+Alice>

<Jack2+Jack+Alice>

<Jesse1+Jack+Jesse>

<Jack2+Jack+Jesse>

<Alice1+Terry+Alice>

<Terry2+Terry+Alice>

<Jesse1+Terry+Jesse>

<Terry2+Terry+Jesse>

<Terry1+Philip+Terry>

<Philip2+Philip+Terry>

<Alma1+Philip+Alma>

<Philip2+Philip+Alma>

<Terry1+Mark+Terry>

<Mark2+Mark+Terry>

<Alma1+Mark+Alma>

<Mark2+Mark+Alma>

<Alice1+Jack+Alice>

<Alice1+Terry+Alice>

<Alma1+Philip+Alma>

<Alma1+Mark+Alma>

<Ben1+Lucy+Ben>

<Jack1+Tom+Jack>

<Jack1+Jone+Jack>

<Jack2+Jack+Alice>

<Jack2+Jack+Jesse>

<Jesse1+Jack+Jesse>

<Jesse1+Terry+Jesse>

<Jone2+Jone+Lucy>

<Jone2+Jone+Jack>

<Lucy1+Tom+Lucy>

<Lucy1+Jone+Lucy>

<Lucy2+Lucy+Mary>

<Lucy2+Lucy+Ben>

<Mary1+Lucy+Mary>

<Mark2+Mark+Terry>

<Mark2+Mark+Alma>

<Philip2+Philip+Terry>

<Philip2+Philip+Alma>

<Terry2+Terry+Alice>

<Terry2+Terry+Jesse>

<Terry1+Philip+Terry>

<Terry1+Mark+Terry>

<Tom2+Tom+Lucy>

<Tom2+Tom+Jack>

<Alice1+Jack+Alice

        1+Terry+Alice 

        1+Philip+Alma

        1+Mark+Alma >

<Ben1+Lucy+Ben>

<Jack1+Tom+Jack

        1+Jone+Jack

        2+Jack+Alice

        2+Jack+Jesse >

<Jesse1+Jack+Jesse

        1+Terry+Jesse >

<Jone2+Jone+Lucy

        2+Jone+Jack>

<Lucy1+Tom+Lucy

        1+Jone+Lucy

        2+Lucy+Mary

        2+Lucy+Ben>

<Mary1+Lucy+Mary

        2+Mark+Terry

        2+Mark+Alma>

<Philip2+Philip+Terry

        2+Philip+Alma>

<Terry2+Terry+Alice

        2+Terry+Jesse

        1+Philip+Terry

        1+Mark+Terry>

<Tom2+Tom+Lucy

        2+Tom+Jack>

 

    (3)Reduce處理

    首先由語句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中沒有左表或者右表,則不會作處理,能夠根據這條規則去除無效shuffle鏈接

 

無效shuffle鏈接

有效shuffle鏈接

<Alice1+Jack+Alice

        1+Terry+Alice 

        1+Philip+Alma

        1+Mark+Alma >

<Ben1+Lucy+Ben>

<Jesse1+Jack+Jesse

        1+Terry+Jesse >

<Jone2+Jone+Lucy

        2+Jone+Jack>

<Mary1+Lucy+Mary

        2+Mark+Terry

        2+Mark+Alma>

<Philip2+Philip+Terry

        2+Philip+Alma>

<Tom2+Tom+Lucy

        2+Tom+Jack>

<Jack1+Tom+Jack

        1+Jone+Jack

        2+Jack+Alice

        2+Jack+Jesse >

<Lucy1+Tom+Lucy

        1+Jone+Lucy

        2+Lucy+Mary

        2+Lucy+Ben>

<Terry2+Terry+Alice

        2+Terry+Jesse

        1+Philip+Terry

        1+Mark+Terry>

    而後根據下面語句進一步對有效的shuffle鏈接作處理。

 

// 左表,取出child放入grandchildren

if ('1' == relationtype) {

    grandchild[grandchildnum] = childname;

    grandchildnum++;

}

 

// 右表,取出parent放入grandparent

if ('2' == relationtype) {

    grandparent[grandparentnum] = parentname;

    grandparentnum++;

}

 

    針對一條數據進行分析:

 

<Jack,1+Tom+Jack,

        1+Jone+Jack,

        2+Jack+Alice,

        2+Jack+Jesse >

 

    分析結果左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表與右表鏈接鍵。而"value-list"表示以"key"鏈接左表與右表相關數據

    根據上面針對左表與右表不一樣的處理規則,取得兩個數組的數據以下所示:

 

grandchild

TomJonegrandchild[grandchildnum] = childname;

grandparent

AliceJessegrandparent[grandparentnum] = parentname;

    

    而後根據下面語句進行處理。

 

for (int m = 0; m < grandchildnum; m++) {

    for (int n = 0; n < grandparentnum; n++) {

        context.write(new Text(grandchild[m]), new Text(grandparent[n]));

    }

}

 

image  

 

處理結果以下面所示:

 

Tom        Jesse

Tom        Alice

Jone        Jesse

Jone        Alice 

    其餘的有效shuffle鏈接處理都是如此

3)查看運行結果

    這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"STjoin_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖4.4-4所示。

 

圖4.4-4 運行結果

五、多表關聯

    多表關聯和單表關聯相似,它也是經過對原始數據進行必定的處理,從其中挖掘出關心的信息。下面進入這個實例。

5.1 實例描述

    輸入是兩個文件,一個表明工廠表,包含工廠名列和地址編號列;另外一個表明地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名地址名對應關係,輸出"工廠名——地址名"表。

    樣例輸入以下所示。

    1)factory:

 

factoryname                    addressed

Beijing Red Star                    1

Shenzhen Thunder                3

Guangzhou Honda                2

Beijing Rising                       1

Guangzhou Development Bank      2

Tencent                        3

Back of Beijing                     1

 

    2)address:

 

addressID    addressname

1            Beijing

2            Guangzhou

3            Shenzhen

4            Xian

 

    樣例輸出以下所示。

 

factoryname                        addressname

Back of Beijing                          Beijing

Beijing Red Star                        Beijing

Beijing Rising                          Beijing

Guangzhou Development Bank          Guangzhou

Guangzhou Honda                    Guangzhou

Shenzhen Thunder                    Shenzhen

Tencent                            Shenzhen

 

5.2 設計思路

    多表關聯和單表關聯類似,都相似於數據庫中的天然鏈接。相比單表關聯,多表關聯的左右表和鏈接列更加清楚。因此能夠採用和單表關聯的相同處理方式,map識別出輸入的行屬於哪一個表以後,對其進行分割,將鏈接的列值保存在key中,另外一列和左右表標識保存在value中,而後輸出。reduce拿到鏈接結果以後,解析value內容,根據標誌將左右表內容分開存放,而後求笛卡爾積,最後直接輸出。

    這個實例的具體分析參考單表關聯實例。下面給出代碼。

5.3 程序代碼

    程序代碼以下所示:

 

package com.hebut.mr;

 

import java.io.IOException;

import java.util.*;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class MTjoin {

 

    public static int time = 0;

 

    /*

     * map中先區分輸入行屬於左表仍是右表,而後對兩列值進行分割,

     * 保存鏈接列在key值,剩餘列和左右表標誌在value中,最後輸出

     */

    public static class Map extends Mapper<Object, Text, Text, Text> {

 

        // 實現map函數

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String line = value.toString();// 每行文件

            String relationtype = new String();// 左右表標識

 

            // 輸入文件首行,不處理

            if (line.contains("factoryname") == true

                    || line.contains("addressed") == true) {

                return;

            }

 

            // 輸入的一行預處理文本

            StringTokenizer itr = new StringTokenizer(line);

            String mapkey = new String();

            String mapvalue = new String();

            int i = 0;

            while (itr.hasMoreTokens()) {

                // 先讀取一個單詞

                String token = itr.nextToken();

                // 判斷該地址ID就把存到"values[0]"

                if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {

                    mapkey = token;

                    if (i > 0) {

                        relationtype = "1";

                    } else {

                        relationtype = "2";

                    }

                    continue;

                }

 

                // 存工廠名

                mapvalue += token + " ";

                i++;

            }

 

            // 輸出左右表

            context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));

        }

    }

 

    /*

     * reduce解析map輸出,將value中數據按照左右表分別保存,

  * 而後求出笛卡爾積,並輸出。

     */

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

 

        // 實現reduce函數

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

 

            // 輸出表頭

            if (0 == time) {

                context.write(new Text("factoryname"), new Text("addressname"));

                time++;

            }

 

            int factorynum = 0;

            String[] factory = new String[10];

            int addressnum = 0;

            String[] address = new String[10];

 

            Iterator ite = values.iterator();

            while (ite.hasNext()) {

                String record = ite.next().toString();

                int len = record.length();

                int i = 2;

                if (0 == len) {

                    continue;

                }

 

                // 取得左右表標識

                char relationtype = record.charAt(0);

 

                // 左表

                if ('1' == relationtype) {

                    factory[factorynum] = record.substring(i);

                    factorynum++;

                }

 

                // 右表

                if ('2' == relationtype) {

                    address[addressnum] = record.substring(i);

                    addressnum++;

                }

            }

 

            // 求笛卡爾積

            if (0 != factorynum && 0 != addressnum) {

                for (int m = 0; m < factorynum; m++) {

                    for (int n = 0; n < addressnum; n++) {

                        // 輸出結果

                        context.write(new Text(factory[m]),

                                new Text(address[n]));

                    }

                }

            }

 

        }

    }

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        String[] ioArgs = new String[] { "MTjoin_in", "MTjoin_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Multiple Table Join <in> <out>");

            System.exit(2);

        }

 

        Job job = new Job(conf, "Multiple Table Join");

        job.setJarByClass(MTjoin.class);

 

        // 設置MapReduce處理類

        job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);

 

        // 設置輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 設置輸入和輸出目錄

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

5.4 代碼結果

1)準備測試數據

    經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"MTjoin_in"文件夾(備註:"MTjoin_out"不須要建立。)如圖5.4-1所示,已經成功建立。

 

                 

圖5.4-1 建立"MTjoin_in"                                                             圖5.4.2 上傳兩個數據表

 

    而後在本地創建兩個txt文件,經過Eclipse上傳到"/user/hadoop/MTjoin_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件同樣。如圖5.4-2所示,成功上傳以後。

    從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的兩個文件。

 

圖5.4.3 兩個數據表的內容

2)查看運行結果

    這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"MTjoin_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖5.4-4所示。

 

圖5.4-4 運行結果

六、倒排索引

    "倒排索引"是文檔檢索系統最經常使用數據結構,被普遍地應用於全文搜索引擎。它主要是用來存儲某個單詞(或詞組)一個文檔或一組文檔中的存儲位置映射,即提供了一種根據內容來查找文檔方式。因爲不是根據文檔來肯定文檔所包含的內容,而是進行相反的操做,於是稱爲倒排索引(Inverted Index)。

6.1 實例描述

    一般狀況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的ID號,或者是指文檔所在位置的URL,如圖6.1-1所示。

 image

圖6.1-1 倒排索引結構

    從圖6.1-1能夠看出,單詞1出如今{文檔1,文檔4,文檔13,……}中,單詞2出如今{文檔3,文檔5,文檔15,……}中,而單詞3出如今{文檔1,文檔8,文檔20,……}中。在實際應用中,還須要每一個文檔添加一個權值,用來指出每一個文檔與搜索內容的相關度,如圖6.1-2所示。

 

 image

圖6.1-2 添加權重的倒排索引

    最經常使用的是使用詞頻做爲權重,即記錄單詞在文檔中出現的次數。以英文爲例,如圖6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"這個單詞在文本T0中出現過1次,T1中出現過1次,T2中出現過2次。當搜索條件爲"MapReduce"、"is"、"Simple"時,對應的集合爲:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文檔T0和T1包含了所要索引的單詞,並且只有T0是連續的。

 

 image

圖6.1-3 倒排索引示例

    更復雜的權重還可能要記錄單詞在多少個文檔中出現過,以實現TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考慮單詞在文檔中的位置信息(單詞是否出如今標題中,反映了單詞在文檔中的重要性)等。

    樣例輸入以下所示。

    1)file1:

 

MapReduce is simple

 

    2)file2:

 

MapReduce is powerful is simple

 

    3)file3:

 

Hello MapReduce bye MapReduce

 

    樣例輸出以下所示。

 

MapReduce      file1.txt:1;file2.txt:1;file3.txt:2;

is            file1.txt:1;file2.txt:2;

simple           file1.txt:1;file2.txt:1;

powerful      file2.txt:1;

Hello          file3.txt:1;

bye            file3.txt:1;

 

6.2 設計思路

    實現"倒排索引"只要關注的信息爲:單詞文檔URL詞頻,如圖3-11所示。可是在實現過程當中,索引文件的格式與圖6.1-3會略有所不一樣,以免重寫OutPutFormat類。下面根據MapReduce的處理過程給出倒排索引設計思路

    1)Map過程

    首先使用默認的TextInputFormat類對輸入文件進行處理,獲得文本中每行偏移量及其內容。顯然,Map過程首先必須分析輸入的<key,value>對,獲得倒排索引中須要的三個信息:單詞、文檔URL和詞頻,如圖6.2-1所示。

 image

圖6.2-1 Map過程輸入/輸出

 

  這裏存在兩個問題第一,<key,value>對只能有兩個值,在不使用Hadoop自定義數據類型的狀況下,須要根據狀況將其中兩個值合併成一個值,做爲key或value值;第二,經過一個Reduce過程沒法同時完成詞頻統計生成文檔列表,因此必須增長一個Combine過程完成詞頻統計

    這裏講單詞和URL組成key值(如"MapReduce:file1.txt"),將詞頻做爲value,這樣作的好處是能夠利用MapReduce框架自帶的Map端排序,將同一文檔相同單詞詞頻組成列表,傳遞給Combine過程,實現相似於WordCount的功能。

    2)Combine過程

    通過map方法處理後,Combine過程將key值相同的value值累加,獲得一個單詞在文檔在文檔中的詞頻,如圖6.2-2所示。若是直接將圖6.2-2所示的輸出做爲Reduce過程的輸入,在Shuffle過程時將面臨一個問題:全部具備相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值沒法保證這一點,因此必須修改key值和value值。此次將單詞做爲key值,URL和詞頻組value值(如"file1.txt:1")。這樣作的好處是能夠利用MapReduce框架默認的HashPartitioner類完成Shuffle過程,將相同單詞全部記錄發送給同一個Reducer進行處理

 

 image

圖6.2-2 Combine過程輸入/輸出

    3)Reduce過程

    通過上述兩個過程後,Reduce過程只需將相同key值的value值組合成倒排索引文件所需的格式便可,剩下的事情就能夠直接交給MapReduce框架進行處理了。如圖6.2-3所示。索引文件的內容除分隔符外與圖6.1-3解釋相同。

    4)須要解決的問題

    本實例設計的倒排索引在文件數目沒有限制,可是單詞文件不宜過大(具體值與默認HDFS塊大小及相關配置有關),要保證每一個文件對應一個split。不然,因爲Reduce過程沒有進一步統計詞頻,最終結可能出現詞頻未統計徹底單詞。能夠經過重寫InputFormat類將每一個文件爲一個split,避免上述狀況。或者執行兩次MapReduce第一次MapReduce用於統計詞頻第二次MapReduce用於生成倒排索引。除此以外,還能夠利用複合鍵值對等實現包含更多信息的倒排索引。

 

 image

圖6.2-3 Reduce過程輸入/輸出

6.3 程序代碼

  程序代碼以下所示:

 

package com.hebut.mr;

 

import java.io.IOException;

import java.util.StringTokenizer;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class InvertedIndex {

 

    public static class Map extends Mapper<Object, Text, Text, Text> {

 

        private Text keyInfo = new Text(); // 存儲單詞和URL組合

        private Text valueInfo = new Text(); // 存儲詞頻

        private FileSplit split; // 存儲Split對象

 

        // 實現map函數

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

 

            // 得到<key,value>對所屬的FileSplit對象

            split = (FileSplit) context.getInputSplit();

 

            StringTokenizer itr = new StringTokenizer(value.toString());

 

            while (itr.hasMoreTokens()) {

                // key值由單詞和URL組成,如"MapReducefile1.txt"

                // 獲取文件的完整路徑

                // keyInfo.set(itr.nextToken()+":"+split.getPath().toString());

                // 這裏爲了好看,只獲取文件的名稱。

                int splitIndex = split.getPath().toString().indexOf("file");

                keyInfo.set(itr.nextToken() + ":"

                    + split.getPath().toString().substring(splitIndex));

                // 詞頻初始化爲1

                valueInfo.set("1");

 

                context.write(keyInfo, valueInfo);

            }

        }

    }

 

    public static class Combine extends Reducer<Text, Text, Text, Text> {

 

        private Text info = new Text();

 

        // 實現reduce函數

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

 

            // 統計詞頻

            int sum = 0;

            for (Text value : values) {

                sum += Integer.parseInt(value.toString());

            }

 

            int splitIndex = key.toString().indexOf(":");

            // 從新設置value值由URL和詞頻組成

            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);

            // 從新設置key值爲單詞

            key.set(key.toString().substring(0, splitIndex));

 

            context.write(key, info);

        }

    }

 

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

 

        private Text result = new Text();

 

        // 實現reduce函數

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

 

            // 生成文檔列表

            String fileList = new String();

            for (Text value : values) {

                fileList += value.toString() + ";";

            }

 

            result.set(fileList);

 

            context.write(key, result);

        }

    }

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        String[] ioArgs = new String[] { "index_in", "index_out" };

        String[] otherArgs = new GenericOptionsParser(conf, ioArgs)

                .getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Inverted Index <in> <out>");

            System.exit(2);

        }

 

        Job job = new Job(conf, "Inverted Index");

        job.setJarByClass(InvertedIndex.class);

 

        // 設置MapCombineReduce處理類

        job.setMapperClass(Map.class);

        job.setCombinerClass(Combine.class);

        job.setReducerClass(Reduce.class);

 

        // 設置Map輸出類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

 

        // 設置Reduce輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 設置輸入和輸出目錄

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

6.4 代碼結果

1)準備測試數據

    經過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入文件"index_in"文件夾(備註:"index_out"不須要建立。)如圖6.4-1所示,已經成功建立。

 

                

圖6.4-1 建立"index_in"                                             圖6.4.2 上傳"file*.txt"

 

    而後在本地創建三個txt文件,經過Eclipse上傳到"/user/hadoop/index_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件同樣。如圖6.4-2所示,成功上傳以後。

    從SecureCRT遠處查看"Master.Hadoop"的也能證明咱們上傳的三個文件。

 

圖6.4.3 三個"file*.txt"的內容

2)查看運行結果

    這時咱們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"index_out"文件夾,且裏面有3個文件,而後打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖6.4-4所示。

 

圖6.4-4 運行結果

 

 

  文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.9.rar

相關文章
相關標籤/搜索