Hadoop大數據開發基礎系列:4、MapReduce初級編程

第四章、MapReduce編程入門

目錄結構html

1.使用Eclipse創建MapReduce工程java

    1.1 下載與安裝Eclipseapache

    1.2 配置MapReduce環境編程

    1.3 新建MapReduce工程數組

2.經過源碼初識MapReduce工程bash

    2.1 通俗理解MapReduce原理oracle

    2.2 瞭解MR實現詞頻統計的執行流程app

    2.3 讀懂官方提供的WordCount源碼分佈式

3.編程實現按日期統計訪問次數函數

    3.1 分析思路與處理邏輯

    3.2 編寫核心模塊代碼

    3.3 任務實現

4.編程實現按訪問次數排序

    4.1 分析思路與處理邏輯

    4.2 編寫核心模塊代碼

    4.3 任務實現

5.小結

6.實訓

    實訓1.獲取成績表的最高分記錄

    實訓2.對兩個文件中的數據進行合併和去重

7.課後練習


背景:某社交網站通過幾年的發展,註冊用戶超過1000萬,其中付費用戶(VIP)佔用戶總數的0.1%。網站運營方的重點之一是向付費用戶提供更加優質的服務,必須根據服務對象的特色設計有針對性的服務方案。須要對付費用戶訪問網站的數據分析,這是一項很是重要的工做任務。這個任務由如下幾個階段來詳細展開:

1.使用Eclipse創建MapReduce工程

    具體參考: https://blog.csdn.net/hehe_soft_engineer/article/details/102147721

2.經過源碼初識MapReduce工程

此部分,目的是對MapReduce的核心模塊 Mapper與Reducer的執行流程有必定的認識。經過學習wordcount的源碼來了解一下。

    2.1 通俗理解MapReduce原理

        (1)MapReduce包括Mapper模塊和Reducer模塊,MapReduce能夠看作是一個專業處理大數據的工程隊,主要由下面成員構成:

            ①Mapper:映射器 ②Mapper助理InputFormat:輸入文件讀取器

            ③Shuffle:運輸隊 ④Shuffle助理Sorter:排序器 

            ⑤Reducer:歸約器 ⑥Reducer助理OutputFormat:輸出結果寫入器

        (2)簡化的MapReduce處理流程圖:

            ①數據切片:系統把數據分給多個Mapper來處理,經過數據分片的形式,這是分佈式計算的第一步。

            ②數據映射:分片完成後,Mapper助理InputFormat從文件輸入目錄讀取數據,再由Mapper對數據進行解析,組織成爲新的格式(鍵值對形式),最後Mapper將處理好的數據輸出,等待shuffle運輸隊取走結果。

            ③數據混洗:shuffle運輸隊把獲取的結果按照相同的鍵(Key)進行聚集,再把結果送到Shuffle助理Sorter處,由Sorter負責對這些結果排序,,而後提交給Reducer。

            ④數據歸約:Reducer收到數據後,將結果進行彙總與映射工做,獲得最終的計算結果,最後由Reducer助理OutputFormat將結果輸出到指定位置處。

    2.2 瞭解MR實現詞頻統計的執行流程

        下面舉一個實例來講明一下Map和Reduce過程

 

輸入

輸出

Hello World Our World

BigData               2

Hello BigData Real BigData

Great               1

Hello Hadoop Great Hadoop

Hadoop               3

Hadoop MapReduce

Hello               3

 

MapReduce               1

 

Our               1

 

Real               1

 

World               2

        (1)Map任務的處理過程

        (2)Reduce任務的處理過程

    2.3 讀懂官方提供的WordCount源碼

        要編寫數據處理程序,還要參考MapReduce編程的具體規範,下面進行代碼級別的分析和說明:

        在「D:\tools\hadoop-2.7.7\share\hadoop\mapreduce\sources」目錄下找到「hadoop-mapreduce-examples-2.7.7-sources.jar」,解壓縮此文件,在子目錄「\org\apache\hadoop\examples」看到WordCount文件,這就是WordCount程序的源代碼。

        從結構上能夠分爲3部分,分別是應用程序Driver、Mapper模塊與Reducer模塊

        (1)應用程序Driver分析

            這裏的Driver程序主要是指的main函數,在main函數裏面進行MapReduce程序的一些初始化設置,並提交任務,等待程序運行完成。

            基本上是這樣的格式,在此基礎上只須要修改部分參數便可。

        (2)Mapper模式分析

        (3)Reducer模式分析

        (4)歸納地講:

            進行MapReduce編程時,開發者主要處理的是Mapper和Reducer兩個模塊,其中包括定義輸入輸出的鍵值對格式、編寫map與reduce函數中定義的處理邏輯等。

