1、簡單說明python
本例中咱們用Python寫一個簡單的運行在Hadoop上的MapReduce程序,即WordCount(讀取文本文件並統計單詞的詞頻)。這裏咱們將要輸入的單詞文本input.txt和Python腳本放到/home/data/python/WordCount目錄下。app
cd /home/data/python/WordCountoop
vi input.txt測試
輸入:spa
There is no denying thatblog
hello python排序
hello mapreduceip
mapreduce is goodhadoop
2、編寫Map代碼input
這裏咱們建立一個mapper.py腳本,從標準輸入(stdin)讀取數據,默認以空格分隔單詞,而後按行輸出單詞機器詞頻到標準輸出(stdout),整個Map處理過程不會統計每一個單詞出現的總次數,而是直接輸出「word 1」,以便做爲Reduce的輸入進行統計,確保該文件是可執行的(chmod +x /home/data/python//WordCount/mapper.py)。
cd /home/data/python//WordCount
vi mapper.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
for line in sys.stdin: #sys.stdin爲讀取數據,遍歷讀入數據的每一行
line = line.strip() #刪除開頭和結尾的空格
words = line.split() #以默認空格分隔行單詞到words列表
for word in words:
#輸出全部單詞,格式爲「單詞,1」以便做爲Reduce的輸入
print('%s\t%s' %(word,1))
#截圖以下:
3、編寫Reduce代碼
這裏咱們建立一個reducer.py腳本,從標準輸入(stdin)讀取mapper.py的結果,而後統計每一個單詞出現的總次數並輸出到標準輸出(stdout),
確保該文件是可執行的(chmod +x /home/data/python//WordCount/reducer.py)
cd /home/data/python//WordCount
vi reducer.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
current_word = None #當前單詞
current_count = 0 #當前單詞頻數
word = None
for line in sys.stdin:
line = line.strip() #刪除開頭和結尾的空格
#解析mapper.py輸出做爲程序的輸入,以tab做爲分隔符
word,count = line.split('\t',1)
try:
count = int(count) #轉換count從字符型爲整型
except ValueError:
continue
#要求mapper.py的輸出作排序操做,以便對鏈接的word作判斷,hadoop會自動排序
if current_word == word: #若是當前的單詞等於讀入的單詞
current_count += count #單詞頻數加1
else:
if current_word: #若是當前的單詞不爲空則打印其單詞和頻數
print('%s\t%s' %(current_word,current_count))
current_count = count #不然將讀入的單詞賦值給當前單詞,且更新頻數
current_word = word
if current_word == word #輸出最後一個word統計
print('%s\%s' %(current_word,current_count))
#截圖以下:
4、本地測試代碼
咱們能夠在Hadoop平臺運行以前在本地測試,校驗mapper.py與reducer.py運行的結果是否正確。注意:測試reducer.py時須要對mapper.py的輸出作排序(sort)操做,不過,Hadoop環境會自動實現排序。
#在本地運行mapper.py:
cd /home/data/python/WordCount/
#記得執行: chmod +x /home/data/python//WordCount/mapper.py
cat input.txt | ./mapper.py
#在本地運行reducer.py
#記得執行:chmod +x /home/data/python//WordCount/reducer.py
cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py
#這裏注意:利用管道符「|」將輸出數據做爲mapper.py這個腳本的輸入數據,並將mapper.py的數據輸入到reducer.py中,其中參數sort -k 1,1是將reducer的輸出內容按照第一列的第一個字母的ASCII碼值進行升序排序。
5、在Hadoop平臺上運行代碼
在hadoop運行代碼,前提是已經搭建好hadoop集羣
一、建立目錄並上傳文件
首先在HDFS上建立文本文件存儲目錄,這裏我建立爲:/WordCound
hdfs dfs -mkdir /WordCound
#將本地文件input.txt上傳到hdfs的/WordCount上。
hadoop fs -put /home/data/python/WordCount/input.txt /WordCount
hadoop fs -ls /WordCount #查看在hdfs中/data/WordCount目錄下的內容
二、執行MapReduce程序
爲了簡化咱們執行Hadoop MapReduce的命令,咱們能夠將Hadoop的hadoop-streaming-3.0.0.jar加入到系統環境變量/etc/profile中,在/etc/profile文件中添加以下配置:
首先在配置裏導入hadoop-streaming-3.0.0.jar
vi /etc/profile
HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.0.0.jar
export HADOOP_STREAM
source /etc/profile #刷新配置
#執行如下命令:
hadoop jar $HADOOP_STREAM -file /home/data/python/WordCount/mapper.py -mapper ./mapper.py -file /home/data/python/WordCount/reducer.py -reducer ./reducer.py -input /WordCount -output /output/word1
獲得:
而後,輸入如下命令查看結果:
hadoop fs -ls /output/word1
hadoop fs -cat /output/word1/part-00000 #查看分析結果
能夠發現,結果與以前測試的時候是一致的,那麼恭喜你,大功告成!