Python實現MapReduce,wordcount實例,MapReduce實現兩表的Join

Python實現MapReducejavascript

下面使用mapreduce模式實現了一個簡單的統計日誌中單詞出現次數的程序: html

from functools import reduce from multiprocessing import Pool from collections import Counter def read_inputs(file): for line in file: line = line.strip() yield line.split() def count(file_name): file = open(file_name) lines = read_inputs(file) c = Counter() for words in lines: for word in words: c[word] += 1 return c def do_task(): job_list = ['log.txt'] * 10000 pool = Pool(8) return reduce(lambda x, y: x+y, pool.map(count, job_list)) if __name__ == "__main__": rv = do_task()







一個python實現的mapreduce程序

版權聲明:本文爲博主原創文章,未經博主容許不得轉載。 https://blog.csdn.net/weijianpeng2013_2015/article/details/71908340

map:java

# !/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print ("%s\t%s") % (word, 1) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

reduce:node

#!/usr/bin/env python import operator import sys current_word = None curent_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: curent_count += count else: if current_word: print '%s\t%s' % (current_word,curent_count) current_word=word curent_count=count if current_word==word: print '%s\t%s' % (current_word,curent_count)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

測試:python

[root@node1 input]# echo "foo foo quux labs foo bar zoo zoo hying" | /home/hadoop/input/max_map.py | sort | /home/hadoop/input/max_reduce.py
  • 1

這裏寫圖片描述

執行:可將其寫入腳本文件git

//注意\-file之間必定不能空格
hadoop jar /hadoop64/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-*streaming*.jar -D stream.non.zero.exit.is.failure=false \-file /home/hadoop/input/max_map.py -mapper /home/hadoop/input/max_map.py \-file /home/hadoop/input/max_reduce.py -reducer /home/hadoop/input/max_reduce.py \-input /input/temperature/ -output /output/temperature
  • 1
  • 2

這裏寫圖片描述

 

 
 
 
 
 

Hadoop(三):MapReduce程序(python)

 

使用python語言進行MapReduce程序開發主要分爲兩個步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務。github

仍是以詞頻統計爲例shell

1、程序開發
一、Mapper數據庫

1 for line in sys.stdin:
2     filelds = line.strip.split(' ')
3     for item in fileds:
4         print item+' '+'1'

二、Reducer編程

複製代碼
1 import sys
 2 
 3 result={}
 4 for line in  sys.stdin:
 5     kvs = line.strip().split(' ')
 6     k = kvs[0]
 7     v = kvs[1]
 8     if k in result:
 9         result[k]+=1
10     else:
11         result[k] = 1
12 for k,v in result.items():
13     print k+' '+v
複製代碼

....

寫完發現其實只用map就能夠處理了...reduce只用cat就行了

三、運行腳本

1)Streaming簡介

  Hadoop的MapReduce和HDFS均採用Java進行實現,默認提供Java編程接口,用戶經過這些編程接口,能夠定義map、reduce函數等等。
  可是若是但願使用其餘語言編寫map、reduce函數怎麼辦呢?
  Hadoop提供了一個框架Streaming,Streaming的原理是用Java實現一個包裝用戶程序的MapReduce程序,該程序負責調用hadoop提供的Java編程接口。

2)運行命令

  /.../bin/hadoop streaming

  -input /..../input

  -output /..../output

  -mapper "mapper.py"

  -reducer "reducer.py"

  -file mapper.py

  -file reducer.py

  -D mapred.job.name ="wordcount"

  -D mapred.reduce.tasks = "1"

3)Streaming經常使用命令

(1)-input <path>:指定做業輸入,path能夠是文件或者目錄,可使用*通配符,-input選項可使用屢次指定多個文件或目錄做爲輸入。

(2)-output <path>:指定做業輸出目錄,path必須不存在,並且執行做業的用戶必須有建立該目錄的權限,-output只能使用一次。

(3)-mapper:指定mapper可執行程序或Java類,必須指定且惟一。

(4)-reducer:指定reducer可執行程序或Java類,必須指定且惟一。

(5)-file, -cacheFile, -cacheArchive:分別用於向計算節點分發本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發與打包

(6)numReduceTasks:指定reducer的個數,若是設置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接做爲整個做業的輸出。

(7)-jobconf | -D NAME=VALUE:指定做業參數,NAME是參數名,VALUE是參數值,能夠指定的參數參考hadoop-default.xml。

   -jobconf mapred.job.name='My Job Name'設置做業名

   -jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設置做業優先級

   -jobconf mapred.job.map.capacity=M設置同時最多運行M個map任務

   -jobconf mapred.job.reduce.capacity=N設置同時最多運行N個reduce任務

   -jobconf mapred.map.tasks 設置map任務個數

   -jobconf mapred.reduce.tasks 設置reduce任務個數   

   -jobconf mapred.compress.map.output 設置map的輸出是否壓縮

   -jobconf mapred.map.output.compression.codec 設置map的輸出壓縮方式   

   -jobconf mapred.output.compress 設置reduce的輸出是否壓縮

   -jobconf mapred.output.compression.codec 設置reduce的輸出壓縮方式

   -jobconf stream.map.output.field.separator 設置map輸出分隔符

    例子:-D stream.map.output.field.separator=: \  以冒號進行分隔

            -D stream.num.map.output.key.fields=2 \  指定在第二個冒號處進行分隔,也就是第二個冒號以前的做爲key,以後的做爲value

(8)-combiner:指定combiner Java類,對應的Java類文件打包成jar文件後用-file分發。

(9)-partitioner:指定partitioner Java類,Streaming提供了一些實用的partitioner實現,參考KeyBasedFiledPartitonerIntHashPartitioner

