Fast Global Sorting of Text Files in Hadoop

1. Background

Hadoop ships the InputSampler and TotalOrderPartitioner classes for global sorting; the reference driver is org.apache.hadoop.examples.Sort.

However, when the input is a plain text file, the result is not sorted by the string column in the text, and the output is a SequenceFile rather than text.

Reasons:

1) When Hadoop reads a text file, the key is the line offset, of type LongWritable. InputSampler samples keys, and TotalOrderPartitioner also looks up the partition by key. The partition file produced by sampling therefore contains offsets, so the output is naturally ordered by offset rather than by content.

2) On large inputs, InputSampler can be very slow. For example, RandomSampler has to scan all of the data, and IntervalSampler has to walk through every split. SplitSampler is more efficient, but it only takes records from the beginning of each split, which makes it unsuitable when the data within a file is already ordered. (See the sketch below for how the stock samplers are constructed.)
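
For reference, here is a minimal sketch (old org.apache.hadoop.mapred API) of how the three stock samplers are typically constructed and handed to writePartitionFile; the freq/numSamples/maxSplitsSampled values below are placeholders only:

// Stock samplers from org.apache.hadoop.mapred.lib.InputSampler; the numeric
// arguments (freq, numSamples, maxSplitsSampled) are illustrative placeholders.
InputSampler.Sampler<LongWritable, Text> random =
    new InputSampler.RandomSampler<LongWritable, Text>(0.1, 10000, 10); // scans records across splits at random
InputSampler.Sampler<LongWritable, Text> split =
    new InputSampler.SplitSampler<LongWritable, Text>(10000, 10);       // takes the first records of each split
InputSampler.Sampler<LongWritable, Text> interval =
    new InputSampler.IntervalSampler<LongWritable, Text>(0.1, 10);      // takes records at regular intervals
// All of them sample the map input *key*, which for text input is the line offset.
InputSampler.<LongWritable, Text>writePartitionFile(jobConf, random);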

2. Features

1. Implemented a partial sampling method, PartialSampler, suitable when the input files are independently and identically distributed.

2. Extended RandomSampler, IntervalSampler, and SplitSampler to support sampling text values.

3. Implemented a TotalOrderPartitioner that partitions on the string column of a text file.

3. Implementation

1. PartialSampler

PartialSampler randomly samples the first text column from the first input split. It has two properties: freq (sampling frequency) and numSamples (total number of samples).
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // Only sample the first split, splits[0]
  for (int i = 0; i < 1; i++) {
    System.out.println("PartialSampler will getSample splits[" + i + "]");
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          // Sample the first (tab-separated) column of the value
          Text value0 = new Text(value.toString().split("\t")[0]);
          samples.add((K) value0);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            // ind is in [0, numSamples), so this branch is always taken;
            // the check is kept for parity with the stock RandomSampler
            Text value0 = new Text(value.toString().split("\t")[0]);
            samples.set(ind, (K) value0);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();  // allocate a fresh key object, as in RandomSampler
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
The method first obtains all input splits via InputFormat.getSplits, then scans the records of the first split for sampling.

Record sampling proceeds as follows:

Take a record from the split and draw a random double; compare it with the sampling frequency freq.

  If the draw is greater than freq, the record is discarded.

  If it is less than or equal to freq, check whether the current sample count is below the maximum numSamples:

    if so, the record (its first column) is selected and added to the sample set;

    otherwise, a random index is drawn from [0, numSamples); since it can never equal numSamples, the record at that index is always replaced by this one, and the sampling frequency is then reduced to freq * (numSamples - 1) / numSamples.

The remaining records in the split are then processed in the same way.

Notes:

1) PartialSampler is only applicable when the input files are independently and identically distributed.

2) The three built-in samplers can also sample the first column if samples.add(key) is changed to samples.add((K) value0); a wiring example follows below.
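
A rough sketch of how such a sampler would be plugged into the driver; the PartialSampler(freq, numSamples) constructor shown here is an assumption based on the two properties described above, not the author's exact signature:

// Hypothetical wiring of the custom sampler (constructor signature assumed).
InputSampler.Sampler<LongWritable, Text> sampler =
    new PartialSampler<LongWritable, Text>(0.1, 10000);

// Writes the sampled Text split points to the path previously registered via
// TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile).
InputSampler.<LongWritable, Text>writePartitionFile(jobConf, sampler);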

2. TotalOrderPartitioner

TotalOrderPartitioner is modified in two main ways:

1) When reading the partition file, specify keyClass as Text.class, because the keys stored in the partition file are of type Text.

In the configure method, change:

//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();

Class<K> keyClass = (Class<K>)Text.class;
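
For context, the relevant part of configure looks roughly like this (paraphrased from the stock org.apache.hadoop.mapred.lib.TotalOrderPartitioner; readPartitions is its existing helper for loading the split points):

public void configure(JobConf job) {
  try {
    String parts = getPartitionFile(job);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(job)     // default name: file arrives via the DistributedCache symlink
        : partFile.getFileSystem(job);

    // Original: key class taken from the map output key (LongWritable here).
    // Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    // Modified: the partition file stores Text split points.
    Class<K> keyClass = (Class<K>) Text.class;

    K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
    // ... the rest builds the search structure over splitPoints as before ...
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}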

2) When looking up the partition, use the value instead of the key:

public int getPartition(K key, V value, int numPartitions) {
  // Partition by the first (tab-separated) column of the value, not by the key
  Text value0 = new Text(value.toString().split("\t")[0]);
  return partitions.findPartition((K) value0);
}

3. Sort

1) Set the InputFormat, OutputFormat, OutputKeyClass, OutputValueClass, and MapOutputKeyClass.

2) Instantiate the InputSampler and sample the input.

3) Pass the partition file to TotalOrderPartitioner via the DistributedCache, then run the MapReduce job.

    Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;
    Class<? extends OutputFormat> outputFormatClass =  TextOutputFormat.class;
    Class<? extends WritableComparable> outputKeyClass = Text.class;
    Class<? extends Writable> outputValueClass = Text.class;

    // The map output key remains the LongWritable line offset; partitioning is
    // done on the value (see getPartition above), so the key class is unchanged.
    jobConf.setMapOutputKeyClass(LongWritable.class);

    // Set user-supplied (possibly default) job configs
    jobConf.setNumReduceTasks(num_reduces);
    
    jobConf.setInputFormat(inputFormatClass);
    jobConf.setOutputFormat(outputFormatClass);

    jobConf.setOutputKeyClass(outputKeyClass);
    jobConf.setOutputValueClass(outputValueClass);

    if (sampler != null) {
      System.out.println("Sampling input to effect total-order sort...");
      jobConf.setPartitionerClass(TotalOrderPartitioner.class);
      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
      //Path partitionFile = new Path(inputDir, "_sortPartitioning");
      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
      InputSampler.<K,V>writePartitionFile(jobConf, sampler);
      
      URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
      DistributedCache.addCacheFile(partitionUri, jobConf);
      DistributedCache.createSymlink(jobConf);
    }

    // Remove any stale output directory before running (outputpath is defined elsewhere in the driver)
    FileSystem hdfs = FileSystem.get(jobConf);
    hdfs.delete(outputpath);
    hdfs.close();
    
    System.out.println("Running on " +
        cluster.getTaskTrackers() +
        " nodes to sort from " + 
        FileInputFormat.getInputPaths(jobConf)[0] + " into " +
        FileOutputFormat.getOutputPath(jobConf) +
        " with " + num_reduces + " reduces.");
    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    jobResult = JobClient.runJob(jobConf);
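
The snippet above omits the map and reduce classes. Presumably, as in the stock org.apache.hadoop.examples.Sort, identity classes are used so that the framework's shuffle does all the work; a minimal sketch under that assumption:

// Assumed remainder of the driver (mirrors the stock Sort example; not shown
// in the original snippet): identity map/reduce classes.
jobConf.setJobName("sorter");
jobConf.setMapperClass(IdentityMapper.class);    // passes (LongWritable offset, Text line) through unchanged
jobConf.setReducerClass(IdentityReducer.class);  // each reduce output file covers one contiguous range of first-column values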

4. Running

Usage:

hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
  [-splitRandom <double pcnt> <numSamples> <maxsplits> |  // Sample from random splits at random (general)
   -splitSample <numSamples> <maxsplits> |                // Sample from first records in splits (random data)
   -splitInterval <double pcnt> <maxsplits> |             // Sample from splits at intervals (sorted data)
   -splitPartial <double pcnt> <numSamples> <maxsplits>]  // Sample from partial splits at random (general)
  <input> <output> <partitionfile>

Example:

hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

5. Performance

With 200 GB of input data (1.5 billion URLs) and 1000 partitions, the sort took only 6 minutes.
