使用python構建基於hadoop的mapreduce日誌分析平臺


224356461.jpg





流量比較大的日誌要是直接寫入Hadoop對Namenode負載過大,因此入庫前合併,能夠把各個節點的日誌湊併成一個文件寫入HDFS。 根據狀況按期合成,寫入到hdfs裏面。java

我們看看日誌的大小,200G的dns日誌文件,我壓縮到了18G,要是用awk perl固然也能夠,可是處理速度確定沒有分佈式那樣的給力。node

230102727.jpg


Hadoop Streaming原理python

mapper和reducer會從標準輸入中讀取用戶數據,一行一行處理後發送給標準輸出。Streaming工具會建立MapReduce做業,發送給各個tasktracker,同時監控整個做業的執行過程。shell


任何語言,只要是方便接收標準輸入輸出就能夠作mapreduce~bash

再搞以前咱們先簡單測試下shell模擬mapreduce的性能速度~ app

234955396.jpg


看下他的結果,350M的文件用時35秒左右。jsp

235045406.jpg


這是2G的日誌文件,竟然用了3分鐘。 固然和我寫的腳本也有問題,咱們是模擬mapreduce的方式,而不是調用shell下牛逼的awk,gawk處理。分佈式


001056805.jpg

awk的速度 !果真很霸道,處理日誌的時候,我也很喜歡用awk,只是學習的難度有點大,不像別的shell組件那麼靈活簡單。ide

000946258.jpg


這是官方的提供的兩個demo ~ 工具

map.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys
def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()
def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)
if __name__ == "__main__":
    main()


reduce.py的修改方式

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)
def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
if __name__ == "__main__":
    main()


我們再簡單點:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)


#!/usr/bin/env python
                                                                                                                                                                                                                                                                                                                                                              
from operator import itemgetter
import sys
                                                                                                                                                                                                                                                                                                                                                              
current_word = None
current_count = 0
word = None
                                                                                                                                                                                                                                                                                                                                                              
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
                                                                                                                                                                                                                                                                                                                                                             
if current_word == word:
    print '%s\t%s' % (current_word, current_count)


我們就簡單模擬下數據,跑個測試


084336884.jpg



剩下就沒啥了,在hadoop集羣環境下,運行hadoop的steaming.jar組件,加入mapreduce的腳本,指定輸出就好了.  下面的例子我用的是shell的成分。


[root@101 cron]#$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc


詳細的參數,對於我們來講×××能能夠把tasks的任務數增長下,根據狀況本身測試下,也別過高了,增長負擔。

(1)-input:輸入文件路徑

(2)-output:輸出文件路徑

(3)-mapper:用戶本身寫的mapper程序,能夠是可執行文件或者腳本

(4)-reducer:用戶本身寫的reducer程序,能夠是可執行文件或者腳本

(5)-file:打包文件到提交的做業中,能夠是mapper或者reducer要用的輸入文件,如配置文件,字典等。

(6)-partitioner:用戶自定義的partitioner程序

(7)-combiner:用戶自定義的combiner程序(必須用java實現)

(8)-D:做業的一些屬性(之前用的是-jonconf),具體有:
           1)mapred.map.tasks:map task數目
           2)mapred.reduce.tasks:reduce task數目
           3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數
據的分隔符,默認均爲\t。
            4)stream.num.map.output.key.fields:指定map task輸出記錄中key所佔的域數目
            5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均爲\t。
            6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所佔的域數目


這裏是統計dns的日誌文件有多少行 ~

114512821.jpg

在mapreduce做爲參數的時候,不能用太多太複雜的shell語言,他不懂的~

能夠寫成shell文件的模式;

#! /bin/bash
while read LINE; do
#  for word in $LINE
#  do
#    echo "$word 1"
        awk '{print $5}'                                                                                                        
  done
done


#! /bin/bash
count=0
started=0
word=""
while read LINE;do
  goodk=`echo $LINE | cut -d ' '  -f 1`
  if [ "x" == x"$goodk" ];then
     continue
  fi
  if [ "$word" != "$goodk" ];then
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$goodk                                                                                                                 
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done


有時候會出現這樣的問題,好好看看本身寫的mapreduce程序 ~

13/12/14 13:26:52 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030

13/12/14 13:26:53 INFO streaming.StreamJob:  map 0%  reduce 0%

13/12/14 13:27:16 INFO streaming.StreamJob:  map 100%  reduce 100%

13/12/14 13:27:16 INFO streaming.StreamJob: To kill this job, run:

13/12/14 13:27:16 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201312131904_0030

13/12/14 13:27:16 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030

13/12/14 13:27:16 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312131904_0030_m_000000

13/12/14 13:27:16 INFO streaming.StreamJob: killJob...

Streaming Command Failed!


python作爲mapreduce執行成功後,結果和日誌通常是放在你指定的目錄下的,結果是在part-00000文件裏面~


172550144.jpg


下面我們談下,如何入庫和後臺的執行

相關文章
相關標籤/搜索