reduce端鏈接則是利用了reduce的分區功能將stationid相同的分到同一個分區,在利用reduce的分組聚合功能,將同一個stationid的氣象站數據和溫度記錄數據分爲一組,reduce函數讀取分組後的第一個記錄(就是氣象站的名稱)與其餘記錄組合後輸出,實現鏈接。例如鏈接下面氣象站數據集和溫度記錄數據集。先用幾條數據作分析說明,實際確定不僅這點數據。html
氣象站數據集,氣象站id和名稱數據表java
StationId StationNameapache
1~hangzhouapp
2~shanghaiide
3~beijing函數
溫度記錄數據集oop
StationId TimeStamp Temperaturethis
3~20200216~6spa
3~20200215~2orm
3~20200217~8
1~20200211~9
1~20200210~8
2~20200214~3
2~20200215~4
目標:是將上面兩個數據集進行鏈接,將氣象站名稱按照氣象站id加入氣象站溫度記錄中最輸出結果:
1~hangzhou ~20200211~9
1~hangzhou ~20200210~8
2~shanghai ~20200214~3
2~shanghai ~20200215~4
3~beijing ~20200216~6
3~beijing ~20200215~2
3~beijing ~20200217~8
詳細步驟以下
(1) 兩個maper讀取兩個數據集的數據輸出到同一個文件
由於是不一樣的數據格式,因此須要建立兩個不一樣maper分別讀取,輸出到同一個文件中,因此要用MultipleInputs設置兩個文件路徑,設置兩個mapper。
(2) 建立一個組合鍵<stationed,mark>用於map輸出結果排序。
組合鍵使得map輸出按照stationid升序排列,stationid相同的按照第二字段升序排列。mark只有兩個值,氣象站中讀取的數據,mark爲0,溫度記錄數據集中讀取的數據mark爲1。這樣就能保證stationid相同的記錄中第一條就是氣象站名稱,其他的是溫度記錄數據。組合鍵TextPair定義以下
package Temperature;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair(Text first, Text second) {
this.first = first;
this.second = second;
}
public int compareTo(TextPair o) {
int cmp=first.compareTo(o.getFirst());
if (cmp!=0)//第一字段不一樣按第一字段升序排列
{
return cmp;
}
///第一字段相同,按照第二字段升序排列
return second.compareTo(o.getSecond());
}
public void write(DataOutput dataOutput) throws IOException {
first.write(dataOutput);
second.write(dataOutput);
}
public void readFields(DataInput dataInput) throws IOException {
first.readFields(dataInput);
second.readFields(dataInput);
}
public Text getFirst() {
return first;
}
public void setFirst(Text first) {
this.first = first;
}
public Text getSecond() {
return second;
}
public void setSecond(Text second) {
this.second = second;
}
}
定義maper輸出的結果以下,前面是組合鍵,後面是值。
<1,0> hangzhou
<1,1> 20200211~9
<1,1> 20200210~8
<2,0> shanghai
<2,1> 20200214~3
<2,1> 20200215~4
<3,0> beijing
<3,1> 20200216~6
<3,1> 20200215~2
<3,1> 20200217~8
(3)map結果傳入reduce按stationid分區再分組聚合
map輸出結果會按照組合鍵第一個字段stationid升序排列,相同stationid的記錄按照第二個字段升序排列,氣象站數據和記錄數據混合再一塊兒,shulfe過程當中,map將數據傳給reduce,會通過partition分區,相同stationid的數據會被分到同一個reduce,一個reduce中stationid相同的數據會被分爲一組。假設採用兩個reduce任務,分區按照stationid%2,則分區後的結果爲
分區1
<1,0> hangzhou
<1,1> 20200211~9
<1,1> 20200210~8
<3,0> beijing
<3,1> 20200216~6
<3,1> 20200215~2
<3,1> 20200217~8
分區2
<2,0> shanghai
<2,1> 20200214~3
<2,1> 20200215~4
(4)分區以後再將每一個分區的數據按照stationid分組聚合
分區1
分組1
<1,0> <Hangzhou, 20200211~9, 20200210~8>
分組2
<3,0> <Beijing, 20200216~6, 20200215~2, 20200217~8>
分區2
<2,0> <shanghai, 20200214~3, 20200215~4>
(5)將分組聚合後的數據傳入reduce函數,將車站加入到後面的溫度記錄輸出。
由於數據是通過mark升序排列的,因此每組中第一個數據就是氣象站的名稱數據,剩下的是改氣象的溫度記錄數據,mark字段的做用就是爲了保證氣象站數據在第一條。因此讀取每組中第一個value,既是氣象站名稱。與其餘value組合輸出,即實現了數據集的鏈接。
1~hangzhou ~20200211~9
1~hangzhou ~20200210~8
2~shanghai ~20200214~3
2~shanghai ~20200215~4
3~beijing ~20200216~6
3~beijing ~20200215~2
3~beijing ~20200217~8
(6)詳細的代碼實例
package Temperature;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
public class JoinRecordWithStationId extends Configured implements Tool {
//氣象站名稱數據集map處理類
public static class StationMapper extends Mapper<LongWritable,Text,TextPair,Text>{
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//1~hangzhou
String[] values=value.toString().split("~");
if (values.length!=2)
{
return;
}
//組合鍵第一字段爲stationid,第二字段爲默認0,表示車站名字數據
context.write(new TextPair(new Text(values[0]),new Text("0")),new Text(values[1]));
}
}
//溫度記錄數據集處理mapper類
public static class TemperatureRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{
protected void map(TextPair key, Text value, Context context) throws IOException, InterruptedException {
String[] values=value.toString().split("~");
if (values.length!=3)
{
return;
}
//組合鍵第一字段爲stationid,第二字段爲默認1,表示溫度記錄數據
//3~20200216~6
String outputValue=values[1]+"~"+values[2];
context.write(new TextPair(new Text(values[0]),new Text("1")),new Text(outputValue));
}
}
//按照statitionid分區的partioner類
public static class FirstPartitioner extends Partitioner<TextPair,Text>{
public int getPartition(TextPair textPair, Text text, int i) {
//按照第一字段stationid取餘reduce任務數,獲得分區id
return Integer.parseInt(textPair.getFirst().toString())%i;
}
}
//分組比較類
public static class GroupingComparator extends WritableComparator
{
public int compare(WritableComparable a, WritableComparable b) {
TextPair pairA=(TextPair)a;
TextPair pairB=(TextPair)b;
//stationid相同,返回值爲0的分爲一組
return pairA.getFirst().compareTo(pairB.getFirst());
}
}
//reudce將按鍵分組的後數據,去values中第一個數據(氣象站名稱),聚合values後面的溫度記錄輸出到文件
public static class JoinReducer extends Reducer<TextPair,Text,Text,Text>
{
@Override
protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator it =values.iterator();
String stationName=it.next().toString();
while (it.hasNext())
{
String outputValue="~"+stationName+"~"+it.toString();
context.write(key.getFirst(),new Text(outputValue));
}
}
}
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length!=3)
{
return -1;
}
Job job=new Job(getConf(),"joinStationTemperatueRecord");
if (job==null)
{
return -1;
}
job.setJarByClass(this.getClass());
//設置兩個輸入路徑,一個輸出路徑
Path StationPath=new Path(args[0]);
Path TemperatureRecordPath= new Path(args[1]);
Path outputPath=new Path(args[2]);
MultipleInputs.addInputPath(job,StationPath, TextInputFormat.class,StationMapper.class);
MultipleInputs.addInputPath(job,TemperatureRecordPath,TextInputFormat.class,TemperatureRecordMapper.class);
FileOutputFormat.setOutputPath(job,outputPath);
//設置分區類、分組類、reduce類
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(GroupingComparator.class);
job.setReducerClass(JoinReducer.class);
//設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
return job.waitForCompletion(true)? 0:1;
}
public static void main(String[] args) throws Exception
{
//三個參數,參數1:氣象站數據集路徑,參數2:溫度記錄數據集路徑,參數3:輸出路徑
int exitCode= ToolRunner.run(new JoinRecordWithStationId(),args);
System.exit(exitCode);
}
}
執行任務命令
% hadoop jar temperature-example.jar JoinRecordWithStationId input/station/all input/ncdc/all output
本身開發了一個股票智能分析軟件,功能很強大,須要的點擊下面的連接獲取: