Using Python's mrjob to do Hadoop MapReduce log parsing

MapReduce jobs are usually written in Java or with Hadoop Streaming. As a Pythoner who doesn't know Java, I had no choice but to use Streaming for my MapReduce log analysis. Here I want to introduce a module, mrjob, which is built on top of Streaming.


mrjob lets you write MapReduce jobs in Python and run them on a number of different platforms. You can:

  • Write multi-step MapReduce jobs in pure Python

  • Test them on your local machine

  • Run them on a Hadoop cluster


Installing with pip:

pip install mrjob
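
To confirm the install worked, you can import the module and print its version (a quick sanity check of my own, not part of the original post):

python -c "import mrjob; print(mrjob.__version__)"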


The script I used for testing:

#coding:utf-8
from mrjob.job import MRJob
import re
#xiaorui.cc
#WORD_RE = re.compile(r"[\w']+")
# Match IPv4 addresses rather than words, since we are counting client IPs in a DNS query log.
WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # Emit (ip, 1) for every IP address found on the line.
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # Pre-aggregate on the map side to cut down shuffle traffic.
        yield word, sum(counts)

    def reducer(self, word, counts):
        # Final total per IP.
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqCount.run()


Usage is fairly simple:

python i.py -r inline input1 input2 input3 > out — this processes several input files and redirects the combined result into the file out.

Simulate Hadoop locally: python 1.py -r local < input > output

This redirects the results into the file output; without the > output redirect, the final output just goes to stdout.

Run on a Hadoop cluster: python 1.py -r hadoop < input > output
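
You can also drive the same job from another Python script through mrjob's runner API instead of the shell. Here is a minimal sketch of that pattern (my own addition, following the runner API documented for the mrjob releases of that period), assuming the job above is saved as mo.py:

from mo import MRWordFreqCount  # the IP-counting job shown above, saved as mo.py

# Pass the same arguments you would give on the command line.
mr_job = MRWordFreqCount(args=['-r', 'local', '10.7.17.7-dnsquery.log.1'])
with mr_job.make_runner() as runner:
    runner.run()
    # stream_output() yields the raw output lines; parse_output_line() decodes them.
    for line in runner.stream_output():
        ip, count = mr_job.parse_output_line(line)
        print("%s\t%s" % (ip, count))

This runs the whole job and hands you the parsed (ip, count) pairs directly, which is handy for quick tests.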


Running the script:

[root@kspc ~]# python mo.py -r local  <10.7.17.7-dnsquery.log.1> output
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/mo.root.20131224.040935.241241
reading from STDIN
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00000 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00001 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
Counters from step 1:
  (no counters found)
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper-sorted
> sort /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000 /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00000 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00001 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
Counters from step 1:
  (no counters found)
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000 -> /tmp/mo.root.20131224.040935.241241/output/part-00000
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001 -> /tmp/mo.root.20131224.040935.241241/output/part-00001
Streaming final output from /tmp/mo.root.20131224.040935.241241/output
removing tmp directory /tmp/mo.root.20131224.040935.241241

Resource usage while the job was running:

[screenshot: system resource usage during the local run]


I noticed something rather neat: mrjob actually shells out to the system's sort command to do the sorting. That is how the local runner emulates Hadoop Streaming's shuffle-and-sort phase between the map and reduce steps.

[screenshot: the external sort process visible in the process list]


To get a better feel for how mrjob is used, here is another example.


from mrjob.job import MRJob
#from xiaorui.cc

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        # Emit one counter per metric for every input line.
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        # Sum up each metric across all lines.
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()


The results:

[screenshot: output of the character/word/line count job]
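
The feature list near the top also mentions multi-step jobs, which the examples above don't cover. For completeness, here is a sketch along the lines of the example in the mrjob documentation (it assumes a version new enough to ship mrjob.step.MRStep, i.e. 0.4 or later): step one counts words, step two boils everything down to the single most frequent word.

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    def steps(self):
        # Two chained MapReduce steps: count the words, then pick the maximum.
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # Send (count, word) to a single key so the next step sees them all together.
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        # Tuples compare by count first, so max() returns the most frequent word.
        yield max(word_count_pairs)

if __name__ == '__main__':
    MRMostUsedWord.run()

Each MRStep returned by steps() becomes one full MapReduce pass, and the output of one step feeds the next.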

Below is some usage information from the official documentation:


As you can see, it supports both HDFS and S3 as storage!

Running your job different ways

The most basic way to run your job is on the command line:

$ python my_job.py input.txt

By default, output will be written to stdout.

You can pass input via stdin, but be aware that mrjob will just dump it to a file first:

$ python my_job.py < input.txt

You can pass multiple input files, mixed with stdin (using the - character):

$ python my_job.py input1.txt input2.txt - < input3.txt

By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!

You change the way the job is run with the -r/--runner option. You can use -rinline (the default), -rlocal, -rhadoop, or -remr.

To run your job in multiple subprocesses with a few Hadoop features simulated, use -rlocal.

To run it on your Hadoop cluster, use -rhadoop.

If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -remr.

Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:

$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt