reduce端鏈接-分區分組聚合

1.1.1         reduce端鏈接-分區分組聚合

reduce端鏈接則是利用了reduce的分區功能將stationid相同的分到同一個分區,在利用reduce的分組聚合功能,將同一個stationid的氣象站數據和溫度記錄數據分爲一組,reduce函數讀取分組後的第一個記錄(就是氣象站的名稱)與其餘記錄組合後輸出,實現鏈接。例如鏈接下面氣象站數據集和溫度記錄數據集。先用幾條數據作分析說明,實際確定不僅這點數據。html

氣象站數據集,氣象站id和名稱數據表java

StationId StationNameapache

1~hangzhouapp

2~shanghaiide

3~beijing函數

溫度記錄數據集oop

StationId  TimeStamp Temperaturethis

3~20200216~6spa

3~20200215~2orm

3~20200217~8

1~20200211~9

1~20200210~8

2~20200214~3

2~20200215~4

目標:是將上面兩個數據集進行鏈接,將氣象站名稱按照氣象站id加入氣象站溫度記錄中最輸出結果:

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

詳細步驟以下

(1)   兩個maper讀取兩個數據集的數據輸出到同一個文件

由於是不一樣的數據格式,因此須要建立兩個不一樣maper分別讀取,輸出到同一個文件中,因此要用MultipleInputs設置兩個文件路徑,設置兩個mapper。

(2)   建立一個組合鍵<stationed,mark>用於map輸出結果排序。

組合鍵使得map輸出按照stationid升序排列,stationid相同的按照第二字段升序排列。mark只有兩個值,氣象站中讀取的數據,mark爲0,溫度記錄數據集中讀取的數據mark爲1。這樣就能保證stationid相同的記錄中第一條就是氣象站名稱,其他的是溫度記錄數據。組合鍵TextPair定義以下

package Temperature;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public int compareTo(TextPair o) {
        int cmp=first.compareTo(o.getFirst());
        if (cmp!=0)//第一字段不一樣按第一字段升序排列
        {
            return cmp;
        }
        ///第一字段相同,按照第二字段升序排列
        return second.compareTo(o.getSecond());
    }

    public void write(DataOutput dataOutput) throws IOException {
        first.write(dataOutput);
        second.write(dataOutput);
    }

    public void readFields(DataInput dataInput) throws IOException {
        first.readFields(dataInput);
        second.readFields(dataInput);
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }
    public void setSecond(Text second) {
        this.second = second;
    }
}

定義maper輸出的結果以下,前面是組合鍵,後面是值。

<1,0>    hangzhou

<1,1>    20200211~9

<1,1>    20200210~8

<2,0>    shanghai

<2,1>    20200214~3

<2,1>    20200215~4

<3,0>    beijing

<3,1>    20200216~6

<3,1>    20200215~2

<3,1>    20200217~8

(3)map結果傳入reducestationid分區再分組聚合

map輸出結果會按照組合鍵第一個字段stationid升序排列,相同stationid的記錄按照第二個字段升序排列,氣象站數據和記錄數據混合再一塊兒,shulfe過程當中,map將數據傳給reduce,會通過partition分區,相同stationid的數據會被分到同一個reduce,一個reduce中stationid相同的數據會被分爲一組。假設採用兩個reduce任務,分區按照stationid%2,則分區後的結果爲

分區1

<1,0>    hangzhou

<1,1>    20200211~9

<1,1>    20200210~8

<3,0>    beijing

<3,1>    20200216~6

<3,1>    20200215~2

<3,1>    20200217~8

分區2

<2,0>    shanghai

<2,1>    20200214~3

<2,1>    20200215~4

4)分區以後再將每一個分區的數據按照stationid分組聚合

分區1

分組1

<1,0>    <Hangzhou, 20200211~9, 20200210~8>

分組2

<3,0>    <Beijing, 20200216~6, 20200215~2, 20200217~8>

分區2

<2,0> <shanghai, 20200214~3, 20200215~4>

5)將分組聚合後的數據傳入reduce函數,將車站加入到後面的溫度記錄輸出。

