在研究關於mapreduce實現topn時候,發現了這篇博文,感受挺不錯的,與你們一塊分享。原博客:http://my.oschina.net/u/1378204/blog/343666java
在最初接觸mapreduce時,top n 問題的解決辦法是將mapreduce輸出(排序後)放入一個集合中,取前n個,但這種寫法過於簡單,內存可以加載的集合的大小是有上限的,一旦數據量大,很容易出現內存溢出。
今天在這裏介紹另外一種實現方式,固然這也不是最好的方式,不過正所謂一步一個腳印,邁好每一步,之後的步伐才能更堅決,哈哈說了點題外話。恩恩,之後還會有更好的方式web
需求,獲得top 最大的前n條記錄
這裏只給出一些核心的代碼,其餘job等配置的代碼略apache
?數組
1
2
|
Configuration conf =
new
Configuration();
conf.setInt(
"N"
,
5
);
|
初始化job以前須要 conf.setInt("N",5); 意在在mapreduce階段讀取N,N就表明着top N
如下是map
服務器
?app
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package
com.lzz.one;
import
java.io.IOException;
import
java.util.Arrays;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper;
/**
* topN
* #orderid,userid,payment,productid
* [root@x00 hd]# cat seventeen_a.txt
* 1,9819,100,121
* 2,8918,2000,111
* 3,2813,1234,22
* 4,9100,10,1101
* 5,3210,490,111
* 6,1298,28,1211
* 7,1010,281,90
* 8,1818,9000,20
* [root@x00 hd]# cat seventeen_b.txt
* 100,3333,10,100
* 101,9321,1000,293
* 102,3881,701,20
* 103,6791,910,30
* 104,8888,11,39
* 預測結果:(求 Top N=5 的結果)
* 1 9000
* 2 2000
* 3 1234
* 4 1000
* 5 910
* @author Administrator
*
*/
public
class
TopNMapper
extends
Mapper<LongWritable, Text, IntWritable, IntWritable>{
int
len;
int
top[];
@Override
public
void
setup(Context context)
throws
IOException,InterruptedException {
len = context.getConfiguration().getInt(
"N"
,
10
);
top =
new
int
[len+
1
];
}
@Override
public
void
map(LongWritable key, Text value, Context context)
throws
IOException, InterruptedException {
String line = value.toString();
String arr []= line.split(
","
);
if
(arr !=
null
&& arr.length ==
4
){
int
pay = Integer.parseInt(arr[
2
]);
add(pay);
}
}
public
void
add(
int
pay){
top[
0
] = pay;
Arrays.sort(top);
}
@Override
public
void
cleanup(Context context)
throws
IOException,InterruptedException {
for
(
int
i=
1
;i<=len;i++){
<span></span>context.write(
new
IntWritable(top[i]),
new
IntWritable(top[i]));
<span></span>}
}
}
<div>
</div>
|
接下來是reduceide
?oop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package
com.lzz.one;
import
java.io.IOException;
import
java.util.Arrays;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.mapreduce.Reducer;
public
class
TopNReduce
extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
int
len;
int
top[];
@Override
public
void
setup(Context context)
throws
IOException, InterruptedException {
len = context.getConfiguration().getInt(
"N"
,
10
);
top =
new
int
[len+
1
];
}
@Override
public
void
reduce(IntWritable key, Iterable<IntWritable> values,
Context context)
throws
IOException, InterruptedException {
for
(IntWritable val : values){
add(val.get());
}
}
public
void
add(
int
pay){
top[
0
] = pay;
Arrays.sort(top);
}
@Override
public
void
cleanup(Context context)
throws
IOException, InterruptedException {
for
(
int
i=len;i>
0
;i--){
context.write(
new
IntWritable(len-i+
1
),
new
IntWritable(top[i]));
}
}
}
|
說一下邏輯,雖然畫圖比較清晰,可是時間有限,畫圖水平有限,只用語言來描述吧,但願能說的明白
若是要取top 5,則應該定義一個長度爲爲6的數組,map所要作的事情就是將每條日誌的那個須要排序的字段放入數組第一個元素中,調用Arrays.sort(Array[])方法能夠將數組按照正序,從數字角度說是從小到大排序,好比第一條記錄是9000,那麼排序結果是[0,0,0,0,0,9000],第二條日誌記錄是8000,排序結果是[0,0,0,0,8000,9000],第三條日誌記錄是8500,排序結果是[0,0,0,8000,8500,9000],以此類推,每次放進去一個數字若是大於數組裏面最小的元素,至關於將最小的覆蓋掉了,也就是說數組中元素永遠是拿到日誌中最大的那些個記錄
ok,map將數組原封不動按照順序輸出,reduce接收到從每一個map拿到的五個排好序的元素,在進行跟map同樣的排序,排序後數組裏面就是按照從小到大排好序的元素,將這些元素倒序輸出就是最終咱們要的結果了spa
與以前的方式作個比較,以前的map作的事情不多,在reduce中排序後哪前5條,reduce的壓力是很大的,要把全部的數據都處理一遍,而通常設置reduce的個數較少,一旦數據較多,reduce就會承受不了,悲劇了。而如今的方式巧妙的將reduce的壓力轉移到了map,而map是集羣效應的,不少臺服務器來作這件事情,減小了一臺機器上的負擔,每一個map其實只是輸出了5個元素而已,若是有5個map,其實reduce纔對5*5個數據進行了操做,也就不會出現內存溢出等問題了.net