Calling the Hadoop API from Python 3

Preface:

In the previous article I learned to do some simple data analysis with pandas, but pandas cannot handle or analyze TB-scale big data, so let's take a look at Hadoop.

Also attached, a reflection on the saying "a discontented heart is like a snake trying to swallow an elephant":

Human desire is endless: each time we get a little, desire grows a little more. But the body has its limits; one day, because we crave too much, everything turns to ash.

 

Hadoop background

The data I have worked with can be summarized into three categories:

1. Structured data

Data in relational databases, constrained by fields; (has rules)

2. Semi-structured data

HTML/XML/JSON... this kind of data has structure, but the constraints are not strict; (still some rules to speak of)

3. Unstructured data

Plain text / logs... this kind of data has no head, body, or key tags, let alone field constraints; (no rules at all)

4. How do we store massive amounts of unstructured data?

So the question is: how do we store large amounts of unstructured/semi-structured data and then analyze and search it efficiently?

Google proposed solutions in the form of three papers (GFS, MapReduce and BigTable), without saying how to implement them!

 

1. How do we store massive data safely?

Store the massive data in a distributed fashion, across different nodes of a server cluster. (Distributed: however large the data grows, we no longer worry, because servers can be added dynamically to spread the load.)

2. How do we analyze and search massive data efficiently?

MapReduce: a programming model, described in the paper "MapReduce: Simplified Data Processing on Large Clusters".

Take a complex computation task --------> split it into small task units ----------> run them in parallel on the individual nodes

Collect the results from each node ---------> merge them --------> a second map ------> a second reduce ........> until the final result is computed.
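To make this flow concrete, here is a tiny single-machine Python sketch of the map -> shuffle & sort -> reduce idea (my own illustration of the word-count example used later in this article, not part of Hadoop itself):

from collections import defaultdict

lines = ["a b c", "a c", "a"]              # pretend each line lives on a different node

# map: each "node" emits (word, 1) pairs for its own line
mapped = [(word, 1) for line in lines for word in line.split()]

# shuffle & sort: group the values of identical keys into one list
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# reduce: aggregate each key's list of values
result = {word: sum(counts) for word, counts in sorted(groups.items())}
print(result)                              # {'a': 3, 'b': 1, 'c': 2}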

5. What is Hadoop?

A master-level programmer, Doug Cutting, inspired by the three Google papers above, developed Hadoop in Java.

6. How does Python call Hadoop?

 

Hadoop's MapReduce is powerful, but as a Python beginner how do I call it? Hadoop's calling API is also named MapReduce.

 

 

Part 1. Hadoop v2 architecture diagram

 

Part 2. Hadoop's runtime model

HDFS cluster: DataNode (data storage node), NameNode (name node), SecondaryNameNode (secondary name node)

YARN: cluster resource management

 

Part 3. Installing Hadoop 2.6.2 on CentOS 7

1. Environment preparation

CentOS 7 usually ships with a JDK already:

[root@localhost zhanggen]# java -version 
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@localhost profile.d]# yum -y install java-1.8.0-openjdk* 

 

Disable the firewall on CentOS 7

Check status:    systemctl status firewalld
Disable at boot: systemctl disable firewalld
Enable at boot:  systemctl enable firewalld

 

Disable SELinux on CentOS 7

[root@localhost hdfs]# setenforce 1
[root@localhost hdfs]# getenforce 
Enforcing
[root@localhost hdfs]# setenforce 0
[root@localhost hdfs]# getenforce 
Permissive

 

 

2. Download the Hadoop release package

 

3. Unpack the package

[root@localhost bdapps]# mkdir /bdapps/
[root@localhost bdapps]# ls
hadoop-2.6.2
[root@localhost bdapps]# tar -zxvf /home/zhanggen/Desktop/hadoop-2.6.2.tar.gz -C /bdapps/
[root@localhost bdapps]# ln -sv /bdapps/hadoop-2.6.2 /bdapps/hadoop
‘/bdapps/hadoop’ -> ‘/bdapps/hadoop-2.6.2’

 

4. Set the Java and Hadoop environment variables

# /etc/profile.d/hadoop.sh
export HADOOP_PREFIX=/bdapps/hadoop
export PATH=$PATH:${HADOOP_PREFIX}/bin:${HADOOP_PREFIX}/sbin
export HADOOP_YARN_HOME=${HADOOP_PREFIX}
export HADOOP_MAPRED_HOME=${HADOOP_PREFIX}
export HADOOP_COMMON_HOME=${HADOOP_PREFIX}
export HADOOP_HDFS_HOME=${HADOOP_PREFIX}

source /etc/profile.d/hadoop.sh

# /etc/profile.d/java.sh
export JAVA_HOME=/usr

source /etc/profile.d/java.sh

 

5. Create the hadoop user

groupadd hadoop
useradd -g hadoop hadoop

 

6. Create the directories where the NameNode and DataNodes store data

mkdir -pv /data/hadoop/hdfs/{nn,dn,snn}
chown -R hadoop:hadoop /data/hadoop/hdfs/

 

7. Create the Hadoop log directory

cd /bdapps/hadoop/
mkdir logs
chown -R hadoop:hadoop ./*

 

PS: If your MapReduce job fails, get the applicationId and look at the error messages:

 yarn logs -applicationId application_1551852706740_0001  # view the job execution log

The secret to taming Hadoop: first get the program's run log -------> analyze why the errors in the log occurred ------> fix the problem.

 

8. Main Hadoop configuration files (/bdapps/hadoop/etc/hadoop)

8.0. core-site.xml

Configures the NameNode address and port (8020 by default):

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://0.0.0.0:8020</value>
        <final>false</final>
    </property>
</configuration>
core-site.xml

 

8.1. hdfs-site.xml

Configures HDFS-related properties: the replica count for each data block, and the NameNode/DataNode data directories created in step 6.

<configuration>
   <property>
        <name>dfs.http.address</name>
        <value>0.0.0.0:50070</value>
    </property>
    <property>
        <name>dfs.datanode.http.address</name>
        <value>0.0.0.0:50075</value>
    </property>

    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///data/hadoop/hdfs/nn</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///data/hadoop/hdfs/dn</value>
    </property>
    <property>
        <name>fs.checkpoint.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
    <property>
        <name>fs.checkpoint.edits.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
</configuration>
hdfs-site.xml

 

8.2. mapred-site.xml (use YARN)

Specifies whether MapReduce runs standalone or on top of YARN; in Hadoop 2 it runs on YARN. See Part 2, Hadoop's runtime model.

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
mapred-site.xml

8.3. yarn-site.xml

yarn-site.xml configures the YARN daemons and YARN-related properties. First specify the host and port that the ResourceManager daemon listens on; for the pseudo-distributed model the host is localhost and the default port is 8032. Then specify the scheduler the ResourceManager uses and the NodeManager's auxiliary service. A brief configuration example follows:

<configuration>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>0.0.0.0:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>0.0.0.0:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>0.0.0.0:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>0.0.0.0:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>0.0.0.0:8088</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
     <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
</configuration>
vim yarn-site.xml

 

8.4. The slaves file

The slaves file stores the list of all slave nodes in the current cluster. For the pseudo-distributed model its content should just be localhost, which happens to be the file's default value. So for the pseudo-distributed model, keep the file's default content.

PS:

If the server/VM's daemons will not start, check whether the local IP and the IPs in the configuration files have changed!

 

8.5. Format HDFS

Before the HDFS NameNode is started for the first time, the directory it uses to store data must be initialized.

If the directory specified by the dfs.namenode.name.dir property in hdfs-site.xml does not exist, the format command creates it automatically;

if it already exists, make sure its permissions are set correctly; the format operation then wipes all data inside it and builds a fresh file system. Run the following command as the hdfs user:

[hdfs@localhost hadoop]$ hdfs namenode -format
19/03/01 11:31:22 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1276811871-127.0.0.1-1551411082356
19/03/01 11:31:22 INFO common.Storage: Storage directory /data/hadoop/hdfs/nn has been successfully formatted.
19/03/01 11:31:22 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
19/03/01 11:31:22 INFO util.ExitUtil: Exiting with status 0
19/03/01 11:31:22 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/127.0.0.1
************************************************************/

 

9. Start Hadoop

Once HDFS is formatted, you can start Hadoop's five daemons (their configuration lives under /bdapps/hadoop/etc/hadoop).

 

9.1. Start the HDFS cluster

HDFS has three daemons: namenode, datanode and secondarynamenode. All of them are started or stopped with the hadoop-daemon.sh script. Run the commands as the hadoop user:

hadoop-daemon.sh start namenode
hadoop-daemon.sh start secondarynamenode
hadoop-daemon.sh start datanode

jps # jps lists the currently running Java processes (it can even query remote hosts); does Python have an equivalent?
61392 NameNode
61602 Jps
61480 SecondaryNameNode
61532 DataNode
Start the HDFS cluster

HDFS cluster web UI:

http://127.0.0.1:50070/dfshealth.html#tab-overview
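Since the theme here is calling Hadoop from Python, note that port 50070 also serves the WebHDFS REST API, so the cluster can be queried with nothing but the requests library. A minimal sketch (the IP, path and user name are assumptions matching this tutorial's setup, and WebHDFS must be enabled, which it is by default):

import requests

NAMENODE = 'http://192.168.226.142:50070'   # assumed NameNode web address from this setup

# WebHDFS: list the files under /test as the hdfs user
resp = requests.get(NAMENODE + '/webhdfs/v1/test',
                    params={'op': 'LISTSTATUS', 'user.name': 'hdfs'})
for item in resp.json()['FileStatuses']['FileStatus']:
    print(item['pathSuffix'], item['type'], item['length'])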

9.2. Start the YARN cluster

YARN has two daemons: resourcemanager and nodemanager, which are started or stopped with the yarn-daemon.sh script. Run the commands as the hadoop user.

yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
jps
61803 ResourceManager
62043 NodeManager
62142 Jps
Start the YARN cluster

YARN cluster web UI:

http://127.0.0.1:8088/cluster
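The ResourceManager on port 8088 also exposes a REST API, which fits the debugging workflow from step 7: you can list applications and their states (and grab an applicationId for yarn logs) from Python. A minimal sketch, assuming the same address as this setup:

import requests

RESOURCEMANAGER = 'http://192.168.226.142:8088'   # assumed ResourceManager web address

# list submitted applications and their states
data = requests.get(RESOURCEMANAGER + '/ws/v1/cluster/apps').json()
apps = (data.get('apps') or {}).get('app', [])    # 'apps' is null when nothing has run yet
for app in apps:
    print(app['id'], app['name'], app['state'], app['finalStatus'])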

 

10. Test

Run a MapReduce job with the bundled hadoop-mapreduce-examples-2.6.2.jar; if it completes normally, the installation succeeded.

Switch to the hdfs user before running the job.

[hdfs@localhost mapreduce]$ yarn jar hadoop-mapreduce-examples-2.6.2.jar wordcount /test/a.txt /test/a.out
Run the job
[hdfs@localhost mapreduce]$ hdfs dfs -ls /test/a.out
Found 2 items
-rw-r--r--   1 hdfs supergroup          0 2019-03-01 14:09 /test/a.out/_SUCCESS
-rw-r--r--   1 hdfs supergroup         54 2019-03-01 14:09 /test/a.out/part-r-00000
[hdfs@localhost mapreduce]$ hdfs dfs -cat /test/a.out/part-r-00000
aaaaaaaaa    1
aaaaaaaaaaaaaaaaa    1
aaaaaaaaaaaaaaaaaaa    1
View the job result

 

11. Calling the HDFS cluster API from Python 3

Hadoop is installed. (It is pseudo-distributed; to go fully distributed, set up passwordless SSH and distribute the configuration files to the other nodes.)

I also found that Python's pyhdfs module can call the HDFS cluster API to upload, download and search for files, so I am recording it here; it may be useful later for a Hadoop automation project.

Note: before using pyhdfs, make sure Hadoop's daemons listen on externally reachable addresses, and update the hosts file.

192.168.226.142 localhost  # Windows hosts file: C:\WINDOWS\system32\drivers\etc\hosts; Linux: /etc/hosts
pip install pyhdfs -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
import pyhdfs
fs = pyhdfs.HdfsClient(hosts='192.168.226.142:50070', user_name='hdfs')
fs.get_home_directory()               # return this user's home directory
fs.get_active_namenode()              # return the currently active namenode

path='/zhanggen/'
file='myfile.txt'
file_name=path+file
# Before uploading, edit the local hosts file: 192.168.226.142 localhost (C:\WINDOWS\system32\drivers\etc\hosts)
print('path already exists') if fs.exists(path) else fs.mkdirs(path)
print('file already exists') if fs.exists(path+file) else fs.copy_from_local('c.txt', path+file)  # upload a local file to the HDFS cluster
fs.copy_to_local(path+file, 'zhanggen.txt')  # copy a file from the HDFS cluster to the local machine
fs.listdir(path)                      # list all files under the directory, e.g. ['a.out', 'a.txt']
response=fs.open(path+file)           # read the file contents
print(response.read())

fs.append(file_name, 'Thanks myself for fighting ')  # append content to a file on the HDFS cluster
response=fs.open(file_name)           # read the file contents
print(response.read())
print(fs.get_file_checksum(file_name))  # get the file checksum
print(fs.list_status(path))           # status of a directory
print(fs.list_status(file_name))      # status of a single file
Calling the HDFS cluster API with the pyhdfs module

 

 

Part 4. Calling the Hadoop MapReduce API from Python 3

 pip3 install mrjob -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
hadoop fs -chown -R hadoop:hadoop  /tmp    # while a MapReduce job runs, the hadoop user creates working files/sockets under /tmp, so set these permissions before running your own MapReduce code

MapReduce job workflow (suppose we want to count word frequencies over the following three lines of data):

a b c
a c
a

 

Step 1: map turns each word into a key/value pair

(a,1)(b,1)(c,1)

(a,1)(c,1)

(a,1)

 

Automatic shuffle & sort

shuffle: group the values of identical keys into one list (like sorting a hand of cards: cards with the same number/letter are stacked together, e.g. three aces together, two 2s together!)

sort: then sort by key;

(a,[1,1,1])

(b,[1])

(c,[1,1])

 

Step 2: after shuffle and sort, the values for each key sit in one list, which makes it easy for reduce to compute aggregates over them (sum, mean, max)!

(a,3) 

(b,1)

(c,2)

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re


class MRwordCount(MRJob):
    '''
        line: one line of input
        (a,1)(b,1)(c,1)
        (a,1)(c,1)
        (a,1)
    '''
    def mapper(self, _, line):
        pattern=re.compile(r'(\W+)')
        for word in re.split(pattern=pattern,string=line):
            if word.isalpha():
                yield (word.lower(),1)


    def reducer(self, word, count):
        # after shuffle and sort:
        '''
        (a,[1,1,1])
        (b,[1])
        (c,[1,1])
        '''
        l=list(count)
        yield (word,sum(l))

if __name__ == '__main__':
    MRwordCount.run()  # run() starts the MapReduce job.
Python 3 wordCount
python /MyMapReduce.py /a.txt -r hadoop  # run the Python MapReduce job on the Hadoop cluster
[hdfs@localhost hadoop]$ python /MyMapReduce.py /a.txt -r hadoop 
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /bdapps/hadoop/bin...
Found hadoop binary: /bdapps/hadoop/bin/hadoop
Using Hadoop version 2.6.2
Looking for Hadoop streaming jar in /bdapps/hadoop...
Found Hadoop streaming jar: /bdapps/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar
Creating temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477
Copying local files to hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar1053011439569578237/] [] /tmp/streamjob2611643769127644921.jar tmpDir=null
  Connecting to ResourceManager at /192.168.226.142:8032
  Connecting to ResourceManager at /192.168.226.142:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1551427459997_0003
  Submitted application application_1551427459997_0003
  The url to track the job: http://192.168.226.142:8088/proxy/application_1551427459997_0003/
  Running job: job_1551427459997_0003
  Job job_1551427459997_0003 running in uber mode : false
   map 0% reduce 0%
   map 50% reduce 0%
   map 100% reduce 0%
   map 100% reduce 100%
  Job job_1551427459997_0003 completed successfully
  Output directory: hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Counters: 49
    File Input Format Counters 
        Bytes Read=18
    File Output Format Counters 
        Bytes Written=18
    File System Counters
        FILE: Number of bytes read=54
        FILE: Number of bytes written=331118
        FILE: Number of large read operations=0
        FILE: Number of read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=320
        HDFS: Number of bytes written=18
        HDFS: Number of large read operations=0
        HDFS: Number of read operations=9
        HDFS: Number of write operations=2
    Job Counters 
        Data-local map tasks=2
        Launched map tasks=2
        Launched reduce tasks=1
        Total megabyte-seconds taken by all map tasks=20077568
        Total megabyte-seconds taken by all reduce tasks=5390336
        Total time spent by all map tasks (ms)=19607
        Total time spent by all maps in occupied slots (ms)=19607
        Total time spent by all reduce tasks (ms)=5264
        Total time spent by all reduces in occupied slots (ms)=5264
        Total vcore-seconds taken by all map tasks=19607
        Total vcore-seconds taken by all reduce tasks=5264
    Map-Reduce Framework
        CPU time spent (ms)=1990
        Combine input records=0
        Combine output records=0
        Failed Shuffles=0
        GC time elapsed (ms)=352
        Input split bytes=302
        Map input records=3
        Map output bytes=36
        Map output materialized bytes=60
        Map output records=6
        Merged Map outputs=2
        Physical memory (bytes) snapshot=501116928
        Reduce input groups=3
        Reduce input records=6
        Reduce output records=3
        Reduce shuffle bytes=60
        Shuffled Maps =2
        Spilled Records=12
        Total committed heap usage (bytes)=319430656
        Virtual memory (bytes) snapshot=6355677184
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
job output is in hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Streaming final output from hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output...
"a"    3
"b"    1
"c"    2
Removing HDFS temp directory hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477...
Removing temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477...
[hdfs@localhost hadoop]$ 
Job output

 

1. MapReduce examples

Count this week's alerts.

Because of a legacy issue where Zabbix alerts were never categorized, the Zabbix alerts -----> converted into ticket records on the ops platform ---------> all end up in a single text field!

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re,csv

key_list=['Free disk space','Zabbix agent','Alive ecerpdb.com','Oracle','FTP service','No data received from Orabbix','Alive ecpim']

class MRwordCount(MRJob):
    def mapper(self, _, line):              # mapper runs once per line of input
        row = csv.reader([line]).__next__() # parse each CSV line into a list
        for key in key_list:
            if key in row[-1]:
                yield (key,1)
                                            # automatic shuffle & sort
    def reducer(self, word, count):         # reducer runs once per distinct key yielded by the mappers
        l=list(count)
        yield (word,sum(l))

if __name__ == '__main__':
    MRwordCount.run()                       # run() starts the MapReduce job.
This week's alerts
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import re,csv,sys

class University_top10(MRJob):
    def mapper(self, _,line):
        row = csv.reader([line]).__next__()  # parse each CSV line into a list
        if not row[0].isdigit():             # skip the header row (rank, school name, total score, type, province, city, orientation, administrator)
            return
        yield ('top',(float(row[2]),row[1])) # (total score, school name)

    def reducer(self, top_key,score_and_university_name):
        top10=[]
        for key in  list(score_and_university_name):
            top10.append(key)
            top10.sort()
            top10=top10[-10:]
        top10.reverse()
        for key in top10:
            yield  key[1],key[0]

if __name__ == '__main__':
    University_top10.run()
Top 10 Chinese universities
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import re,csv,sys

class University_top10(MRJob):
    def mapper(self, _,line):
        row = csv.reader([line]).__next__()          # parse each CSV line into a list
        if not row[0].isdigit():                     # skip the header row (rank, school name, total score, type, province, city, orientation, administrator, per-capita spend)
            return
        yield ('top',(float(row[2]),row[1]))         # yield ('top', (total score, school name))
        if row[-1].isdigit():
            yield ('cost',(float(row[-1]),row[1]))   # yield ('cost', (per-capita spend, school name))

    def reducer(self,key,value):
                                                     # the mapper yields 2 keys ('top' and 'cost'), so the reducer runs twice
        top10=[]
        for list_item in list(value):
            top10.append(list_item)
            top10.sort()
            top10=top10[-10:]
        top10.reverse() if key=='top' else top10.sort()  # top 10 by score, and the 10 with the lowest spend
        for list_item in top10:
            yield  list_item[1],list_item[0]

if __name__ == '__main__':
    University_top10.run()
Top 10 universities by score, and the 10 with the lowest per-capita spend
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob

class Max_Mix_Temperature(MRJob):
    def mapper(self, _,line):
        row=line.split(',')
        if row[2]== 'min':
            yield 'min',(float(row[3]),row[1])
        if row[2]=='max':
            yield 'max',(float(row[3]),row[1])

    def reducer(self,key,value):  
        l=list(value)
        if key=='max':
            yield key,max(l)
        elif key=='min':
            yield key,min(l)
            
if __name__ == '__main__':
    Max_Mix_Temperature.run()
Find the lowest and highest temperatures among the townships of Tang County
# #!/usr/bin/python
# # -*- coding:utf-8 -*-
from mrjob.job import MRJob,MRStep

class Top3_Mean_Friends(MRJob):
    def mapper1(self, _,line):
        row=line.split(',')
        if row[2].isdigit() and row[3].isdigit():
            yield (row[2],int(row[3]))                      # yield age and number of friends


    def reducer1(self,age,friends):
        friends_count=list(friends)
        yield (age, sum(friends_count)/len(friends_count))  # average number of friends per age group

    def mapper2(self, age,average_coun):
        yield (None,(average_coun,str(age)+'year'))

    def reducer2(self, _,average_list):                     # from the averages, find the top 3 age groups with the most friends
        l=list(average_list)
        l.sort()
        top3=l[-3:]
        top3.reverse()
        for i in top3:
            yield (i[0],i[1])

    def steps(self):                                        # chain multiple mapper/reducer steps
        return [
            MRStep(mapper=self.mapper1,reducer=self.reducer1),
            MRStep(mapper=self.mapper2,reducer=self.reducer2)
        ]

if __name__ == '__main__':
    Top3_Mean_Friends.run()
Chaining multiple mapper/reducer functions with MRStep
from mrjob.job import  MRJob,MRStep

class Top_AnnualSalary_Job(MRJob): #ID,Name,JobTitle,AnnualSalary,GrossSpend
    def mapper1(self, _,line):
        row=line.split(',')
        if row[0]=='ID':
            return
        yield (row[2],int(row[3]))

    def reducer1(self,jobtitle,annualsalary):
        AnnualSalary=list(annualsalary)
        yield ('job_annualsalary',(sum(AnnualSalary)/len(AnnualSalary),jobtitle))

    def mapper2(self,key,job_annualsalary):
        yield key,job_annualsalary

    def reducer2(self, key, values):
        l=list(values)
        print('old',l)
        new_l=[]
        for i in l:
            new_l.append(i)
            new_l.sort(reverse=True)
            new_l=new_l[0:3]
        for k in new_l:
            yield k[1],k[0]

    def steps(self):  # chain multiple mapper/reducer steps
        return [ MRStep(mapper=self.mapper1, reducer=self.reducer1),MRStep(mapper=self.mapper2, reducer=self.reducer2)]


if __name__ == '__main__':
    Top_AnnualSalary_Job.run()
Top 3 job titles by average annual salary

 

2. map + combine + reduce

If every map node ships all of its aggregation work over TCP to the reduce node, the reduce node becomes a single overloaded point; that is why combine exists.

A combiner is a small reduce: before a map node sends its data to reduce, it does a preliminary aggregation locally, which lightens the load on the reduce node and speeds up the MapReduce job.
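mrjob supports this directly: a job class can define a combiner() alongside mapper() and reducer(). Below is a minimal word-count sketch (my own illustration, not one of the examples above) in which the combiner pre-sums counts on the map side before they are shuffled to the reducer:

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[a-zA-Z]+")

class MRWordCountCombine(MRJob):
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))   # map-side partial sum, runs on each map node

    def reducer(self, word, counts):
        yield (word, sum(counts))   # final sum over the partial sums

if __name__ == '__main__':
    MRWordCountCombine.run()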


Hadoop installation reference

Common errors when starting Hadoop

Managing HDFS data with Python