3.編程實現按日期統計訪問次數

    本部分任務目標是統計用戶在2016年每一個天然日的總訪問次數。原始數據文件中提供了用戶名稱與訪問日期,這個任務實質就是要獲取以每一個天然日爲單位的全部用戶訪問次數的累加值。若是經過MapReduce編程實現這個任務,首先要考慮的是,Mapper與Reducer各自的處理邏輯是怎樣的,而後根據處理邏輯編寫核心代碼,最後在Eclipse中編寫核心代碼,編譯打包後提交集羣運行。

    3.1 分析思路與處理邏輯

        着重考慮如下幾個要素:

        ①輸入輸出格式 ②Mapper要實現的邏輯 ③Reducer要實現的計算邏輯

        (1) 定義輸入/輸出格式 

            社交網站用戶的訪問日期在格式上屬於文本格式,訪問次數爲整型數值格式。其組成的鍵值對爲<訪問日期,訪問次數>,所以Mapper的輸出與Reducer的輸出都選用Text類與IntWritable類。

        (2)Mapper 類的邏輯實現

            Mapper類中最主要的部分就是map函數。map函數的主要任務就是讀取用戶訪問文件中的數據,輸出全部訪問日期與初始次數的鍵值對。所以訪問日期是數據文件的第二列,全部先定義一個數組,再提取第二個元素,與初始次數1一塊兒構成要輸出的鍵值對,即<訪問日期,1>。

        (3)Reducer的邏輯實現

            Reducer類中最主要的部分就是reduce函數。reduce的主要任務就是讀取Mapper輸出的鍵值對<訪問日期,1>。這一部分與官網給出的WordCount中的Reducer徹底相同。

    3.2 編寫核心模塊代碼

        目錄結構:

        

