案例:java
數據:apache
郵編 | 日期 | 金額
ILMN,2013-12-05,97.65
GOOD,2013-12-09,1078.14
IBM,2013-12-09,177.46
ILMN,2013-12-09,101.33
ILMN,2013-12-06,99.25
GOOD,2013-12-06,1069.87
IBM,2013-12-06,177.67
GOOD,2013-12-05,1057.34
GOOD,2013-12-05,10.23
GOOD,2013-12-05,11.43
GOOD,2013-12-05,17.34
要求:把同一個郵編的放在一塊兒,而後根據日期和金額降序排列。
效果以下:
思路:在map階段,構造的key(CompositeKey)是:(郵編,日期);value(NaturalValue)是(日期,價格)。而後key繼承
WritableComparable,實現比較函數,這樣就能夠保證一份數據出來是分區且區內有序的。
而後在shuffle過程當中,指定一個key比較器(CompositeKeyComparator),使得在聚合過程後,對key按照先郵編,再時間,最後金額的順序排序。key-value是鍵值對,key按照咱們的意願排好序了,
value也就排好了。
總的來講:降序什麼的都是CompositeKeyComparator來決定的。
代碼結構:
(1)key:組合鍵
1 package com.book.test1; 2 import java.io.DataInput; 3 import java.io.DataOutput; 4 import java.io.IOException; 5 6 import org.apache.hadoop.io.DoubleWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.Writable; 10 import org.apache.hadoop.io.WritableComparable; 11 /** 12 * 這個的做用就是要數據在分區裏面有序 13 */ 14 /** 15 * 定義組合鍵:就是能夠把本身要比較的字段寫入 16 * @author Sxq 17 * 18 */ 19 //必需要時間這個WritableComparable這個類 20 public class CompositeKey implements Writable, WritableComparable<CompositeKey> { 21 22 // 股票的名字 23 private Text stockSymbol; 24 // 日期 25 private LongWritable timestamp; 26 private DoubleWritable price; 27 28 29 public DoubleWritable getPrice() { 30 return price; 31 } 32 public void setPrice(DoubleWritable price) { 33 this.price = price; 34 } 35 public CompositeKey() 36 { 37 38 } 39 public CompositeKey(Text _stockSymbol, LongWritable _timestamp,DoubleWritable _price) { 40 this.stockSymbol = _stockSymbol; 41 this.timestamp = _timestamp; 42 this.price=_price; 43 } 44 45 46 47 public Text getStockSymbol() { 48 return stockSymbol; 49 } 50 51 52 public void setStockSymbol(Text stockSymbol) { 53 this.stockSymbol = stockSymbol; 54 } 55 56 57 58 public LongWritable getTimestamp() { 59 return timestamp; 60 } 61 62 63 64 public void setTimestamp(LongWritable timestamp) { 65 this.timestamp = timestamp; 66 } 67 68 69 70 //讀出 71 public void readFields(DataInput input) throws IOException { 72 String value1=input.readUTF(); 73 long value2=input.readLong(); 74 this.stockSymbol=new Text( value1); 75 this.timestamp= new LongWritable(value2); 76 this.price=new DoubleWritable(input.readDouble()); 77 } 78 79 //寫入 80 81 //@Override 82 public void write(DataOutput output) throws IOException { 83 output.writeUTF(this.stockSymbol.toString()); 84 output.writeLong(this.timestamp.get()); 85 output.writeDouble(this.price.get()); 86 } 87 88 public int compareTo(CompositeKey other) { 89 90 int 
comparator=this.stockSymbol.compareTo(other.stockSymbol); 91 if(comparator==0) 92 { 93 comparator=this.timestamp.compareTo(other.timestamp); 94 } 95 96 //升序 97 //return comparator; 98 99 return -comparator; 100 } 101 102 103 @Override 104 public String toString() { 105 return "CompositeKey [stockSymbol=" + stockSymbol + ", timestamp=" + timestamp + "]"; 106 } 107 108 }
(2)key對應的value:
package com.book.test1; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class NaturalValue implements Writable { private long timestamp; private double privce; public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public double getPrivce() { return privce; } public void setPrivce(double privce) { this.privce = privce; } public void readFields(DataInput input) throws IOException { this.timestamp=input.readLong(); this.privce=input.readDouble(); } public void write(DataOutput output) throws IOException { output.writeLong(this.timestamp); output.writeDouble(this.privce); } }
(3)分區器:
NaturalKeyPartitioner
package com.book.test1;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by stock symbol only, so every record for the same
 * symbol lands on the same reducer regardless of date/price.
 *
 * @author Sxq
 */
public class NaturalKeyPartitioner extends Partitioner<CompositeKey, NaturalValue> {

