Hadoop Streaming: Principles and Practice

Hadoop Streaming is a utility that makes MapReduce programming more convenient: with it, you can implement the Mapper and Reducer as executable commands, scripts, or programs in other languages, while still taking full advantage of the parallelism of the Hadoop computing framework to process big data. Note that Streaming runs a MapReduce job over the Unix standard input and output streams. What distinguishes it from Hadoop Pipes is mainly the communication protocol: Pipes is a socket-based interface for writing MapReduce jobs in C++, in which the job talks to the Hadoop platform over sockets to carry out its execution. Any programming language that supports standard input and output can implement a MapReduce job via Streaming; the basic principle is simply to read input from standard input and write results to standard output.
Hadoop itself is written in Java, so the most direct approach is to implement the Mapper and Reducer in Java, configure the MapReduce job, and submit it to the cluster for execution. Many developers, however, are not familiar with Java but do have hands-on experience with languages such as C++, Shell, Python, Ruby, PHP, or Perl. For them, Hadoop Streaming provides a tool for processing data on a Hadoop cluster: the hadoop-streaming jar (hadoop-streaming-1.0.3.jar in the examples below).
Hadoop Streaming uses Unix standard input and output as the interface between Hadoop and programs written in other languages: a program only needs to take standard input as its input and standard output as its output. On these streams, the key and value of a record are separated by a Tab character, and the Hadoop framework guarantees that the records arriving on the Reducer's standard input have been sorted by key.
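As a minimal sketch of this contract, an identity mapper in Python only has to copy records from standard input to standard output, splitting each record into key and value at the first Tab:

#!/usr/bin/env python
# Minimal sketch of the Streaming I/O contract: an identity mapper.
# Each line on standard input is one record; the key ends at the
# first Tab and the rest of the line is the value.
import sys

for line in sys.stdin:
    line = line.rstrip('\n')
    key, sep, value = line.partition('\t')
    # re-emit the record unchanged as "key<TAB>value"
    print '%s\t%s' % (key, value)
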
How do we use the Hadoop Streaming tool? We can print its usage on the command line, as shown below:

xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -info
Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input    <path>     DFS input file(s) for the Map step
  -output   <path>     DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName> The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file     <file>     File/dir to be shipped in the Job jar file
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner JavaClassName  Optional.
  -numReduceTasks <num>  Optional.
  -inputreader <spec>  Optional.
  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands
  -mapdebug <path>  Optional. To run this script when a map task fails
  -reducedebug <path>  Optional. To run this script when a reduce task fails
  -io <identifier>  Optional.
  -verbose

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]


In -input: globbing on <path> is supported and can have multiple -input
Default Map input format: a line is a record in UTF-8
  the key part ends at first TAB, the rest of the line is the value
Custom input format: -inputformat package.MyInputFormat
Map output format, reduce input/output format:
  Format defined by what the mapper command outputs. Line-oriented

The files named in the -file argument[s] end up in the
  working directory when the mapper and reducer are run.
  The location of this working directory is unspecified.

To set the number of reduce tasks (num. of output files):
  -D mapred.reduce.tasks=10
To skip the sort/combine/shuffle/sort/reduce step:
  Use -numReduceTasks 0
  A Task's Map output then becomes a 'side-effect output' rather than a reduce input
  This speeds up processing, This also feels more like "in-place" processing
  because the input filename and the map input order are preserved
  This equivalent -reducer NONE

To speed up the last maps:
  -D mapred.map.tasks.speculative.execution=true
To speed up the last reduces:
  -D mapred.reduce.tasks.speculative.execution=true
To name the job (appears in the JobTracker Web UI):
  -D mapred.job.name='My Job'
To change the local temp directory:
  -D dfs.data.dir=/tmp/dfs
  -D stream.tmpdir=/tmp/streaming
Additional local temp directories with -cluster local:
  -D mapred.local.dir=/tmp/local
  -D mapred.system.dir=/tmp/system
  -D mapred.temp.dir=/tmp/temp
To treat tasks with non-zero exit status as SUCCEDED:
  -D stream.non.zero.exit.is.failure=false
Use a custom hadoopStreaming build along a standard hadoop install:
  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\
    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar
For more details about jobconf parameters see:

http://wiki.apache.org/hadoop/JobConfFile

To set an environement variable in a streaming command:
   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

Shortcut:
   setenv HSTREAMING "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar"

Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"
           -file /local/filter.pl -input "/logs/0604*/*" [...]
  Ships a script, invokes the non-shipped perl interpreter
  Shipped files go to the working directory so filter.pl is found by perl
  Input files are all the daily logs for days in month 2006-04
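
One detail worth noting in the usage above: the generic options (-conf, -D, -fs, -jt, -files, -libjars, -archives) must appear before the streaming-specific options, following the pattern bin/hadoop command [genericOptions] [commandOptions] shown in the help text.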

Below, we walk through an example computation that the Hadoop Streaming tool can run: counting word frequencies, i.e., the classic WordCount.
First, we prepare the dataset used for testing, as shown below:

xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/dataset/join/
-rw-r--r--   3 xiaoxiang supergroup  391103029 2013-03-26 12:19 /user/xiaoxiang/dataset/join/irc_basic_info.ds
-rw-r--r--   3 xiaoxiang supergroup   11577164 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_net_block.ds
-rw-r--r--   3 xiaoxiang supergroup    8335235 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_org_info.ds

There are three data files in total, adding up to roughly 400 MB.
Next, we choose Python to implement and run the MapReduce job.
The Mapper is implemented in Python in the file word_count_mapper.py, as shown below:

#!/usr/bin/env python
# word_count_mapper.py: read raw text lines from standard input and
# emit one "word<TAB>1" record per word on standard output.

import sys

for line in sys.stdin:
    line = line.strip()
    # split on whitespace and drop any empty tokens
    words = filter(lambda word: word, line.split())
    for word in words:
        print '%s\t%s' % (word, 1)
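
Because the mapper is an ordinary program reading standard input, it can be sanity-checked locally before the job is submitted, for example with a pipe such as head -100 somefile.txt | python word_count_mapper.py, where somefile.txt stands for any local text file.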

The Reducer is implemented in Python in the file word_count_reducer.py, as shown below:

#!/usr/bin/env python
# word_count_reducer.py: sum the per-word counts emitted by the mapper
# and print "word<TAB>total" pairs sorted by word.

import sys
from operator import itemgetter

wc_dict = {}

for line in sys.stdin:
    line = line.strip()
    try:
        # each record is "word<TAB>count"; skip malformed records
        word, count = line.split('\t', 1)
        wc_dict[word] = wc_dict.get(word, 0) + int(count)
    except ValueError:
        pass

# sort the accumulated (word, count) pairs by word before printing
sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print '%s\t%s' % (word, count)
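
The reducer above buffers every distinct word in a dict before printing. Because the framework guarantees that the reducer's standard input is sorted by key, a streaming-idiomatic alternative is to keep just a running total for the current word and emit it whenever the key changes; the following is a sketch of that variant:

#!/usr/bin/env python
# Sketch of an alternative reducer that relies on the sort guarantee:
# input records arrive grouped by word, so a single running total per
# current word suffices instead of a dict over all words.
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        continue
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print '%s\t%s' % (current_word, current_count)
        current_word = word
        current_count = count

# flush the final word
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)

Either reducer can be combined with the mapper for a quick local dry run that simulates the shuffle with sort, for example: cat somefile.txt | python word_count_mapper.py | sort | python word_count_reducer.py | head.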

Run the Python MapReduce program as shown below. Note that -file ships the Python scripts inside the job jar, so each task finds them in its working directory at runtime:

xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -input /user/xiaoxiang/dataset/join/ -output /user/xiaoxiang/output/streaming/python -mapper word_count_mapper.py -reducer word_count_reducer.py -numReduceTasks 2 -file *.py
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /opt/stone/cloud/storage/tmp/hadoop-xiaoxiang/hadoop-unjar4066863202997744310/] [] /tmp/streamjob2336302975421423718.jar tmpDir=null
13/04/18 17:50:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/18 17:50:17 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/18 17:50:17 INFO mapred.FileInputFormat: Total input paths to process : 3
13/04/18 17:50:17 INFO streaming.StreamJob: getLocalDirs(): [/opt/stone/cloud/storage/mapred/local]
13/04/18 17:50:17 INFO streaming.StreamJob: Running job: job_201303302227_0047
13/04/18 17:50:17 INFO streaming.StreamJob: To kill this job, run:
13/04/18 17:50:17 INFO streaming.StreamJob: /opt/stone/cloud/hadoop-1.0.3/libexec/../bin/hadoop job  -Dmapred.job.tracker=hdfs://ubuntu3:9001/ -kill job_201303302227_0047
13/04/18 17:50:17 INFO streaming.StreamJob: Tracking URL: http://ubuntu3:50030/jobdetails.jsp?jobid=job_201303302227_0047
13/04/18 17:50:18 INFO streaming.StreamJob:  map 0%  reduce 0%
13/04/18 17:50:33 INFO streaming.StreamJob:  map 7%  reduce 0%
13/04/18 17:50:36 INFO streaming.StreamJob:  map 11%  reduce 0%
13/04/18 17:50:39 INFO streaming.StreamJob:  map 15%  reduce 0%
13/04/18 17:50:42 INFO streaming.StreamJob:  map 19%  reduce 0%
13/04/18 17:50:45 INFO streaming.StreamJob:  map 23%  reduce 0%
13/04/18 17:50:48 INFO streaming.StreamJob:  map 25%  reduce 0%
13/04/18 17:51:09 INFO streaming.StreamJob:  map 32%  reduce 2%
13/04/18 17:51:12 INFO streaming.StreamJob:  map 36%  reduce 4%
13/04/18 17:51:15 INFO streaming.StreamJob:  map 40%  reduce 8%
13/04/18 17:51:18 INFO streaming.StreamJob:  map 44%  reduce 8%
13/04/18 17:51:21 INFO streaming.StreamJob:  map 47%  reduce 8%
13/04/18 17:51:24 INFO streaming.StreamJob:  map 50%  reduce 8%
13/04/18 17:51:45 INFO streaming.StreamJob:  map 54%  reduce 10%
13/04/18 17:51:48 INFO streaming.StreamJob:  map 60%  reduce 15%
13/04/18 17:51:51 INFO streaming.StreamJob:  map 65%  reduce 17%
13/04/18 17:51:55 INFO streaming.StreamJob:  map 66%  reduce 17%
13/04/18 17:51:58 INFO streaming.StreamJob:  map 68%  reduce 17%
13/04/18 17:52:01 INFO streaming.StreamJob:  map 72%  reduce 17%
13/04/18 17:52:04 INFO streaming.StreamJob:  map 75%  reduce 17%
13/04/18 17:52:19 INFO streaming.StreamJob:  map 75%  reduce 19%
13/04/18 17:52:22 INFO streaming.StreamJob:  map 87%  reduce 21%
13/04/18 17:52:25 INFO streaming.StreamJob:  map 100%  reduce 23%
13/04/18 17:52:28 INFO streaming.StreamJob:  map 100%  reduce 27%
13/04/18 17:52:31 INFO streaming.StreamJob:  map 100%  reduce 29%
13/04/18 17:52:37 INFO streaming.StreamJob:  map 100%  reduce 49%
13/04/18 17:52:40 INFO streaming.StreamJob:  map 100%  reduce 69%
13/04/18 17:52:43 INFO streaming.StreamJob:  map 100%  reduce 72%
13/04/18 17:52:46 INFO streaming.StreamJob:  map 100%  reduce 74%
13/04/18 17:52:49 INFO streaming.StreamJob:  map 100%  reduce 76%
13/04/18 17:52:52 INFO streaming.StreamJob:  map 100%  reduce 78%
13/04/18 17:52:55 INFO streaming.StreamJob:  map 100%  reduce 79%
13/04/18 17:52:58 INFO streaming.StreamJob:  map 100%  reduce 81%
13/04/18 17:53:01 INFO streaming.StreamJob:  map 100%  reduce 84%
13/04/18 17:53:04 INFO streaming.StreamJob:  map 100%  reduce 87%
13/04/18 17:53:07 INFO streaming.StreamJob:  map 100%  reduce 90%
13/04/18 17:53:10 INFO streaming.StreamJob:  map 100%  reduce 93%
13/04/18 17:53:13 INFO streaming.StreamJob:  map 100%  reduce 96%
13/04/18 17:53:16 INFO streaming.StreamJob:  map 100%  reduce 98%
13/04/18 17:53:19 INFO streaming.StreamJob:  map 100%  reduce 99%
13/04/18 17:53:22 INFO streaming.StreamJob:  map 100%  reduce 100%
13/04/18 17:54:10 INFO streaming.StreamJob: Job complete: job_201303302227_0047
13/04/18 17:54:10 INFO streaming.StreamJob: Output: /user/xiaoxiang/output/streaming/python

Finally, verify the results, as shown below. Since the job was submitted with -numReduceTasks 2, the output directory contains two part files, each holding its share of the words with their total counts:

xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/output/streaming/python
-rw-r--r--   3 xiaoxiang supergroup          0 2013-04-18 17:54 /user/xiaoxiang/output/streaming/python/_SUCCESS
drwxr-xr-x   - xiaoxiang supergroup          0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs
drwxr-xr-x   - xiaoxiang supergroup          0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history
-rw-r--r--   3 xiaoxiang supergroup      37646 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_1366278617842_xiaoxiang_streamjob2336302975421423718.jar
-rw-r--r--   3 xiaoxiang supergroup      21656 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_conf.xml
-rw-r--r--   3 xiaoxiang supergroup   91367389 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00000
-rw-r--r--   3 xiaoxiang supergroup   91268074 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00001
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -cat /user/xiaoxiang/output/streaming/python/part-00000 | head -5
!    2
#    36
#039    1
#1059)    1
#1098    1