A Hadoop Streaming Example (Python)

  I have always written MapReduce programs in Java before; here is an example of implementing MapReduce in Python via Hadoop Streaming.

  Task description:

  There are two directories on HDFS, /a and /b. The data in both has three columns: the first column is an id, the second is the business type (assume type a for /a and type b for /b), and the third is a JSON string. One example line from each:

  A line from /a: 1234567  a  {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}

  A line from /b: 12345  b  {"a":"abc","b":"adr","xxoo":"e",...}

  The goal is to find all ids whose /a record has a "status" field containing "111" and that also appear in /b.

  Let's get started. First we need a mapper that extracts the id and the tag "a" from the rows in /a whose "status" contains "111", and the first two columns of every row in /b. The Python code, mapper.py:

#!/usr/bin/env python
# coding: utf-8

import json
import sys


def mapper():
    for line in sys.stdin:
        line = line.strip()
        id, tag, content = line.split('\t')
        if tag == 'a':
            # keep ids from /a whose "status" list contains "111"
            jstr = json.loads(content)
            active = jstr.get('status', [])
            if "111" in active:
                print '%s\t%s' % (id, tag)
        if tag == 'b':
            # every row from /b is passed through
            print '%s\t%s' % (id, tag)


if __name__ == '__main__':
    mapper()
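  For the two sample lines shown earlier, the /a record qualifies because its "status" list contains "111", so the mapper would emit:

1234567	a
12345	b

  (For just these two rows the reducer below would print nothing, since no id appears in both /a and /b.)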

  The mapper reads the input from stdin and writes the rows that satisfy the conditions to standard output. Next, reducer.py:

#!/usr/bin/env python
# coding: utf-8

import sys


def reducer():
    tag_a = 0
    tag_b = 0
    pre_id = ''
    for line in sys.stdin:
        line = line.strip()
        current_id, tag = line.split('\t')
        if current_id != pre_id:
            # the key changed: emit the previous id if it was seen in both /a and /b
            if tag_a == 1 and tag_b == 1:
                print '%s' % pre_id
            tag_a = 0
            tag_b = 0
        pre_id = current_id
        if tag == 'a':
            tag_a = 1
        if tag == 'b':
            tag_b = 1
    # don't forget the last id after the input is exhausted
    if tag_a == 1 and tag_b == 1:
        print '%s' % pre_id


if __name__ == '__main__':
    reducer()

  A reducer receives many lines. Unlike Java, where each call gets one key and an iterator over its values, here every line is a single key/value pair; fortunately, lines with the same key are guaranteed to arrive contiguously, so it suffices to do the bookkeeping whenever the key changes.
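  Because equal keys are contiguous, the same bookkeeping can also be written more compactly with itertools.groupby. This is just an alternative sketch of an equivalent reducer, not the version used above:

#!/usr/bin/env python
# coding: utf-8

import sys
from itertools import groupby


def read_pairs(stdin):
    for line in stdin:
        # each mapper output line is "id<TAB>tag"
        yield line.strip().split('\t')


def reducer():
    # groupby relies on the Hadoop sort: lines with equal ids are contiguous
    for current_id, group in groupby(read_pairs(sys.stdin), key=lambda p: p[0]):
        tags = set(tag for _, tag in group)
        # keep ids that were seen in both /a and /b
        if 'a' in tags and 'b' in tags:
            print '%s' % current_id


if __name__ == '__main__':
    reducer()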

  Then have Hadoop run the job, schedule.py:

#!/usr/bin/env python
# coding: utf-8

import os
import subprocess


def mr_job():
    # resolve the Hadoop install from $HADOOP_HOME; subprocess does not
    # expand shell variables, so build the paths explicitly
    hadoop_home = os.environ['HADOOP_HOME']
    hadoop = os.path.join(hadoop_home, 'bin/hadoop')
    streaming_jar = os.path.join(hadoop_home,
                                 'contrib/streaming/hadoop-streaming-1.2.1.jar')
    mypath = os.path.dirname(os.path.abspath(__file__))
    inputpath1 = '/b/*'
    inputpath2 = '/a/*'
    outputpath = '/out/'
    mapper = os.path.join(mypath, 'mapper.py')
    reducer = os.path.join(mypath, 'reducer.py')
    cmds = [hadoop, 'jar', streaming_jar,
            '-numReduceTasks', '40',
            '-input', inputpath1,
            '-input', inputpath2,
            '-output', outputpath,
            '-mapper', mapper,
            '-reducer', reducer,
            # -file ships the local scripts to HDFS and on to every task node
            '-file', mapper,
            '-file', reducer]
    # the job fails if the output directory already exists, so remove it first
    subprocess.call([hadoop, 'fs', '-rmr', outputpath])
    subprocess.call(cmds)


def main():
    mr_job()


if __name__ == '__main__':
    main()

  schedule.py is where the MapReduce job is submitted: it builds the command that invokes hadoop-streamingXXX.jar, optionally with extra configuration parameters, and that command uploads the specified files to HDFS and distributes them to the worker nodes. $HADOOP_HOME is the Hadoop installation directory. The file names of the mapper and reducer scripts, and their function names, don't matter, because they are specified explicitly in the submit command.
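  Before submitting to the cluster, the pair of scripts can be debugged locally by simulating the shuffle with a plain sort. A small sketch, assuming a hypothetical sample.txt holding a few tab-separated test rows and executable (chmod +x) scripts in the current directory:

#!/usr/bin/env python
# coding: utf-8

# simulate mapper -> shuffle (sort) -> reducer locally, outside Hadoop
import subprocess

with open('sample.txt') as fin, open('local_out.txt', 'w') as fout:
    mapper = subprocess.Popen(['./mapper.py'], stdin=fin,
                              stdout=subprocess.PIPE)
    sorter = subprocess.Popen(['sort'], stdin=mapper.stdout,
                              stdout=subprocess.PIPE)
    reducer = subprocess.Popen(['./reducer.py'], stdin=sorter.stdout,
                               stdout=fout)
    mapper.stdout.close()  # let SIGPIPE propagate if the reducer exits early
    sorter.stdout.close()
    reducer.wait()

  If local_out.txt matches expectations, the same scripts can be handed to schedule.py unchanged.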

 

  That's it: a very simple python + hadoop-streaming example.