    @Override
    public int getPartition(CompositeKey key, NaturalValue value, int numPartitions) {
        // Mask the sign bit rather than Math.abs(): Math.abs(Integer.MIN_VALUE)
        // is still negative, which would produce an illegal partition index.
        return (key.getStockSymbol().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
(4)把key排序的比較器:在shuffle過程當中用到的
package com.book.test1; import javax.print.attribute.standard.MediaSize.Other; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 這個類的做用是把組合鍵排序,使得組合鍵也有順序 * @author Sxq * */ public class CompositeKeyComparator extends WritableComparator { public CompositeKeyComparator() { super(CompositeKey.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey ck1 = (CompositeKey) a; CompositeKey ck2 = (CompositeKey) b; int comparison = ck1.getStockSymbol().compareTo(ck2.getStockSymbol()); //若是郵編相同,則根據日期進一步處理。 if (comparison == 0) { int comparison2=ck1.getTimestamp().compareTo(ck2.getTimestamp()); // 若是日期相同,則須要根據價格進一步處理 if (comparison2==0) { //按照價格降序 return ck1.getPrice().compareTo(ck2.getPrice())>0?-1:1; } else { //日期不一樣,就按照日期降序 return ck1.getTimestamp().compareTo(ck2.getTimestamp())>0?-1:1; } } else { return comparison; } } static { WritableComparator.define(CompositeKey.class, new CompositeKeyComparator()); } }
(5)reduce的分區器:
CompositeGroupingComparator
package com.book.test1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the reduce phase: keys that compare equal here are
 * fed to a single reduce() call, so all records sharing one stock symbol form
 * one group no matter their date or price.
 *
 * @author Sxq
 */
public class CompositeGroupingComparator extends WritableComparator {

    public CompositeGroupingComparator() {
        // true: instantiate keys so compare() receives deserialized objects.
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey left = (CompositeKey) a;
        CompositeKey right = (CompositeKey) b;
        // Group on the symbol alone; ignore timestamp and price.
        return left.getStockSymbol().compareTo(right.getStockSymbol());
    }
}
(6)驅動類:
package com.book.test1; import java.io.IOException; import java.util.Date; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Cmain { static class Map1 extends Mapper<LongWritable, Text, CompositeKey, NaturalValue> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CompositeKey, NaturalValue>.Context context) throws IOException, InterruptedException { String line = value.toString().trim(); String[] lines = line.split(","); Date date = DateUtil.getDate(lines[1]); //long timestamp = date.getTime(); long timestamp=UtilsCmain.DataTranform(lines[1]); CompositeKey compositeKey = new CompositeKey(); NaturalValue naturalValue = new NaturalValue(); naturalValue.setPrivce(Double.valueOf(lines[2])); naturalValue.setTimestamp(timestamp); compositeKey.setStockSymbol(new Text(lines[0])); compositeKey.setPrice(new DoubleWritable(Double.valueOf(lines[2]))); compositeKey.setTimestamp(new LongWritable(timestamp)); context.write(compositeKey, naturalValue); } } static class reduce1 extends Reducer<CompositeKey, NaturalValue, Text, Text> { @Override protected void reduce(CompositeKey key, Iterable<NaturalValue> vlaue, Reducer<CompositeKey, NaturalValue, Text, Text>.Context context) throws IOException, InterruptedException { Iterator<NaturalValue> iterator = vlaue.iterator(); StringBuffer stringBuffer = new StringBuffer(); while (iterator.hasNext()) { NaturalValue naturalValue=iterator.next(); stringBuffer.append("("); stringBuffer.append(naturalValue.getTimestamp()); 
stringBuffer.append(","+naturalValue.getPrivce()+")"); } context.write(new Text(key.getStockSymbol()), new Text(stringBuffer.toString())); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Cmain.class); job.setMapperClass(Map1.class); job.setReducerClass(reduce1.class); job.setMapOutputKeyClass(CompositeKey.class); job.setMapOutputValueClass(NaturalValue.class); job.setOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setSortComparatorClass(CompositeKeyComparator.class); // 在Reduce端設置分組,使得同一個郵編的在同一個組 job.setGroupingComparatorClass(CompositeGroupingComparator.class); // 設置分區 job.setPartitionerClass(NaturalKeyPartitioner.class); // 指定輸入的數據的目錄 FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/stock.txt")); FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort")); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
(7)工具類:將2012-12-09轉爲20121209這種形式:
package com.book.test1; public class UtilsCmain { /** * 時間 */ public static long DataTranform(String vaule) { String[] args=vaule.split("-"); String datatime=args[0]+args[1]+args[2]; return Long.valueOf(datatime); } }
運行結果: