在Hadoop上用Python實現WordCount

1、簡單說明python

  本例中咱們用Python寫一個簡單的運行在Hadoop上的MapReduce程序,即WordCount(讀取文本文件並統計單詞的詞頻)。這裏咱們將要輸入的單詞文本input.txt和Python腳本放到/home/data/python/WordCount目錄下。app

cd /home/data/python/WordCountoop

vi input.txt測試

輸入:spa

There is no denying thatblog

hello python排序

hello mapreduceip

mapreduce is goodhadoop

 

2、編寫Map代碼input

  這裏咱們建立一個mapper.py腳本,從標準輸入(stdin)讀取數據,默認以空格分隔單詞,而後按行輸出單詞機器詞頻到標準輸出(stdout),整個Map處理過程不會統計每一個單詞出現的總次數,而是直接輸出「word 1」,以便做爲Reduce的輸入進行統計,確保該文件是可執行的(chmod +x /home/data/python//WordCount/mapper.py)。

cd /home/data/python//WordCount

vi mapper.py

#!/usr/bin/env python
# -*- coding:UTF-8 -*-

import sys

for line in sys.stdin:     #sys.stdin爲讀取數據,遍歷讀入數據的每一行

    line = line.strip()   #刪除開頭和結尾的空格

    words = line.split()  #以默認空格分隔行單詞到words列表

    for word in words:

    #輸出全部單詞,格式爲「單詞,1」以便做爲Reduce的輸入

    print('%s\t%s' %(word,1))

#截圖以下:

 

3、編寫Reduce代碼

  這裏咱們建立一個reducer.py腳本,從標準輸入(stdin)讀取mapper.py的結果,而後統計每一個單詞出現的總次數並輸出到標準輸出(stdout),

確保該文件是可執行的(chmod +x /home/data/python//WordCount/reducer.py)

cd /home/data/python//WordCount

vi reducer.py

#!/usr/bin/env python 

# -*- coding:UTF-8 -*-

import sys

current_word = None    #當前單詞

current_count = 0     #當前單詞頻數

word = None

for line in sys.stdin: 

    line = line.strip()    #刪除開頭和結尾的空格

    #解析mapper.py輸出做爲程序的輸入,以tab做爲分隔符

    word,count = line.split('\t',1)

    try:

   count = int(count)   #轉換count從字符型爲整型

    except ValueError:

       continue

    #要求mapper.py的輸出作排序操做,以便對鏈接的word作判斷,hadoop會自動排序

    if current_word == word:    #若是當前的單詞等於讀入的單詞

       current_count += count    #單詞頻數加1

    else:

       if current_word:    #若是當前的單詞不爲空則打印其單詞和頻數

 

           print('%s\t%s' %(current_word,current_count))

       current_count = count   #不然將讀入的單詞賦值給當前單詞,且更新頻數

       current_word = word

 

if current_word == word   #輸出最後一個word統計

    print('%s\%s' %(current_word,current_count))

#截圖以下:

 

4、本地測試代碼

  咱們能夠在Hadoop平臺運行以前在本地測試,校驗mapper.py與reducer.py運行的結果是否正確。注意:測試reducer.py時須要對mapper.py的輸出作排序(sort)操做,不過,Hadoop環境會自動實現排序。

 #在本地運行mapper.py:

cd /home/data/python/WordCount/

#記得執行: chmod +x /home/data/python//WordCount/mapper.py

cat input.txt | ./mapper.py

 

#在本地運行reducer.py

#記得執行:chmod +x /home/data/python//WordCount/reducer.py

cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py

 

#這裏注意:利用管道符「|」將輸出數據做爲mapper.py這個腳本的輸入數據,並將mapper.py的數據輸入到reducer.py中,其中參數sort -k 1,1是將reducer的輸出內容按照第一列的第一個字母的ASCII碼值進行升序排序。

 

 

 

5、在Hadoop平臺上運行代碼

在hadoop運行代碼,前提是已經搭建好hadoop集羣

一、建立目錄並上傳文件

  首先在HDFS上建立文本文件存儲目錄,這裏我建立爲:/WordCound

hdfs dfs -mkdir /WordCound

#將本地文件input.txt上傳到hdfs的/WordCount上。

hadoop fs -put /home/data/python/WordCount/input.txt /WordCount

hadoop fs -ls /WordCount       #查看在hdfs中/data/WordCount目錄下的內容

 

二、執行MapReduce程序

  爲了簡化咱們執行Hadoop MapReduce的命令,咱們能夠將Hadoop的hadoop-streaming-3.0.0.jar加入到系統環境變量/etc/profile中,在/etc/profile文件中添加以下配置:

首先在配置裏導入hadoop-streaming-3.0.0.jar

vi /etc/profile

HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.0.0.jar 

export HADOOP_STREAM 

source /etc/profile    #刷新配置

 

#執行如下命令:

hadoop jar $HADOOP_STREAM -file /home/data/python/WordCount/mapper.py -mapper ./mapper.py -file /home/data/python/WordCount/reducer.py -reducer ./reducer.py -input /WordCount -output /output/word1

獲得:

而後,輸入如下命令查看結果:

hadoop fs -ls /output/word1

hadoop fs -cat /output/word1/part-00000    #查看分析結果

 

能夠發現,結果與以前測試的時候是一致的,那麼恭喜你,大功告成!

相關文章
相關標籤/搜索