Hadoop Streaming is a utility that makes MapReduce programming easier: it lets you implement the Mapper and Reducer with executables, scripting languages, or other programming languages, so that you can take advantage of the Hadoop parallel computing framework to process large data sets. Note that Streaming runs a MapReduce job through the standard input and output of a Unix system. Its main difference from Pipes is the communication mechanism: Pipes is aimed at implementing MapReduce jobs in C++ and talks to the Hadoop platform over sockets to execute the job. Any programming language that supports standard input and output can be used to implement a MapReduce job with Streaming; the basic principle is that input is read from Unix standard input and output is written to Unix standard output.
Hadoop itself is written in Java, so the most direct way to write a job is to implement the Mapper and Reducer in Java, configure the MapReduce job, and submit it to the cluster for execution. Many developers, however, are not familiar with Java but do have hands-on experience with C++ or with scripting languages such as Shell, Python, Ruby, PHP, or Perl. For these developers, Hadoop Streaming provides a way to process data on a Hadoop cluster, namely the hadoop-streaming-*.jar utility.
Hadoop Streaming uses Unix standard input and output as the interface between Hadoop and programs written in other languages, so a program written in another language only needs to read its input from standard input and write its results to standard output. On this standard I/O stream, the key and value are separated by a tab character, and the Hadoop framework guarantees that the data arriving on the reducer's standard input has already been sorted by key.
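To make this contract concrete, the sketch below shows a generic Streaming reducer skeleton (Python 2, matching the examples later in this article). It is not taken from the original code; it simply illustrates how a reducer splits each line at the first tab and exploits the fact that lines with the same key arrive consecutively:

#!/usr/bin/env python
# Generic Streaming reducer skeleton: read "key<TAB>value" lines from stdin
# and rely on the framework having sorted them by key, so that all lines for
# one key arrive consecutively.
import sys

def emit(key, vals):
    # Placeholder aggregation: here we just emit how many values the key had.
    print '%s\t%s' % (key, len(vals))

current_key = None
values = []

for line in sys.stdin:
    parts = line.rstrip('\n').split('\t', 1)
    key = parts[0]
    value = parts[1] if len(parts) > 1 else ''   # no tab: whole line is the key
    if key != current_key:
        if current_key is not None:
            emit(current_key, values)
        current_key, values = key, []
    values.append(value)

if current_key is not None:
    emit(current_key, values)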
How do we use the Hadoop Streaming tool? We can print its usage information from the command line, as shown below:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -info
Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar [options]
  -input    <path>                   DFS input file(s) for the Map step
  -output   <path>                   DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName>      The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file     <file>                   File/dir to be shipped in the Job jar file
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName Optional.
  -partitioner JavaClassName         Optional.
  -numReduceTasks <num>              Optional.
  -inputreader <spec>                Optional.
  -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands
  -mapdebug <path>                   Optional. To run this script when a map task fails
  -reducedebug <path>                Optional. To run this script when a reduce task fails
  -io <identifier>                   Optional.

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

In -input: globbing on <path> is supported and can have multiple -input
Default Map input format: a line is a record in UTF-8
  the key part ends at first TAB, the rest of the line is the value
Custom input format: -inputformat package.MyInputFormat
Map output format, reduce input/output format:
  Format defined by what the mapper command outputs. Line-oriented

The files named in the -file argument[s] end up in the
  working directory when the mapper and reducer are run.
  The location of this working directory is unspecified.

To set the number of reduce tasks (num. of output files):
  -D mapred.reduce.tasks=10
To skip the sort/combine/shuffle/sort/reduce step:
  A Task's Map output then becomes a 'side-effect output' rather than a reduce input
  This speeds up processing, This also feels more like "in-place" processing
  because the input filename and the map input order are preserved
  This equivalent -reducer NONE

To speed up the last maps:
  -D mapred.map.tasks.speculative.execution=true
To speed up the last reduces:
  -D mapred.reduce.tasks.speculative.execution=true
To name the job (appears in the JobTracker Web UI):
  -D mapred.job.name='My Job'
To change the local temp directory:
  -D dfs.data.dir=/tmp/dfs
  -D stream.tmpdir=/tmp/streaming
Additional local temp directories with -cluster local:
  -D mapred.local.dir=/tmp/local
  -D mapred.system.dir=/tmp/system
  -D mapred.temp.dir=/tmp/temp
To treat tasks with non-zero exit status as SUCCEDED:
  -D stream.non.zero.exit.is.failure=false
Use a custom hadoopStreaming build along a standard hadoop install:
  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\
    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar
For more details about jobconf parameters see:

To set an environement variable in a streaming command:
  -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

setenv HSTREAMING "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar"

Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"
  -file /local/filter.pl -input "/logs/0604*/*" [...]
  Ships a script, invokes the non-shipped perl interpreter
  Shipped files go to the working directory so filter.pl is found by perl
  Input files are all the daily logs for days in month 2006-04
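One detail worth calling out from the help text above: a value passed with -cmdenv shows up as an ordinary environment variable inside the mapper and reducer processes. The following is only an illustrative sketch of reading such a value from Python, reusing the EXAMPLE_DIR name from the help output; it is not part of the original article's code:

#!/usr/bin/env python
# Sketch: reading a value passed via "-cmdenv EXAMPLE_DIR=/home/example/dictionaries/"
# inside a streaming mapper or reducer.
import os
import sys

example_dir = os.environ.get('EXAMPLE_DIR', '')

for line in sys.stdin:
    # ... example_dir could be used while processing each input line ...
    sys.stdout.write(line)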
Below, we pick an example that can be computed with the Hadoop Streaming tool: counting word frequencies, i.e., the classic WordCount.
First, we prepare the data set used for the test, as shown below:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/dataset/join/
-rw-r--r--   3 xiaoxiang supergroup  391103029 2013-03-26 12:19 /user/xiaoxiang/dataset/join/irc_basic_info.ds
-rw-r--r--   3 xiaoxiang supergroup   11577164 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_net_block.ds
-rw-r--r--   3 xiaoxiang supergroup    8335235 2013-03-26 12:20 /user/xiaoxiang/dataset/join/irc_org_info.ds
There are three data files in total, close to 400 MB altogether.
Next, we choose Python to implement and run the MapReduce job.
The Mapper is implemented in Python in the file word_count_mapper.py; its key lines are shown below:
words = filter(lambda word: word, line.split())
print '%s\t%s' % (word, 1)
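A complete mapper consistent with those lines might look like the sketch below; the loop over standard input, the import, and the shebang are reconstructed rather than copied from the original file:

#!/usr/bin/env python
# word_count_mapper.py (reconstructed sketch): read text lines from standard
# input and emit one "word<TAB>1" line per word on standard output.
import sys

for line in sys.stdin:
    # filter() drops empty tokens; line.split() with no argument already
    # splits on arbitrary whitespace.
    words = filter(lambda word: word, line.split())
    for word in words:
        print '%s\t%s' % (word, 1)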
The Reducer is implemented in Python in the file word_count_reducer.py; its key lines are shown below:
from operator import itemgetter

for line in sys.stdin:
    word, count = line.split()
    wc_dict[word] = wc_dict.get(word, 0) + int(count)

sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print '%s\t%s' % (word, count)
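Likewise, a complete reducer consistent with that fragment might look like the sketch below; the import of sys, the dictionary initialisation, and the shebang are reconstructed:

#!/usr/bin/env python
# word_count_reducer.py (reconstructed sketch): sum the per-word counts emitted
# by the mapper and print "word<TAB>total", sorted by word.
import sys
from operator import itemgetter

wc_dict = {}

for line in sys.stdin:
    word, count = line.split()
    # Counts arrive as strings on stdin, so convert before accumulating.
    wc_dict[word] = wc_dict.get(word, 0) + int(count)

sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print '%s\t%s' % (word, count)

Because the framework already delivers the reducer input sorted by key, the dictionary could be replaced by the consecutive-key pattern sketched earlier in this article; the dictionary version simply mirrors the original code. A quick local sanity check before submitting the job is to pipe a sample file through word_count_mapper.py | sort | word_count_reducer.py.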
Run the Python MapReduce program as shown below:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.0.3.jar -input /user/xiaoxiang/dataset/join/ -output /user/xiaoxiang/output/streaming/python -mapper word_count_mapper.py -reducer word_count_reducer.py -numReduceTasks 2 -file *.py
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /opt/stone/cloud/storage/tmp/hadoop-xiaoxiang/hadoop-unjar4066863202997744310/] [] /tmp/streamjob2336302975421423718.jar tmpDir=null
13/04/18 17:50:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/18 17:50:17 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/18 17:50:17 INFO mapred.FileInputFormat: Total input paths to process : 3
13/04/18 17:50:17 INFO streaming.StreamJob: getLocalDirs(): [/opt/stone/cloud/storage/mapred/local]
13/04/18 17:50:17 INFO streaming.StreamJob: Running job: job_201303302227_0047
13/04/18 17:50:17 INFO streaming.StreamJob: To kill this job, run:
13/04/18 17:50:17 INFO streaming.StreamJob: /opt/stone/cloud/hadoop-1.0.3/libexec/../bin/hadoop job -Dmapred.job.tracker=hdfs://ubuntu3:9001/ -kill job_201303302227_0047
13/04/18 17:50:18 INFO streaming.StreamJob: map 0% reduce 0%
13/04/18 17:50:33 INFO streaming.StreamJob: map 7% reduce 0%
13/04/18 17:50:36 INFO streaming.StreamJob: map 11% reduce 0%
13/04/18 17:50:39 INFO streaming.StreamJob: map 15% reduce 0%
13/04/18 17:50:42 INFO streaming.StreamJob: map 19% reduce 0%
13/04/18 17:50:45 INFO streaming.StreamJob: map 23% reduce 0%
13/04/18 17:50:48 INFO streaming.StreamJob: map 25% reduce 0%
13/04/18 17:51:09 INFO streaming.StreamJob: map 32% reduce 2%
13/04/18 17:51:12 INFO streaming.StreamJob: map 36% reduce 4%
13/04/18 17:51:15 INFO streaming.StreamJob: map 40% reduce 8%
13/04/18 17:51:18 INFO streaming.StreamJob: map 44% reduce 8%
13/04/18 17:51:21 INFO streaming.StreamJob: map 47% reduce 8%
13/04/18 17:51:24 INFO streaming.StreamJob: map 50% reduce 8%
13/04/18 17:51:45 INFO streaming.StreamJob: map 54% reduce 10%
13/04/18 17:51:48 INFO streaming.StreamJob: map 60% reduce 15%
13/04/18 17:51:51 INFO streaming.StreamJob: map 65% reduce 17%
13/04/18 17:51:55 INFO streaming.StreamJob: map 66% reduce 17%
13/04/18 17:51:58 INFO streaming.StreamJob: map 68% reduce 17%
13/04/18 17:52:01 INFO streaming.StreamJob: map 72% reduce 17%
13/04/18 17:52:04 INFO streaming.StreamJob: map 75% reduce 17%
13/04/18 17:52:19 INFO streaming.StreamJob: map 75% reduce 19%
13/04/18 17:52:22 INFO streaming.StreamJob: map 87% reduce 21%
13/04/18 17:52:25 INFO streaming.StreamJob: map 100% reduce 23%
13/04/18 17:52:28 INFO streaming.StreamJob: map 100% reduce 27%
13/04/18 17:52:31 INFO streaming.StreamJob: map 100% reduce 29%
13/04/18 17:52:37 INFO streaming.StreamJob: map 100% reduce 49%
13/04/18 17:52:40 INFO streaming.StreamJob: map 100% reduce 69%
13/04/18 17:52:43 INFO streaming.StreamJob: map 100% reduce 72%
13/04/18 17:52:46 INFO streaming.StreamJob: map 100% reduce 74%
13/04/18 17:52:49 INFO streaming.StreamJob: map 100% reduce 76%
13/04/18 17:52:52 INFO streaming.StreamJob: map 100% reduce 78%
13/04/18 17:52:55 INFO streaming.StreamJob: map 100% reduce 79%
13/04/18 17:52:58 INFO streaming.StreamJob: map 100% reduce 81%
13/04/18 17:53:01 INFO streaming.StreamJob: map 100% reduce 84%
13/04/18 17:53:04 INFO streaming.StreamJob: map 100% reduce 87%
13/04/18 17:53:07 INFO streaming.StreamJob: map 100% reduce 90%
13/04/18 17:53:10 INFO streaming.StreamJob: map 100% reduce 93%
13/04/18 17:53:13 INFO streaming.StreamJob: map 100% reduce 96%
13/04/18 17:53:16 INFO streaming.StreamJob: map 100% reduce 98%
13/04/18 17:53:19 INFO streaming.StreamJob: map 100% reduce 99%
13/04/18 17:53:22 INFO streaming.StreamJob: map 100% reduce 100%
13/04/18 17:54:10 INFO streaming.StreamJob: Job complete: job_201303302227_0047
13/04/18 17:54:10 INFO streaming.StreamJob: Output: /user/xiaoxiang/output/streaming/python
Verify the results, as shown below:
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -lsr /user/xiaoxiang/output/streaming/python
-rw-r--r--   3 xiaoxiang supergroup          0 2013-04-18 17:54 /user/xiaoxiang/output/streaming/python/_SUCCESS
drwxr-xr-x   - xiaoxiang supergroup          0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs
drwxr-xr-x   - xiaoxiang supergroup          0 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history
-rw-r--r--   3 xiaoxiang supergroup      37646 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_1366278617842_xiaoxiang_streamjob2336302975421423718.jar
-rw-r--r--   3 xiaoxiang supergroup      21656 2013-04-18 17:50 /user/xiaoxiang/output/streaming/python/_logs/history/job_201303302227_0047_conf.xml
-rw-r--r--   3 xiaoxiang supergroup   91367389 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00000
-rw-r--r--   3 xiaoxiang supergroup   91268074 2013-04-18 17:52 /user/xiaoxiang/output/streaming/python/part-00001
xiaoxiang@ubuntu3:~/hadoop$ bin/hadoop fs -cat /user/xiaoxiang/output/streaming/python/part-00000 | head -5