(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用於讀取輸入數據和寫入輸出數據,分別要實現InputFormat和OutputFormat接口。若是不指定,默認使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環境變量,NAME是變量名,VALUE是變量值。

(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時運行的debug程序。

(13)-verbose:指定輸出詳細信息,例如分發哪些文件,實際做業配置參數值等,能夠用於調試。

 

 
 
 
 
 
 

使用python實現MapReduce的wordcount實例

版權聲明:本文爲博主原創文章,未經博主容許不得轉載。 https://blog.csdn.net/sinat_33741547/article/details/54428025

Hadopp的基本框架是用java實現的,而各種書籍基本也是以java爲例實現mapreduce,但筆者平常工做都是用python,故此找了一些資料來用python實現mapreduce實例。

1、環境

一、Hadoop-2.7.3徹底分佈式搭建

二、python3.5

2、基本思想介紹

使用python實現mapreduce調用的是Hadoop Stream,主要利用STDIN(標準輸入),STDOUT(標準輸出)來實如今map函數和reduce函數之間的數據傳遞。

咱們須要作的是利用python的sys.stdin讀取輸入數據,並把輸入傳遞到sys.stdout,其餘的工做Hadoop的流API會爲咱們處理。

3、實例

如下是在hadoop官網下載的python版本mapper函數和reducer函數,文件位置默認在/usr/local/working中,

一、mapper.py

(1)代碼

 

  1.  
    import sys
  2.  
    #輸入爲標準輸入stdin
  3.  
    for line in sys.stdin:
  4.  
    #刪除開頭和結果的空格
  5.  
    line = line.strip( )
  6.  
    #以默認空格分隔行單詞到words列表
  7.  
    words = line.split( )
  8.  
    for word in words:
  9.  
    #輸出全部單詞,格式爲「單詞,1」以便做爲reduce的輸入
  10.  
    print( '%s\t%s' % (word,1))
(2)可對代碼進行檢驗

 

echo "aa bb cc dd aa cc" | python mapper.py

二、reducer.py

(1)代碼

 

  1.  
    import sys
  2.  
     
  3.  
    current_word = None
  4.  
    current_count = 0
  5.  
    word = None
  6.  
     
  7.  
    #獲取標準輸入,即mapper.py的輸出
  8.  
    for line in sys.stdin:
  9.  
    line = line.strip()
  10.  
    #解析mapper.py輸出做爲程序的輸入,以tab做爲分隔符
  11.  
    word,count = line.split( '\t',1)
  12.  
    #轉換count從字符型成整型
  13.  
    try:
  14.  
    count = int(count)
  15.  
    except ValueError:
  16.  
    #非字符時忽略此行
  17.  
    continue
  18.  
    #要求mapper.py的輸出作排序(sort)操做,以便對連續的word作判斷
  19.  
    if current_word == word:
  20.  
    current_count +=count
  21.  
    else:
  22.  
    if current_word:
  23.  
    #輸出當前word統計結果到標準輸出
  24.  
    print( '%s\t%s' % (current_word,current_count))
  25.  
    current_count =count
  26.  
    current_word =word
  27.  
     
  28.  
    #輸出最後一個word統計
  29.  
    if current_word ==word:
  30.  
    print( '%s\t%s' % (current_word,current_count))
(2)對代碼進行檢驗

 

echo "aa aa bb cc dd dd" | python mapper.py | python reducer.py

三、確保已經搭建好徹底分佈式的hadoop環境,在HDFS中新建文件夾

bin/hdfs dfs -mkdir /temp/

bin/hdfs dfs -mkdir /temp/hdin

四、將hadoop目錄中的LICENSE.txt文件上傳到HDFS中

bin/hdfs dfs -copyFromLocal LICENSE.txt /temp/hdin

五、執行文件work.sh

(1)代碼

 

  1.  
    #!/bin/bash  
  2.  
    #mapper函數和reducer函數文件地址
  3.  
    export CURRENT=/usr/local/working
  4.  
    #先刪除輸出目錄  
  5.  
    $HADOOP_HOME/bin/hdfs dfs -rm -r /temp/hdout   
  6.  
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
  7.  
    -input "/temp/hdin/*" \
  8.  
    -output "/temp/hdout" \
  9.  
    -mapper "python mapper.py" \
  10.  
    -reducer "python reducer.py" \
  11.  
    -file "$CURRENT/mapper.py" \
  12.  
    -file "$CURRENT/reducer.py"
(2)執行代碼

 

sh work.sh

六、查看結果

bin/hdfs dfs -cat /temp/hdout/*

 

  1.  
    "AS 16
  2.  
    "COPYRIGHTS 1
  3.  
    "Contribution" 2
  4.  
    "Contributor" 2
  5.  
    "Derivative 1
  6.  
    "Legal 1
  7.  
    "License" 1
  8.  
    "License"); 1
  9.  
    "Licensed 1
  10.  
    "Licensor" 1
  11.  
    "Losses") 1
  12.  
    "NOTICE" 1
  13.  
    "Not 1
  14.  
    ...
  15.  

 
 
 














Python結合Shell/Hadoop實現MapReduce

 

基本流程爲: 

cat data | map | sort | reduce

cat devProbe | ./mapper.py | sort| ./reducer.py

echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py

# -k, -key=POS1[,POS2]     鍵以pos1開始,以pos2結束

 

如不執行下述命令,能夠再py文件前加上python調用

chmod +x mapper.py
chmod +x reducer.py

 

對於分佈式環境下,可使用如下命令:

hadoop jar /[YOUR_PATH]/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
 -file mapper.py -mapper mapper.py \
 -file reducer.py -reducer reducer.py \
 -input [IN_FILE]    -output [OUT_DIR]

 

 

mapper.py

複製代碼
#!/usr/bin/python
# -*- coding: UTF-8 -*-

__author__ = 'Manhua'

import sys
for line in sys.stdin:
    line = line.strip()
    item = line.split('`')
    print "%s\t%s" % (item[0]+'`'+item[1], 1)
複製代碼

 

reducer.py

複製代碼
#!/usr/bin/python
# -*- coding: UTF-8 -*-

__author__ = 'Manhua'


import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count若是不是數字的話,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘記最後的輸出
    print "%s\t%s" % (current_word, current_count)
複製代碼

 

 

 

 

其它:

Python+Hadoop Streaming實現MapReduce任務:https://blog.csdn.net/czl389/article/details/77247534

用Python編寫MapReduce代碼與調用-某一天以前的全部活躍用戶統計https://blog.csdn.net/babyfish13/article/details/53841990

Python 實踐之 400 行 Python 寫一個類 Hadoop 的 MapReduce 框架:https://www.v2ex.com/t/149803,https://github.com/xiaojiaqi/py_hadoop

http://xiaorui.cc/2014/11/14/python使用mrjob實現hadoop上的mapreduce/ 








MapReduce實現兩表的Join--原理及python和java代碼實現

版權聲明:本文爲博主原創文章,未經博主容許不得轉載。 https://blog.csdn.net/yimingsilence/article/details/70242604

用Hive一句話搞定的,可是有時必需要用mapreduce

1. 概述

在傳統數據庫(如:MYSQL)中,JOIN操做是很是常見且很是耗時的。而在HADOOP中進行JOIN操做,一樣常見且耗時,因爲Hadoop的獨特設計思想,當進行JOIN操做時,有一些特殊的技巧。
本文首先介紹了Hadoop上一般的JOIN實現方法,而後給出了幾種針對不一樣輸入數據集的優化方法。

2. 常見的join方法介紹

假設要進行join的數據分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想以下:
在map階段,map函數同時讀取兩個文件File1和File2,爲了區分兩種來源的key/value數據對,對每條數據打一個標籤(tag),好比:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不一樣文件中的數據打標籤。
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 而後對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的鏈接操做。

2.2 map side join

之因此存在reduce side join,是由於在map階段不能獲取全部須要的join字段,即:同一個key對應的字段可能位於不一樣map中。Reduce side join是很是低效的,由於shuffle階段要進行大量的數據傳輸。
Map side join是針對如下場景進行的優化:兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣,咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。
爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://namenode:9000/home/XXX/file,其中9000是本身配置的NameNode端口號)。JobTracker在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。

2.3 SemiJoin

SemiJoin,也叫半鏈接,是從分佈式數據庫中借鑑過來的方法。它的產生動機是:對於reduce side join,跨機器的數據傳輸量很是大,這成了join操做的一個瓶頸,若是可以在map端過濾掉不會參加join操做的數據,則能夠大大節省網絡IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件通常很小,能夠放到內存中。在map階段,使用DistributedCache將File3複製到各個TaskTracker上,而後將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工做與reduce side join相同。
更多關於半鏈接的介紹,可參考:半鏈接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

2.4 reduce side join + BloomFilter

在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。
BloomFilter最多見的做用是:判斷某個元素是否在一個集合裏面。它最重要的兩個方法是:add() 和contains()。最大的特色是不會存在false negative,即:若是contains()返回false,則該元素必定不在集合中,但會存在必定的true negative,即:若是contains()返回true,則該元素可能在集合中。
於是可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。
更多關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默認狀況下是按照key進行排序,若是要按照value進行排序怎麼辦?即:對於同一個key,reduce函數接收到的value list是按照value排序的。這種應用需求在join操做中很常見,好比,但願相同的key中,小表對應的value排在前面。
有兩種方法進行二次排序,分別爲:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort,主要思想是:在reduce()函數中,將某個key對應的全部value保存下來,而後進行排序。 這種方法最大的缺點是:可能會形成out of memory。
對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果即是先按key排序,後按value排序的結果,須要注意的是,用戶須要本身實現Paritioner,以便只按照key進行數據劃分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設置排序group的key值,

 

reduce-side-join python代碼 

hadoop有個工具叫作steaming,可以支持python、shell、C++、PHP等其餘任何支持標準輸入stdin及標準輸出stdout的語言,其運行原理能夠經過和標準java的map-reduce程序對比來講明:

 

使用原生java語言實現Map-reduce程序
  1. hadoop準備好數據後,將數據傳送給java的map程序
  2. java的map程序將數據處理後,輸出O1
  3. hadoop將O1打散、排序,而後傳給不一樣的reduce機器
相關文章
相關標籤/搜索