mapreduce top n

    在最初接觸mapreduce時,top n 問題的解決辦法是將mapreduce輸出(排序後)放入一個集合中,取前n個,但這種寫法過於簡單,內存可以加載的集合的大小是有上限的,一旦數據量大,很容易出現內存溢出。
    今天在這裏介紹另外一種實現方式,固然這也不是最好的方式,不過正所謂一步一個腳印,邁好每一步,之後的步伐才能更堅決,哈哈說了點題外話。恩恩,之後還會有更好的方式 java

    需求,獲得top 最大的前n條記錄
    這裏只給出一些核心的代碼,其餘job等配置的代碼略 apache

Configuration conf = new Configuration();
conf.setInt("N", 5);

    初始化job以前須要 conf.setInt("N",5); 意在在mapreduce階段讀取N,N就表明着top N
    如下是map
數組

package com.lzz.one;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * topN
* 	#orderid,userid,payment,productid
* [root@x00 hd]# cat seventeen_a.txt
* 1,9819,100,121
* 2,8918,2000,111
* 3,2813,1234,22
* 4,9100,10,1101
* 5,3210,490,111
* 6,1298,28,1211
* 7,1010,281,90
* 8,1818,9000,20
* [root@x00 hd]# cat seventeen_b.txt
* 100,3333,10,100
* 101,9321,1000,293
* 102,3881,701,20
* 103,6791,910,30
* 104,8888,11,39
 
* 預測結果:(求 Top N=5 的結果)
* 1	9000
* 2	2000
* 3	1234
* 4	1000
* 5	910
 * @author Administrator
 *
 */
public class TopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
    int len;
    int top[];
    @Override
    public void setup(Context context) throws IOException,InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len+1];
    }
 
    @Override
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
    String line = value.toString();
    String arr []= line.split(",");
    if(arr != null && arr.length == 4){
        int pay = Integer.parseInt(arr[2]);
        add(pay);
    }
}


public void add(int pay){
    top[0] = pay;
    Arrays.sort(top);
}
 
@Override
public void cleanup(Context context) throws IOException,InterruptedException {
    for(int i=1;i<=len;i++){
        context.write(new IntWritable(top[i]),new IntWritable(top[i]));
    }
 }
 
}

 
 
 
 

  
  
  
  
  

 

    接下來是reduce 服務器

package com.lzz.one;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
	int len;
	int top[];
	@Override
	public void setup(Context context)
			throws IOException, InterruptedException {
		len = context.getConfiguration().getInt("N", 10);
		top = new int[len+1];
	}
	
	@Override
	public void reduce(IntWritable key, Iterable<IntWritable> values,
			Context context)
			throws IOException, InterruptedException {
		for(IntWritable val : values){
			add(val.get());
		}
	}
	
	public void add(int pay){
		top[0] = pay;
		Arrays.sort(top);
	}
	
	@Override
	public void cleanup(Context context)
			throws IOException, InterruptedException {
		for(int i=len;i>0;i--){
			context.write(new IntWritable(len-i+1),new IntWritable(top[i]));
		}
	}
}

    說一下邏輯,雖然畫圖比較清晰,可是時間有限,畫圖水平有限,只用語言來描述吧,但願能說的明白
    若是要取top 5,則應該定義一個長度爲爲6的數組,map所要作的事情就是將每條日誌的那個須要排序的字段放入數組第一個元素中,調用Arrays.sort(Array[])方法能夠將數組按照正序,從數字角度說是從小到大排序,好比第一條記錄是9000,那麼排序結果是[0,0,0,0,0,9000],第二條日誌記錄是8000,排序結果是[0,0,0,0,8000,9000],第三條日誌記錄是8500,排序結果是[0,0,0,8000,8500,9000],以此類推,每次放進去一個數字若是大於數組裏面最小的元素,至關於將最小的覆蓋掉了,也就是說數組中元素永遠是拿到日誌中最大的那些個記錄
    ok,map將數組原封不動按照順序輸出,reduce接收到從每一個map拿到的五個排好序的元素,在進行跟map同樣的排序,排序後數組裏面就是按照從小到大排好序的元素,將這些元素倒序輸出就是最終咱們要的結果了 app

    與以前的方式作個比較,以前的map作的事情不多,在reduce中排序後哪前5條,reduce的壓力是很大的,要把全部的數據都處理一遍,而通常設置reduce的個數較少,一旦數據較多,reduce就會承受不了,悲劇了。而如今的方式巧妙的將reduce的壓力轉移到了map,而map是集羣效應的,不少臺服務器來作這件事情,減小了一臺機器上的負擔,每一個map其實只是輸出了5個元素而已,若是有5個map,其實reduce纔對5*5個數據進行了操做,也就不會出現內存溢出等問題了 ide

相關文章
相關標籤/搜索