Hadoop 二次排序

案例:

數據:

郵編   |     日期     |    金額

ILMN,2013-12-05,97.65
GOOD,2013-12-09,1078.14
IBM,2013-12-09,177.46
ILMN,2013-12-09,101.33
ILMN,2013-12-06,99.25
GOOD,2013-12-06,1069.87
IBM,2013-12-06,177.67
GOOD,2013-12-05,1057.34
GOOD,2013-12-05,10.23
GOOD,2013-12-05,11.43
GOOD,2013-12-05,17.34

 

要求:把同一個郵編的放在一塊兒,而後根據日期和金額降序排列。

效果以下:

思路:在 map 階段,構造的 key(CompositeKey)是(郵編,日期,金額);value(NaturalValue)是(日期,價格)。key 實現

WritableComparable 接口並提供比較函數,這樣就能保證一份數據出來是分區且區內有序的。

而後在 shuffle 過程當中,指定一個 key 比較器(CompositeKeyComparator),使得在聚合過程後,key 按照先郵編、再時間、最後金額的順序排序。key-value 是鍵值對,key 按照咱們的意願排好序了,

value 也就排好了。

 

 

總的來講:降序什麼的都是CompositeKeyComparator來決定的。

 

代碼結構:

 

 (1)key:組合鍵

  1 package com.book.test1;
  2 import java.io.DataInput;
  3 import java.io.DataOutput;
  4 import java.io.IOException;
  5 
  6 import org.apache.hadoop.io.DoubleWritable;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.io.Writable;
 10 import org.apache.hadoop.io.WritableComparable;
 11 /**
 12  * 這個的做用就是要數據在分區裏面有序
 13  */
 14 /**
 15  * 定義組合鍵:就是能夠把本身要比較的字段寫入
 16  * @author Sxq
 17  *
 18  */
// 必須實現 WritableComparable 接口,key 才能參與排序
 20 public class CompositeKey implements Writable, WritableComparable<CompositeKey> {
 21 
 22     // 股票的名字
 23     private Text stockSymbol;
 24     // 日期
 25     private LongWritable timestamp;
 26     private DoubleWritable price;
 27 
 28     
 29     public DoubleWritable getPrice() {
 30         return price;
 31     }
 32     public void setPrice(DoubleWritable price) {
 33         this.price = price;
 34     }
 35     public CompositeKey()
 36     {
 37         
 38     }
 39     public CompositeKey(Text _stockSymbol, LongWritable _timestamp,DoubleWritable _price) {
 40         this.stockSymbol = _stockSymbol;
 41         this.timestamp = _timestamp;
 42         this.price=_price;
 43     }
 44 
 45     
 46 
 47     public Text getStockSymbol() {
 48         return stockSymbol;
 49     }
 50 
 51 
 52     public void setStockSymbol(Text stockSymbol) {
 53         this.stockSymbol = stockSymbol;
 54     }
 55 
 56 
 57 
 58     public LongWritable getTimestamp() {
 59         return timestamp;
 60     }
 61 
 62 
 63 
 64     public void setTimestamp(LongWritable timestamp) {
 65         this.timestamp = timestamp;
 66     }
 67 
 68 
 69 
 70     //讀出
 71     public void readFields(DataInput input) throws IOException {
 72         String value1=input.readUTF();
 73         long value2=input.readLong();
 74       this.stockSymbol=new  Text( value1);
 75       this.timestamp=  new LongWritable(value2);
 76       this.price=new DoubleWritable(input.readDouble());
 77     }
 78     
 79    //寫入
 80     
 81     //@Override
 82     public void write(DataOutput output) throws IOException {
 83         output.writeUTF(this.stockSymbol.toString());
 84         output.writeLong(this.timestamp.get());
 85         output.writeDouble(this.price.get());
 86     }
 87     
 88     public int compareTo(CompositeKey other) {
 89         
 90            int comparator=this.stockSymbol.compareTo(other.stockSymbol);
 91             if(comparator==0)
 92             {
 93                 comparator=this.timestamp.compareTo(other.timestamp);
 94             }
 95         
 96         //升序
 97         //return comparator;
 98     
 99     return -comparator;
100     }
101 
102 
103     @Override
104     public String toString() {
105         return "CompositeKey [stockSymbol=" + stockSymbol + ", timestamp=" + timestamp + "]";
106     }
107     
108 }

(2)key對應的value:

package com.book.test1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class NaturalValue implements Writable {
private long timestamp;
private double privce;


public long getTimestamp() {
    return timestamp;
}

public void setTimestamp(long timestamp) {
    this.timestamp = timestamp;
}

public double getPrivce() {
    return privce;
}

public void setPrivce(double privce) {
    this.privce = privce;
}



public void readFields(DataInput input) throws IOException {
    this.timestamp=input.readLong();
    this.privce=input.readDouble();
    
    
    
    
}

public void write(DataOutput output) throws IOException {
    
    
    output.writeLong(this.timestamp);
    output.writeDouble(this.privce);
    
}

    
    
    
    
}