由於數據是通過mark升序排列的,因此每組中第一個數據就是氣象站的名稱數據,剩下的是改氣象的溫度記錄數據,mark字段的做用就是爲了保證氣象站數據在第一條。因此讀取每組中第一個value,既是氣象站名稱。與其餘value組合輸出,即實現了數據集的鏈接。

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

6)詳細的代碼實例

package Temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class JoinRecordWithStationId extends Configured implements Tool {
    //氣象站名稱數據集map處理類
   public static class StationMapper extends Mapper<LongWritable,Text,TextPair,Text>{
       protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
           //1~hangzhou
           String[] values=value.toString().split("~");
           if (values.length!=2)
           {
               return;
           }
           //組合鍵第一字段爲stationid,第二字段爲默認0,表示車站名字數據
           context.write(new TextPair(new Text(values[0]),new Text("0")),new Text(values[1]));
       }
   }
   //溫度記錄數據集處理mapper類
   public static class TemperatureRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{
       protected void map(TextPair key, Text value, Context context) throws IOException, InterruptedException {
           String[] values=value.toString().split("~");
           if (values.length!=3)
           {
               return;
           }
           //組合鍵第一字段爲stationid,第二字段爲默認1,表示溫度記錄數據
           //3~20200216~6
           String outputValue=values[1]+"~"+values[2];
           context.write(new TextPair(new Text(values[0]),new Text("1")),new Text(outputValue));
       }
   }
   //按照statitionid分區的partioner類
    public static class FirstPartitioner extends Partitioner<TextPair,Text>{

       public int getPartition(TextPair textPair, Text text, int i) {
           //按照第一字段stationid取餘reduce任務數,獲得分區id
           return Integer.parseInt(textPair.getFirst().toString())%i;
       }
   }
   //分組比較類
   public static class GroupingComparator extends WritableComparator
   {
       public int compare(WritableComparable a, WritableComparable b) {
           TextPair pairA=(TextPair)a;
           TextPair pairB=(TextPair)b;
           //stationid相同,返回值爲0的分爲一組
           return pairA.getFirst().compareTo(pairB.getFirst());
       }
   }
   //reudce將按鍵分組的後數據,去values中第一個數據(氣象站名稱),聚合values後面的溫度記錄輸出到文件
    public static class JoinReducer extends Reducer<TextPair,Text,Text,Text>
    {
        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator it =values.iterator();
            String stationName=it.next().toString();
            while (it.hasNext())
            {
                String outputValue="~"+stationName+"~"+it.toString();
                context.write(key.getFirst(),new Text(outputValue));
            }
        }
    }
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       if (args.length!=3)
       {
           return -1;
       }
        Job job=new Job(getConf(),"joinStationTemperatueRecord");
       if (job==null)
       {
           return -1;
       }
       job.setJarByClass(this.getClass());
      //設置兩個輸入路徑,一個輸出路徑
       Path StationPath=new Path(args[0]);
       Path TemperatureRecordPath= new Path(args[1]);
       Path outputPath=new Path(args[2]);
       MultipleInputs.addInputPath(job,StationPath, TextInputFormat.class,StationMapper.class);
       MultipleInputs.addInputPath(job,TemperatureRecordPath,TextInputFormat.class,TemperatureRecordMapper.class);
       FileOutputFormat.setOutputPath(job,outputPath);
       //設置分區類、分組類、reduce類
       job.setPartitionerClass(FirstPartitioner.class);
       job.setGroupingComparatorClass(GroupingComparator.class);
       job.setReducerClass(JoinReducer.class);
       //設置輸出類型
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);
       job.setMapOutputKeyClass(TextPair.class);
       job.setMapOutputValueClass(Text.class);
       return job.waitForCompletion(true)? 0:1;
    }
    public static void main(String[] args) throws Exception
    {
        //三個參數,參數1:氣象站數據集路徑,參數2:溫度記錄數據集路徑,參數3:輸出路徑
       int exitCode= ToolRunner.run(new JoinRecordWithStationId(),args);
       System.exit(exitCode);
    }

}

 

執行任務命令

% hadoop jar temperature-example.jar JoinRecordWithStationId input/station/all input/ncdc/all output

本身開發了一個股票智能分析軟件,功能很強大,須要的點擊下面的連接獲取:

http://www.javashuo.com/article/p-kahdodke-ge.html

相關文章
相關標籤/搜索