出處:http://rfyiamcool.blog.51cto.com/1030776/1340057
java
流量比較大的日誌要是直接寫入Hadoop對Namenode負載過大,因此入庫前合併,能夠把各個節點的日誌湊併成一個文件寫入HDFS。 根據狀況按期合成,寫入到hdfs裏面。node
我們看看日誌的大小,200G的dns日誌文件,我壓縮到了18G,要是用awk perl固然也能夠,可是處理速度確定沒有分佈式那樣的給力。python
Hadoop Streaming原理shell
mapper和reducer會從標準輸入中讀取用戶數據,一行一行處理後發送給標準輸出。Streaming工具會建立MapReduce做業,發送給各個tasktracker,同時監控整個做業的執行過程。bash
任何語言,只要是方便接收標準輸入輸出就能夠作mapreduce~app
再搞以前咱們先簡單測試下shell模擬mapreduce的性能速度~ jsp
看下他的結果,350M的文件用時35秒左右。分佈式
這是2G的日誌文件,竟然用了3分鐘。 固然和我寫的腳本也有問題,咱們是模擬mapreduce的方式,而不是調用shell下牛逼的awk,gawk處理。ide
awk的速度 !果真很霸道,處理日誌的時候,我也很喜歡用awk,只是學習的難度有點大,不像別的shell組件那麼靈活簡單。工具
這是官方的提供的兩個demo ~
map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#!/usr/bin/env python
""
"A more advanced Mapper, using Python iterators and generators."
""
import
sys
def read_input(file):
for
line
in
file:
# split the line into words
yield line.split()
def main(separator=
'\t'
):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for
words
in
data:
# write the results to STDOUT (standard output);
# what we output here will be the input
for
the
# Reduce step, i.e. the input
for
reducer.py
#
# tab-delimited; the trivial word count
is
1
for
word
in
words:
print
'%s%s%d'
% (word, separator,
1
)
if
__name__ ==
"__main__"
:
main()
|
reduce.py的修改方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#!/usr/bin/env python
""
"A more advanced Reducer, using Python iterators and generators."
""
from itertools
import
groupby
from operator
import
itemgetter
import
sys
def read_mapper_output(file, separator=
'\t'
):
for
line
in
file:
yield line.rstrip().split(separator,
1
)
def main(separator=
'\t'
):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
# current_word - string containing a word (the key)
# group - iterator yielding all [
"<current_word>"
,
"<count>"
] items
for
current_word, group
in
groupby(data, itemgetter(
0
)):
try
:
total_count = sum(
int
(count)
for
current_word, count
in
group)
print
"%s%s%d"
% (current_word, separator, total_count)
except ValueError:
# count was not a number, so silently discard
this
item
pass
if
__name__ ==
"__main__"
:
main()
|
我們再簡單點:
1
2
3
4
5
6
7
|
#!/usr/bin/env python
import
sys
for
line
in
sys.stdin:
line = line.strip()
words = line.split()
for
word
in
words:
print
'%s\t%s'
% (word,
1
)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#!/usr/bin/env python
from operator
import
itemgetter
import
sys
current_word = None
current_count =
0
word = None
for
line
in
sys.stdin:
line = line.strip()
word, count = line.split(
'\t'
,
1
)
try
:
count =
int
(count)
except ValueError:
continue
if
current_word == word:
current_count += count
else
:
if
current_word:
print
'%s\t%s'
% (current_word, current_count)
current_count = count
current_word = word
if
current_word == word:
print
'%s\t%s'
% (current_word, current_count)
|
我們就簡單模擬下數據,跑個測試
剩下就沒啥了,在hadoop集羣環境下,運行hadoop的steaming.jar組件,加入mapreduce的腳本,指定輸出就好了. 下面的例子我用的是shell的成分。
1
2
3
4
5
|
[root@
101
cron]#$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
|
詳細的參數,對於我們來講×××能能夠把tasks的任務數增長下,根據狀況本身測試下,也別過高了,增長負擔。
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶本身寫的mapper程序,能夠是可執行文件或者腳本
(4)-reducer:用戶本身寫的reducer程序,能夠是可執行文件或者腳本
(5)-file:打包文件到提交的做業中,能夠是mapper或者reducer要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實現)
(8)-D:做業的一些屬性(之前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數目
2)mapred.reduce.tasks:reduce task數目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數
據的分隔符,默認均爲\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所佔的域數目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均爲\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所佔的域數目
這裏是統計dns的日誌文件有多少行 ~
在mapreduce做爲參數的時候,不能用太多太複雜的shell語言,他不懂的~
能夠寫成shell文件的模式;
1
2
3
4
5
6
7
8
|
#! /bin/bash
while
read LINE;
do
#
for
word
in
$LINE
#
do
# echo
"$word 1"
awk
'{print $5}'
done
done
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#! /bin/bash
count=
0
started=
0
word=
""
while
read LINE;
do
goodk=`echo $LINE | cut -d
' '
-f
1
`
if
[
"x"
== x
"$goodk"
];then
continue
fi
if
[
"$word"
!=
"$goodk"
];then
[ $started -ne
0
] && echo -e
"$word\t$count"
word=$goodk
count=
1
started=
1
else
count=$(( $count +
1
))
fi
done
|
有時候會出現這樣的問題,好好看看本身寫的mapreduce程序 ~
13/12/14 13:26:52 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:26:53 INFO streaming.StreamJob: map 0% reduce 0%
13/12/14 13:27:16 INFO streaming.StreamJob: map 100% reduce 100%
13/12/14 13:27:16 INFO streaming.StreamJob: To kill this job, run:
13/12/14 13:27:16 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201312131904_0030
13/12/14 13:27:16 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030
13/12/14 13:27:16 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312131904_0030_m_000000
13/12/14 13:27:16 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
python作爲mapreduce執行成功後,結果和日誌通常是放在你指定的目錄下的,結果是在part-00000文件裏面~