package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DailyAccessCount {
       // Mapper模塊
       public static class MyMapper
    extends Mapper<Object, Text, Text, IntWritable>{
             private final static IntWritable one = new IntWritable(1);
             public void map(Object key, Text value, Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
                           throws IOException, InterruptedException {
                    String line = value.toString();
                    String array[] = line.split(",");//指定,爲分隔符,組成數組
                    String keyOutput = array[1];//提取數組中的訪問日期做爲Key
                    context.write(new Text(keyOutput), one);//造成鍵值對
             }
       }
       
       // Reducer模塊
       public static class MyReducer
    extends Reducer<Text,IntWritable,Text,IntWritable> {
             private IntWritable result = new IntWritable();
             public void reduce(Text key, Iterable<IntWritable> values, Context  context)
                           throws IOException, InterruptedException {
                    int sum = 0;                                   //定義累加器,初始值爲0
                    for (IntWritable val : values) {
                           sum += val.get();                       //將相同鍵的全部值進行累加
                    }
                    result.set(sum);
                    context.write(key, result);
             }
       }
       
       //Driver模塊,主要是配置參數
       public static void main(String[] args) throws Exception {
           Configuration conf = new Configuration();
           
           Job job = Job.getInstance(conf, "DailyAccessCount");
           job.setJarByClass(DailyAccessCount.class);
           job.setMapperClass(MyMapper.class);
           job.setReducerClass(MyReducer.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(IntWritable.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
           for (int i = 0; i < args.length - 1; ++i) {
             FileInputFormat.addInputPath(job, new Path(args[i]));
           }
           FileOutputFormat.setOutputPath(job,
             new Path(args[args.length - 1]));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
         }    
}

    3.3 任務實現

        將文件編譯生成JAR包文件,提交Hadoop集羣執行

    在運行過程當中報錯,緣由是我在外面的JDK用的是1.9的,等級太高了(Linux系統的JDK是1.8的),因此要從新配置JDK。

    記住,在用Windows環境下的JDK要和Hadoop集羣環境下的JDK環境相同。

    歷史各個版本的JDK下載地址: 

                     https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html

    通過配置,終於能夠運行啦,哈哈哈 o(* ̄︶ ̄*)o

hadoop jar NewDaily.jar test.NewDaily /user/dftest/user_login.txt /user/dftest/AccessCount

    結果以下:

        再來查看輸出結果:

        打開文件能夠看到:

        第一列是已經按照天然日期排好順序,第二列是對應日期的總訪問次數,任務基本完成。

4.編程實現按訪問次數排序

    前一部分完成了日期統計任務,本部分要對AccessCount中的數據按照訪問次數進行排序,將排序後的結果存放在相同目錄下的TimesSort中。

    4.1 分析思路與處理邏輯

        MapReduce只會對鍵值進行排序,因此咱們在Mapper模塊中對於輸入的鍵值對,把Key與Value位置互換,在Mapper輸出後,鍵值對通過shuffle的處理,已經變成了按照訪問次數排序的數據順序啦,輸出格式爲<訪問次數,日期>。Reducer的處理和Mapper剛好相反,將鍵和值的位置互換,輸出格式變爲<日期,訪問次數>。

    4.2 編寫核心模塊代碼

package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AccessTimesSort {
       // Mapper模塊
       public static class MyMapper
    extends Mapper<Object, Text, IntWritable,Text>{
             public void map(Object key, Text value, Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
                           throws IOException, InterruptedException {
                    String line = value.toString();
                    String array[] = line.split("\t");//指定,爲分隔符,組成數組
                    int keyOutput = Integer.parseInt(array[1]);//提取數組中的訪問次數做爲Key
                    String valueOutput = array[0]; //將日期做爲value
                    context.write(new IntWritable(keyOutput), new  Text(valueOutput));
             }
       }
       
       // Reducer模塊
       public static class MyReducer
    extends Reducer<IntWritable,Text,Text,IntWritable> {//注意與上面輸出對應
             public void reduce(IntWritable key, Iterable<Text> values, Context  context)  
                           throws IOException, InterruptedException {
                    for (Text val : values) {
                           context.write(val, key);                //進行鍵值位置互換
                    }
             }
       }
       
       //Driver模塊,主要是配置參數
       public static void main(String[] args) throws Exception {
           Configuration conf = new Configuration();
           
           Job job = Job.getInstance(conf, "AccessTimesSort");
           job.setJarByClass(AccessTimesSort.class);
           job.setMapperClass(MyMapper.class);
           job.setReducerClass(MyReducer.class);
           job.setMapOutputKeyClass(IntWritable.class);
           job.setMapOutputValueClass(Text.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
           for (int i = 0; i < args.length - 1; ++i) {
             FileInputFormat.addInputPath(job, new Path(args[i]));
           }
           FileOutputFormat.setOutputPath(job,
             new Path(args[args.length - 1]));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
         }    
}

    4.3 任務實現

        (1)編譯生成jar包

        (2)將jar包上傳到集羣

        (3)執行jar包內的AccessTimesSort類

hadoop jar NewDaily.jar test.AccessTimesSort /user/dftest/AccessCount /user/dftest/TimesSort

        (4)查看執行結果(由此能夠看到結果爲升序排列)

        此任務順利完成 哈哈。

5.小結

    本章介紹了MapReduce編程的基礎知識,經過對Hadoop官方的示例代碼的分析及解讀,深刻了解了MapReduce的執行過程。MapReduce把複雜的、運行在Hadoop集羣上的並行計算過程集成到了兩個模塊——Mapper和Reducer上。開發人員只須要把業務處理邏輯經過其中的map函數和reduce函數來實現,就能夠達到分佈式並行編程的目的。

    MapReduce執行過程主要包括如下幾個部分:讀取分佈式文件系統的數據,進行數據分片,執行map任務以輸出中間結果,shuffle階段把中間結果進行匯合、排序,再傳到Reduce任務,在Reduce階段對數據進行處理,輸出最終結果到分佈式文件系統內。

6.實訓

    實訓目的是,掌握MapReduce編程的基本方法,經過MapReduce編程來實現一些經常使用的數據處理方法,包括求最大值、去重等。

    實訓1.獲取成績表的最高分記錄

        (1)需求說明:對於樣例文件subject_score,即成績表A。文件中的每一行數據包含兩個字段:科目和分數。要求得到成績列表中每一個科目成績最高的記錄,並將結果輸出到最高成績表B。

        表A的部份內容:

 

語文

96

數學

102

英語

130

物理

19

化學

44

生物

44

語文

109

數學

118

英語

141

        要輸出的表B結構:

 

化學

99

數學

149

物理

99

生物

99

英語

144

語文

114

        (2)實現思路與步驟:

            ①在Mapper中,map函數讀取成績表A中的數據,直接將讀取的數據以空格分隔,組成鍵值對<科目,成績>,即設置輸出鍵值對類型爲<Text,IntWritable>。

            ②在Reducer中,因爲map函數輸出鍵值對類型是<Text,IntWritable>,因此在Reducer中接收的鍵值對類型就是<Text,Iterable<IntWritable>>。針對相同的鍵遍歷它的值,找到最高值,最後輸出的鍵值對爲<科目,最高成績>。

        (3)實現及輸出結果:

            ①代碼實現:

 

package test;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;


import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class ScoreSorting {
    // Mapper模塊
        public static class MyMapper
        extends Mapper<LongWritable, Text, Text ,IntWritable>{
            Text course=new Text();
            IntWritable score=new IntWritable();
            public void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text ,IntWritable>.Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
                    throws IOException, InterruptedException {
                String line = value.toString();
                String array[] = line.trim().split(" ");//trim函數去掉兩邊多餘的空格,指定空格爲分隔符,組成數組
                course.set(array[0]);//第一列是科目
                score.set(Integer.parseInt(array[1]));//第二列是分數
                context.write(course, score);
            }
        }
        
        // Reducer模塊
        public static class MyReducer
        extends Reducer<Text,IntWritable,Text,IntWritable> {//注意與上面輸出對應
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text,IntWritable,Text,IntWritable>.Context context)  
                    throws IOException, InterruptedException {
                int maxscore=0;                        //初始化最大值
                for (IntWritable score:values) {
                    if(maxscore < score.get()) {
                        maxscore=score.get();        //相同鍵內找最大值
                    }
                }
                result.set(maxscore);
                context.write(key, result);
                
            }
        }
        
        //Driver模塊,主要是配置參數
        public static void main(String[] args) throws Exception {  //對有幾個參數要有很強的敏感性,若是多能夠用前面的遍歷方式,若是少就能夠直接指定。
            if(args.length!=2) {
                System.err.println("ScoreSorting <input> <output>");
                System.exit(-1);
            }
            Configuration conf = new Configuration();  
            Job job = Job.getInstance(conf, "ScoreSorting");
            job.setJarByClass(ScoreSorting.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
          }    
}

            上傳數據文件並執行程序:

hdfs dfs -put /testhadoop/subject_score.txt /user/dftest
hadoop jar NewDaily3.jar test.ScoreSorting /user/dftest/subject_score.txt /user/dftest/SortScore8     //通過8次才調好的。

通過不斷地調試,可是總會出現輸入類型不匹配的問題,最終,找到在map函數重寫的時候,由於值類型錯了,應該是Text類型,寫成了IntWritable類型,可是報錯老是報是由於Text 和 LongWritable問題,因此有點迷。

最後問題解決:

    實訓2.對兩個文件中的數據進行合併和去重

        (1)需求說明:

            有兩個樣例文件:XX與YY。要求合併兩個文件中的數據,並對合並後的數據進行去重,將結果輸出到文件ZZ。

        (2)實現思路與步驟:

            ①利用MapReduce中Reducer類會合並相同鍵值對的特性,對目標數據進行去重。

            ②在HDFS建立目錄XXYY,將樣例文件XX與YY上傳到此目錄。MapReduce程序讀取此目錄下的文件。

            ③在Mapper類中,map函數讀取兩個文件數據,直接將讀取的數據做爲鍵,將值設置爲1,最後輸出格式爲<Text,IntWritable>。

            ④在Reducer中,鍵保持不變,將對應的值取爲空,輸出類型爲<Text,NullWritable>。

        (3)實現及輸出結果:

            略

7.小練習

    在MapReduce程序中,Reducer類中包括的函數有:B

    A.   startup、reduce、end        B.    setup、reduce、cleanup    

    C.    start、run、reduce、end    D.   startup、run、end     

 

下一章將對MapReduce編程進行更深一步的剖析 ^_^ 。

相關文章
相關標籤/搜索