(3)分區器:

NaturalKeyPartitioner

 

package com.book.test1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 分區:按照郵編分,把郵編相同的放在一塊兒
 * @author Sxq
 */

public class NaturalKeyPartitioner extends Partitioner<CompositeKey, NaturalValue> {

    @Override
    public int getPartition(CompositeKey key, NaturalValue value, int numPartitions) {
        return Math.abs((int)(key.getStockSymbol().hashCode())%numPartitions);
    }
    
    

}

 

(4)把key排序的比較器:在shuffle過程當中用到的

package com.book.test1;

import javax.print.attribute.standard.MediaSize.Other;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 這個類的做用是把組合鍵排序,使得組合鍵也有順序
 * @author Sxq
 *
 */
public class CompositeKeyComparator extends WritableComparator {

    public CompositeKeyComparator() {
         super(CompositeKey.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey ck1 = (CompositeKey) a;
        CompositeKey ck2 = (CompositeKey) b;
        int comparison = ck1.getStockSymbol().compareTo(ck2.getStockSymbol());
        //若是郵編相同,則根據日期進一步處理。
        if (comparison == 0) {
            
            int comparison2=ck1.getTimestamp().compareTo(ck2.getTimestamp());
            // 若是日期相同,則須要根據價格進一步處理
            if (comparison2==0) {
                //按照價格降序
                return ck1.getPrice().compareTo(ck2.getPrice())>0?-1:1;

            } else {
                //日期不一樣,就按照日期降序
                return ck1.getTimestamp().compareTo(ck2.getTimestamp())>0?-1:1;            
            }
            }        
        else {
            return comparison;
        }
    }
    static {   
        WritableComparator.define(CompositeKey.class, new CompositeKeyComparator());   
    }

}

(5)reduce端的分組比較器:

CompositeGroupingComparator

package com.book.test1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 分組:就是在reduce階段分到一個組;
 * 就是郵編相同的放在一個組裏面
 * @author Sxq
 *
 */
public class CompositeGroupingComparator extends WritableComparator{
    
     public CompositeGroupingComparator() {

     super(CompositeKey.class,true);
     }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      CompositeKey v1=(CompositeKey)a;
      CompositeKey v2=(CompositeKey)b;
      
      return v1.getStockSymbol().compareTo(v2.getStockSymbol());
        
        
        
        
    }

    
    
}

(6)驅動類:

package com.book.test1;

import java.io.IOException;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Cmain {
    static class Map1 extends Mapper<LongWritable, Text, CompositeKey, NaturalValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, CompositeKey, NaturalValue>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] lines = line.split(",");
            Date date = DateUtil.getDate(lines[1]);
            //long timestamp = date.getTime();
            
            long timestamp=UtilsCmain.DataTranform(lines[1]);
            CompositeKey compositeKey = new CompositeKey();
            NaturalValue naturalValue = new NaturalValue();
            naturalValue.setPrivce(Double.valueOf(lines[2]));
            naturalValue.setTimestamp(timestamp);
            compositeKey.setStockSymbol(new Text(lines[0]));
            compositeKey.setPrice(new DoubleWritable(Double.valueOf(lines[2])));
            compositeKey.setTimestamp(new LongWritable(timestamp));
            context.write(compositeKey, naturalValue);
        }

    }

    static class reduce1 extends Reducer<CompositeKey, NaturalValue, Text, Text> {
        @Override
        protected void reduce(CompositeKey key, Iterable<NaturalValue> vlaue,
                Reducer<CompositeKey, NaturalValue, Text, Text>.Context context) throws IOException, InterruptedException {

            Iterator<NaturalValue> iterator = vlaue.iterator();
            StringBuffer stringBuffer = new StringBuffer();
            while (iterator.hasNext()) {
                NaturalValue naturalValue=iterator.next();
                stringBuffer.append("(");
                stringBuffer.append(naturalValue.getTimestamp());
                stringBuffer.append(","+naturalValue.getPrivce()+")");
            }

            context.write(new Text(key.getStockSymbol()), new Text(stringBuffer.toString()));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Cmain.class);

        job.setMapperClass(Map1.class);
        job.setReducerClass(reduce1.class);

        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(NaturalValue.class);

        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);

        job.setSortComparatorClass(CompositeKeyComparator.class);
        // 在Reduce端設置分組,使得同一個郵編的在同一個組
        job.setGroupingComparatorClass(CompositeGroupingComparator.class);
        // 設置分區
        job.setPartitionerClass(NaturalKeyPartitioner.class);

        // 指定輸入的數據的目錄
        FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/stock.txt"));

        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

(7)工具類:將2012-12-09轉爲20121209這種形式:

package com.book.test1;

public class UtilsCmain {
     /**
      * 時間
      */
     public static  long  DataTranform(String vaule)
    {
        String[] args=vaule.split("-");
        String datatime=args[0]+args[1]+args[2];
        
         return Long.valueOf(datatime);
    
    }
    
    
}

 

 

 

運行結果:

相關文章
相關標籤/搜索