I used to write my MapReduce programs in Java; here is an example of implementing MapReduce in Python via Hadoop Streaming.
Task description:
There are two directories on HDFS, /a and /b. The data in both has three tab-separated columns: the first column is an id, the second is the business type (assume /a holds type a and /b holds type b), and the third is a JSON string. One example line from each:
A line from /a: 1234567 a {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}
A line from /b: 12345 b {"a":"abc","b":"adr","xxoo":"e",...}
The goal is to produce the list of all ids that, in /a, have a "status" field containing "111", and that also appear in /b.
So let's get started. First we need a mapper that extracts the id and the second column ("a") from the rows in /a whose "status" contains "111", plus the first two columns of every row in /b. The Python code is as follows, mapper.py:
#!/usr/bin/env python
# coding: utf-8

import json
import sys


def mapper():
    # Each input line is: id <TAB> tag <TAB> json_string
    for line in sys.stdin:
        line = line.strip()
        id, tag, content = line.split('\t')
        if tag == 'a':
            # Emit ids from /a whose "status" list contains "111"
            jstr = json.loads(content)
            active = jstr.get('status', [])
            if "111" in active:
                print '%s\t%s' % (id, tag)
        if tag == 'b':
            # Emit every id from /b
            print '%s\t%s' % (id, tag)


if __name__ == '__main__':
    mapper()
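Since a Streaming mapper is just a stdin-to-stdout filter, it can be sanity-checked on its own before wiring up the job. For example, feeding it a trimmed-down, made-up version of the sample line from /a should print 1234567 followed by a tab and "a":

printf '1234567\ta\t{"status":["111","000","001"]}\n' | python mapper.py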
This mapper extracts data from the input streams and writes the rows that satisfy the conditions to standard output. Next comes reducer.py:
#!/usr/bin/env python
# coding: utf-8

import sys


def reducer():
    tag_a = 0
    tag_b = 0
    pre_id = ''
    # Input lines arrive sorted by key, so all lines for one id are consecutive
    for line in sys.stdin:
        line = line.strip()
        current_id, tag = line.split('\t')
        if current_id != pre_id:
            # The key changed: emit the previous id if it was seen in both /a and /b
            if tag_a == 1 and tag_b == 1:
                print '%s' % pre_id
            tag_a = 0
            tag_b = 0
            pre_id = current_id
        if tag == 'a':
            tag_a = 1
        if tag == 'b':
            tag_b = 1
    # Don't forget the final id once the input is exhausted
    if tag_a == 1 and tag_b == 1:
        print '%s' % pre_id


if __name__ == '__main__':
    reducer()
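Because Streaming is nothing more than stdin/stdout plus a sort between the two phases, the whole pipeline can be simulated locally before submitting to the cluster. Assuming a hypothetical sample.txt holding a few tab-separated test lines in the input format above, a plain `sort` stands in for Hadoop's shuffle:

cat sample.txt | python mapper.py | sort | python reducer.py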
A reducer can receive any number of input lines. Unlike the Java API, where each call gets one key together with all of its values, Streaming hands the reducer one key-value pair per line. Fortunately, lines sharing the same key arrive consecutively, so it suffices to do the bookkeeping whenever the key changes, as in the alternative sketch below.
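Since identical keys are guaranteed to be adjacent, the same logic can also be written more declaratively with itertools.groupby, which handles the key-change bookkeeping for you. A minimal sketch, not part of the original job, assuming the same "id<TAB>tag" input format:

#!/usr/bin/env python
# coding: utf-8

import sys
from itertools import groupby


def reducer():
    # Parse each "id<TAB>tag" line into an (id, tag) pair
    pairs = (line.strip().split('\t') for line in sys.stdin)
    # groupby gathers consecutive lines that share the same id
    for current_id, group in groupby(pairs, key=lambda p: p[0]):
        tags = set(tag for _, tag in group)
        # Emit ids seen in both /a and /b
        if 'a' in tags and 'b' in tags:
            print '%s' % current_id


if __name__ == '__main__':
    reducer()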
Then we arrange for Hadoop to run the job, schedule.py:
#!/usr/bin/env python
# coding: utf-8

import subprocess, os


def mr_job():
    mypath = os.path.dirname(os.path.abspath(__file__))
    # subprocess.call with an argument list does no shell expansion,
    # so expand $HADOOP_HOME explicitly
    hadoop = os.path.expandvars('$HADOOP_HOME/bin/hadoop')
    streaming_jar = os.path.expandvars(
        '$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar')
    inputpath1 = '/b/*'
    inputpath2 = '/a/*'
    outputpath = '/out/'
    mapper = mypath + '/mapper.py'
    reducer = mypath + '/reducer.py'
    cmds = [hadoop, 'jar', streaming_jar,
            '-numReduceTasks', '40',
            '-input', inputpath1,
            '-input', inputpath2,
            '-output', outputpath,
            '-mapper', mapper,
            '-reducer', reducer]
    # Ship every file in the script directory (mapper.py, reducer.py, ...)
    # to the task nodes
    for f in os.listdir(mypath):
        cmds.extend(['-file', mypath + '/' + f])
    # Remove any previous output directory, then submit the job
    cmd = [hadoop, 'fs', '-rmr', outputpath]
    subprocess.call(cmd)
    subprocess.call(cmds)


def main():
    mr_job()


if __name__ == '__main__':
    main()
schedule.py is where the MapReduce job gets submitted: it shells out to hadoop, which runs hadoop-streaming-1.2.1.jar, and additional job parameters can be configured the same way. The shell command uploads the files specified with -file to HDFS and distributes them to the nodes that run the tasks. $HADOOP_HOME is the Hadoop installation directory. The file names of the mapper and reducer Python scripts (and their function names) don't matter, because they are specified explicitly when the shell command is assembled.
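For reference, the command that schedule.py assembles is equivalent to running something like the following by hand, under the same path assumptions as above:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
    -numReduceTasks 40 \
    -input '/b/*' -input '/a/*' \
    -output /out/ \
    -mapper mapper.py -reducer reducer.py \
    -file mapper.py -file reducer.py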
The above is a very simple example of MapReduce in Python via Hadoop Streaming.