http://www.cnblogs.com/MengYan-LongYou/p/3360613.htmlhtml
在作這個Join查詢的時候,必然涉及數據,我這裏設計了2張表,分別較data.txt和info.txt,字段之間以/t劃分。app
data.txt內容以下:ide
201001 1003 abc函數
201002 1005 defoop
201003 1006 ghithis
201004 1003 jklspa
201005 1004 mno設計
201006 1005 pqrorm
info.txt內容以下:htm
1003 kaka
1004 da
1005 jue
1006 zhao
指望輸出結果:
1003 201001 abc kaka
1003 201004 jkl kaka
1004 201005 mno da
1005 201002 def jue
1005 201006 pqr jue
1006 201003 ghi zhao
4、Map代碼
首先是map的代碼,我貼上,而後簡要說說
public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 獲取輸入文件的全路徑和名稱
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
if (pathName.contains("data.txt")) {
String values[] = value.toString().split("/t");
if (values.length < 3) {
// data數據格式不規範,字段小於3,拋棄數據
return;
} else {
// 數據格式規範,區分標識爲1
TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
context.write(tp, new Text(values[0] + "/t" + values[2]));
}
}
if (pathName.contains("info.txt")) {
String values[] = value.toString().split("/t");
if (values.length < 2) {
// data數據格式不規範,字段小於2,拋棄數據
return;
} else {
// 數據格式規範,區分標識爲0
TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
context.write(tp, new Text(values[1]));
}
}
}
}
這裏須要注意如下部分:
A、pathName是文件在HDFS中的全路徑(例如:hdfs://M1:9000/MengYan/join/data/info.txt),能夠以endsWith()的方法來判斷。
B、資料表,也就是這裏的info.txt須要放在前面,也就是標識號是0.不然沒法輸出理想結果。
C、Map執行完成以後,輸出的中間結果以下:
1003,0 kaka
1004,0 da
1005,0 jue
1006,0 zhao
1003,1 201001 abc
1003,1 201004 jkl
1004,1 201005 mon
1005,1 201002 def
1005,1 201006 pqr
1006,1 201003 ghi
5、分區和分組
1、map以後的輸出會進行一些分區的操做,代碼貼出來:
public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numParititon) {
return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
}
}
分區我在之前的文檔中寫過,這裏不作描述了,就說是按照map輸出的符合key的第一個字段作分區關鍵字。分區以後,相同key會劃分到一個reduce中去處理(若是reduce設置是1,那麼就是分區有多個,可是仍是在一個reduce中處理。可是結果會按照分區的原則排序)。分區後結果大體以下:
同一區:
1003,0 kaka
1003,1 201001 abc
1003,1 201004 jkl
同一區:
1004,0 da
1004,1 201005 mon
同一區:
1005,0 jue
1005,1 201002 def
1005,1 201006 pqr
同一區:
1006,0 zhao
1006,1 201003 ghi
2、分組操做,代碼以下
public static class Example_Join_01_Comparator extends WritableComparator {
public Example_Join_01_Comparator() {
super(TextPair.class, true);
}
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
return t1.getFirst().compareTo(t2.getFirst());
}
}
分組操做就是把在相同分區的數據按照指定的規則進行分組的操做,就以上來看,是按照複合key的第一個字段作分組原則,達到忽略複合key的第二個字段值的目的,從而讓數據可以迭代在一個reduce中。輸出後結果以下:
同一組:
1003,0 kaka
1003,0 201001 abc
1003,0 201004 jkl
同一組:
1004,0 da
1004,0 201005 mon
同一組:
1005,0 jue
1005,0 201002 def
1005,0 201006 pqr
同一組:
1006,0 zhao
1006,0 201003 ghi
6、reduce操做
貼上代碼以下:
public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
Text pid = key.getFirst();
String desc = values.iterator().next().toString();
while (values.iterator().hasNext()) {
context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));
}
}
}
1、代碼比較簡單,首先獲取關鍵的ID值,就是key的第一個字段。
2、獲取公用的字段,經過排組織後能夠看到,一些共有字段是在第一位,取出來便可。
3、遍歷餘下的結果,輸出。
7、其餘的支撐代碼
1、首先是TextPair代碼,沒有什麼能夠細說的,貼出來:
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}
2、Job的入口函數
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
String[] otherArgs = parser.getRemainingArgs();
if (agrs.length < 3) {
System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
System.exit(2);
}
//conf.set("hadoop.job.ugi", "root,hadoop");
Job job = new Job(conf, "Example_Join_01");
// 設置運行的job
job.setJarByClass(Example_Join_01.class);
// 設置Map相關內容
job.setMapperClass(Example_Join_01_Mapper.class);
// 設置Map的輸出
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
// 設置partition
job.setPartitionerClass(Example_Join_01_Partitioner.class);
// 在分區以後按照指定的條件分組
job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
// 設置reduce
job.setReducerClass(Example_Join_01_Reduce.class);
// 設置reduce的輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出的目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
// 執行,直到結束就退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
8、總結
1、這是個簡單的join查詢,能夠看到,我在處理輸入源的時候是在map端作來源判斷。其實在0.19能夠用MultipleInputs.addInputPath()的方法,可是它用了JobConf作參數。這個方法原理是多個數據源就採用多個map來處理。方法各有優劣。
2、對於資源表,若是咱們採用0和1這樣的模式來區分,資源表是須要放在前的。例如本例中info.txt就是資源表,因此標識位就是0.若是寫爲1的話,能夠試下,在分組以後,資源表對應的值放在了迭代器最後一位,沒法追加在最後全部的結果集合中。
3、關於分區,並非全部的map都結束纔開始的,一部分數據完成就會開始執行。一樣,分組操做在一個分區內執行,若是分區完成,分組將會開始執行,也不是等全部分區完成纔開始作分組的操做。