使用mapreduce計算環比的實例

  最近作了一個小的mapreduce程序,主要目的是計算環比值最高的前5名,原本打算使用spark計算,但是本人目前spark還只是簡單看了下,所以就先改用mapreduce計算了,今天和你們分享下這個例子,也算是對本身寫的程序的總結了。java

  首先解釋下環比,例如咱們要算本週的環比,那麼計算方式就是本週的數據和上週數字的差值除以上週數值就是環比了,若是是月的環比就是本月和上月數據的差值除以上月數字就是本月環比了。不過本mapreduce實例不會直接算出比值,只是簡單求出不一樣時間段數值的差值,最終環比結果由業務系統進行運算了。正則表達式

  下面看看本人構造的測試數據了,測試數據分紅兩個文件,文件一的內容以下所示:算法

guanggu,1;90
hongshan,1;80
xinzhou,1;70
wuchang,1;95
hankou,1;85
hanyang,1;75

  第二個文件的測試數據以下:shell

guanggu,2;66
hongshan,2;68
xinzhou,2;88
wuchang,2;59
hankou,2;56
hanyang,2;38

  這裏每行第一列的字段就是key了,key和value使用逗號分割,1;90是value值,value值包含兩個內容,1爲時間段標記,90就是數值,你們能夠看到同一個key會有兩個不一樣的時間段(使用1和2來標記)。apache

  Mapreduce的運算邏輯以下:首先第一步咱們要求出環比數值,第二步就是排序了,作這個算法我曾考慮許久就是想把求環比值和排序兩個過程合併,可是最後發現很難作到,只好將整個運算過程拆分紅兩個不一樣mapreduce,第一個mapreduce計算環比,第二個進行排序,兩者是迭代關係。這裏解釋下分紅兩個mapreduce緣由吧,主要緣由就是最原始數據很難把兩個不一樣時間段的數據按照key合併在一塊兒變成一行數據,所以mapreduce計算時候必須有一個過程就是執行相同key合併操做,所以不得不分紅兩個步驟完成計算。數組

  接下來就是具體代碼了,首先是第一個mapreduce,用來計算環比值的mapreduce了,它的map實現代碼以下:服務器

import java.io.IOException;

import org.apache.hadoop.io.Text;
// 使用輸入爲object,text,輸出爲Text,Text的數據結構,Object實際上是行號,在本計算裏意義不大,Text就是每行的內容
public class MrByAreaHBMap extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, Text>{
    
    private static String firstSeparator = ",";//每行的key和value值使用逗號分割

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        /* 本map的邏輯很是簡單,就是從行裏拆分key和value,對於有些初學者可能疑惑,咱們到底如何讓相同的key合併在一塊兒了?這個就要看reduce計算了*/        
        Text areaKey = new Text();// reduce輸入是Text類型
        Text areaVal = new Text();// reduce輸入是Text類型
        String line = value.toString();
        if (line != null && !line.equals("")){
            String[] arr = line.split(firstSeparator);

            areaKey.set(arr[0]);
            areaVal.set(arr[1]);
            
            context.write(areaKey, areaVal);
        }

    }

}

  下面是reduce代碼了,具體以下:數據結構

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MrByAreaHBReduce extends Reducer<Text, Text, Text, Text>{
    
    private static String firstSeparator = ";";
    private static String preFlag = "1";
    private static String nextFlag = "2";

    /*reduce的輸入也是key,value的形式,不過這個輸入是會將map裏相同的key的值進行合併,合併形式就是一個數組形式,不過reduce方法裏是經過迭代器進行數值處理*/
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int num1 = 0,num2 = 0,hbNum = 0;
        for(Text value : values){
            String inVal = value.toString();
            String[] arr = inVal.split(firstSeparator);
            // 下面的邏輯是經過不一樣時間段標記獲取不一樣時間段數值
            if (arr[0].equals(preFlag)){
                num1 = Integer.valueOf(arr[1]);
            }
            if (arr[0].equals(nextFlag)){
                num2 = Integer.valueOf(arr[1]);
            }
        }
        hbNum = num1 - num2;// 這裏計算環比
        Text valueText = new Text();
        valueText.set(hbNum + "");
        Text retKey = new Text();
        /* 對reduce的key進行了修改,將原來key和各個時間段的數值合併在一塊兒,這樣讀取計算結果時候就能夠讀取到原始計算數據了,這是key,value計算模式能夠簡陋的無奈之舉*/
        retKey.set(key.toString() + firstSeparator + num1 + firstSeparator + num2);
        context.write(valueText,retKey);
    }

}

  求環比的mapredue代碼介紹結束了,下面是排序的算法,排序算法更加簡單,在計算環比的mapreduce輸出裏我將環比值和原始key進行了互換,而後輸出到結果文件裏,這個結果文件就是第二個mapreduce的輸入了,下面咱們就要對這個新key進行排序,mapredcue計算模型裏從map到reduce有默認的排序機制,若是map輸出的key是字符類型那麼排序規則就是按照字典進行排序,若是key是數字,那麼就會按照數字由小到大進行排序,下面就是排序mapreduce的具體算法,map代碼以下:app

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MrByAreaSortMap extends
        Mapper<LongWritable, Text, IntWritable, Text> {
    /* 咱們須要的排序是按照key的數值排序,不過這個排序是map的輸出才作的,所以代碼裏輸出的key使用了IntWritable類型 
       其實排序的map邏輯很是簡單就是保證map的輸出key是數字類型便可
    */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        /*reduce的輸出結果文件格式是按照空格分隔的,不過也搞不清有幾個空格,或者是tab分割了,這裏使用正則表達式s+就不怕多少空格和tab了*/
        String[] arr = line.split("\\s+");
        IntWritable outputKey = new IntWritable(Integer.valueOf(arr[0]));
        Text outputValue = new Text();
        outputValue.set(arr[1]);
        context.write(outputKey, outputValue);
    }
}

  reduce代碼以下:eclipse

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* reduce代碼很讓人吃驚吧,就是把map結果原樣輸出便可 */
public class MrByAreaSortReduce extends
        Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text text : values){
            context.write(key, text);
        }
    }

}

  代碼裏的註釋對代碼邏輯進行了詳細的解釋,這裏就不累述了。

  下面就是調用兩個mapreduce的main函數了,也就是咱們該如何執行mapreduce的方式,這個main函數仍是很是有特色的,特色一就是兩個mapreduce有迭代關係,具體就是第一個mapredcue執行完畢後第二個mapredcue才能執行,或者說第一個mapredcue的輸出就是第二個mapredcue的輸入,特色二就是排序計算裏咱們使用了map到reduce過程及shuffle過程裏的默認排序機制,那麼該機制運用可不是像mapreduce代碼那麼簡單了,其實背後須要咱們更加深刻理解mapreduce的原理,這裏咱們直接看代碼了,代碼以下:

mport java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MrByAreaJob {
    public static void main(String[] args) throws IOException {
        // 一個mapreduce就是一個job 一個job須要一個單獨的Configuration,我開始讓兩個job公用Configuration,最後mr報錯
        Configuration conf01 = new Configuration();
        ControlledJob conJobHB = new ControlledJob(conf01);
        
        // 下面代碼不少文章裏都會提到這裏就很少說了
        Job jobHB = new Job(conf01,"hb");
        jobHB.setJarByClass(MrByAreaJob.class);
        jobHB.setMapperClass(MrByAreaHBMap.class);
        jobHB.setReducerClass(MrByAreaHBReduce.class);
        jobHB.setMapOutputKeyClass(Text.class);
        jobHB.setMapOutputValueClass(Text.class);
        jobHB.setOutputKeyClass(Text.class);
        jobHB.setOutputValueClass(Text.class);
        
        conJobHB.setJob(jobHB);
        
        FileInputFormat.addInputPath(jobHB, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobHB, new Path(args[1]));
        
        Configuration conf02 = new Configuration();
        Job jobSort = new Job(conf02,"sort");
        jobSort.setJarByClass(MrByAreaJob.class);
        jobSort.setMapperClass(MrByAreaSortMap.class);
        jobSort.setReducerClass(MrByAreaSortReduce.class);
        // Partitioner是shuffle的一個步驟,一個Partitioner對應一個reduce
        // 假如這個mapredue有多個reduce,咱們如何保證排序的全局一致性,所以這裏須要進行處理
        jobSort.setPartitionerClass(PartitionByArea.class);
        // map對數值排序默認是由小到大,可是需求是由大到小,所以須要咱們改變這種排序
        jobSort.setSortComparatorClass(IntKeyComparator.class);
        jobSort.setMapOutputKeyClass(IntWritable.class);
        jobSort.setMapOutputValueClass(Text.class);
        
        jobSort.setOutputKeyClass(IntWritable.class);
        jobSort.setOutputValueClass(Text.class);
        
        ControlledJob conJobSort = new ControlledJob(conf02);
        conJobSort.setJob(jobSort);
        
        // 這裏添加job的依賴關係
        conJobSort.addDependingJob(conJobHB);
        
        // 能夠看到第一個mapreduce的輸出就是第二個的輸入
        FileInputFormat.addInputPath(jobSort, new Path(args[1]));
        FileOutputFormat.setOutputPath(jobSort, new Path(args[2]));
        
        // 主控制job
        JobControl mainJobControl = new JobControl("mainHBSort");
        
        mainJobControl.addJob(conJobHB);
        mainJobControl.addJob(conJobSort);
        
        Thread t = new Thread(mainJobControl);
        t.start();
        
        while(true){
            if (mainJobControl.allFinished()){
                System.out.println(mainJobControl.getSuccessfulJobList());
                mainJobControl.stop();
                break;
            }
        }
    }
}

  這裏有兩個類尚未介紹,一個是IntKeyComparator,這是爲了保證排序的mapreduce結果是按數字由大到小排序,代碼以下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntKeyComparator extends WritableComparator {

    protected IntKeyComparator() {
        super(IntWritable.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }
    
    

}

  另外一個類就是PartitionByArea,這個是保證排序不會由於reduce設置的個數而不能保證排序的全局一致性,代碼具體以下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionByArea<IntWritable, Text> extends Partitioner<IntWritable, Text> {

    @Override
    public int getPartition(IntWritable key, Text value, int numReduceTasks) {
        int maxValue = 50;
        int keySection = 0;
        
        // numReduceTasks就是默認的reduce任務個數
        if (numReduceTasks > 1 && key.hashCode() < maxValue){
            int sectionValue = maxValue / (numReduceTasks - 1);
            int count = 0;
            while((key.hashCode() - sectionValue * count) > sectionValue){
                count++;
            }
            keySection = numReduceTasks - 1 - count;
        }
        
        return keySection;
    }

}

  這裏特別要講解的是PartitionByArea,這個原理我花了好一段時間才理解過來,partition是map輸出爲reduce對應作的分區,通常一個partition對應一個reduce,若是咱們將reduce任務槽設置爲一個那麼就不用更改Partition類,可是實際生產狀況下reduce每每會配置多個,這個時候保證數據的總體排序就十分重要了,那麼咱們如何保證其數據的總體有序了,這個時候咱們要找到輸入數據的最大值,而後讓最大值除以partition的數量的商值做爲分割數據的邊界,這樣等分就能夠保證數據的總體排序的有效性了。

  如今全部的代碼都介紹完畢了,下面就是咱們該如何讓這個代碼運行了,我在寫本代碼時候使用的是ide是eclipse,不過我沒有使用mapreduce插件,而是直接放在服務器上運行,下面我來描述下運行該mr的方式,具體以下:

  首先我在裝有hadoop服務的服務器上使用root用戶建立一個屬於我本身的文件夾,這裏文件夾的名字叫作xiajun,我經過ftp將源文件傳遞到xiajun目錄下的javafile文件夾,執行以下命令:

mkdir /xiajun/javafile
javac –classpath /home/hadoop/hadoop/hadoop-core-0.20.2-cdh3u4.jar –d /xiajun/javaclass /xiajun/ javafile/*.java

  以上命令是編譯源文件,將javafile文件夾的java代碼編譯到javaclass目錄下。

  

Jar –cvf /xiajun/mymr.jar –C /xiajun/javaclass/ .

  這裏將javaclass目錄下class文件打成jar包,放在xiajun目錄下。

  接下來咱們使用hadoop用戶登陸:

su – hadoop

  之因此使用root用戶編譯,打jar包緣由是個人hadoop用戶沒有權限上傳文件不得已而爲之了。

  咱們首先將測試數據上傳到HDFS上,接下來執行以下命令:

cd /hadoop/bin

  切換目錄到bin目錄下,而後執行:

hadoop jar mymr.jar cn.com.TestMain  輸入目錄  輸出目錄

  這裏輸入能夠是具體文件也能夠是目錄,輸出目錄在HDFS上要不存在,若是存在hadoop會沒法確認任務是否已經執行完畢,就會強制終止任務。

  兩個mapreduce迭代執行日誌很是讓人失望,所以若是咱們發現任務沒法正常執行,我如今都是一個個mapredcue執行查看錯誤日誌。

  最後咱們看看應用服務應該如何調用這個mapreduce程序,這裏我使用遠程調用shell 的方式,代碼以下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;


public class TestMain {

    /**
     * @param args
     * @throws IOException 
     * @throws ClassNotFoundException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) {
        String hostname = "192.168.1.200";
        String username = "hadoop";
        String pwd = "hadoop";
        
        Connection conn = new Connection(hostname);
        Session sess = null;
        long begin = System.currentTimeMillis();
        try {
            conn.connect();
            boolean isAuthenticated = conn.authenticateWithPassword(username, pwd);
            sess = conn.openSession();
            sess.execCommand("cd hadoop/bin && hadoop jar /xiajun/mymr.jar com.test.mr.MrByAreaJob /xiajun/areaHBinput /xiajun/areaHBoutput58 /xiajun/areaHBoutput68");
            
            InputStream stdout = sess.getStdout();
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
            StringBuilder sb = new StringBuilder();
            
            while(true){
                String line = br.readLine();
                if (line == null) break;
                sb.append(line);
            }
            
            System.out.println(sb.toString());
            
            long end = System.currentTimeMillis();
            System.out.println("耗時:" + (begin - end)/1000 + "秒");
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            sess.close();
            conn.close();
        }
    }

}

  好了,本文就此結束了。

相關文章
相關標籤/搜索