In the previous article I learned to do simple data analysis with pandas, but folks...... pandas cannot process or analyze TB-scale big data, so let's take a look at Hadoop.
As an aside, my takeaway from story one: a discontented heart is like a snake trying to swallow an elephant.
The data I have worked with falls into three categories:
1. Structured data
Data in relational databases, constrained by a schema of fields; (rules everywhere)
2. Semi-structured data
HTML / XML / JSON... this kind of data has structure, but the constraints are not very strict; (some rules still apply)
3. Unstructured data
Plain text / logs... this kind of data has no head, body or key markup, let alone field constraints; (no rules to speak of)
4. How do we store massive amounts of unstructured data?
So the question is: how do we store huge volumes of unstructured/semi-structured data and analyze and search it efficiently?
Google proposed the solutions in its papers; (it never said how to implement them!)
1. How do we store massive data safely?
Store the data in a distributed fashion across different nodes of a server cluster; (distributed: however big the data grows, you can scale out by adding servers to share the load.)
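To make the idea concrete, here is a toy Python sketch (my own illustration, not HDFS's actual placement algorithm) of splitting a file into fixed-size blocks and assigning each block to several nodes:

BLOCK_SIZE = 4            # bytes; unrealistically small, just for illustration (HDFS defaults to 128 MB)
REPLICAS = 2              # how many copies of each block to keep
NODES = ['node1', 'node2', 'node3']

def place_blocks(data: bytes):
    """Split data into blocks and assign each block to REPLICAS different nodes (round-robin)."""
    blocks = [data[i:i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
    placement = {}
    for idx, block in enumerate(blocks):
        targets = [NODES[(idx + r) % len(NODES)] for r in range(REPLICAS)]
        placement[idx] = (block, targets)
    return placement

for idx, (block, targets) in place_blocks(b'hello hadoop world').items():
    print(idx, block, targets)    # losing one node still leaves a copy of every block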
2. How do we analyze and search massive data efficiently?
MapReduce: the programming model from "MapReduce: Simplified Data Processing on Large Clusters"
Take a big, complex computing task --------> split it into small task units ----------> run them in parallel on the cluster nodes
collect the results from each node ---------> merge them --------> map again ------> reduce again ........> until the final result is produced;
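As a rough single-machine illustration of that flow (plain Python with worker processes, not Hadoop; the names lines and count_words are my own): split the input, run the small tasks in parallel, then merge the partial results.

from multiprocessing import Pool
from collections import Counter

lines = ["a b c", "a c", "a"]          # the input, split into small units (one line per task)

def count_words(line):
    """The small task unit that would run on one node: count words in one chunk."""
    return Counter(line.split())

if __name__ == '__main__':
    with Pool(3) as pool:                               # pretend each worker process is a cluster node
        partial_counts = pool.map(count_words, lines)   # parallel "map" step
    total = sum(partial_counts, Counter())              # merge step, i.e. the "reduce"
    print(total)                                        # Counter({'a': 3, 'c': 2, 'b': 1})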
5. What is Hadoop?
A legendary programmer, Doug Cutting, inspired by the Google papers above, implemented Hadoop in Java.
6. How do I call Hadoop from Python?
Hadoop's MapReduce is so powerful; as a Python beginner, how do I call it? The API Hadoop exposes is also called MapReduce.
HDFS cluster: data_node (data storage node), name_node (name node), secondary_node (secondary name node)
YARN: cluster resource management
1. Environment preparation
CentOS 7 usually ships with a JDK already:
[root@localhost zhanggen]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@localhost profile.d]# yum -y install java-1.8.0-openjdk*
Disable the firewall on CentOS 7
Check status:     systemctl status firewalld
Disable at boot:  systemctl disable firewalld
Enable at boot:   systemctl enable firewalld
Disable the SELinux service on CentOS 7
[root@localhost hdfs]# setenforce 1
[root@localhost hdfs]# getenforce
Enforcing
[root@localhost hdfs]# setenforce 0
[root@localhost hdfs]# getenforce
Permissive
3. Install Hadoop (extract the tarball)
[root@localhost bdapps]# mkdir /bdapps/
[root@localhost bdapps]# tar -zxvf /home/zhanggen/Desktop/hadoop-2.6.2.tar.gz -C /bdapps/
[root@localhost bdapps]# ls
hadoop-2.6.2
[root@localhost bdapps]# ln -sv /bdapps/hadoop-2.6.2 /bdapps/hadoop
‘/bdapps/hadoop’ -> ‘/bdapps/hadoop-2.6.2’
4. Set the Java and Hadoop environment variables
export HADOOP_PREFIX=/bdapps/hadoop
export PATH=$PATH:${HADOOP_PREFIX}/bin:${HADOOP_PREFIX}/sbin
export HADOOP_YARN_HOME=${HADOOP_PREFIX}
export HADOOP_MAPPRED_HOME=${HADOOP_PREFIX}
export HADOOP_COMMON_HOME=${HADOOP_PREFIX}
export HADOOP_HDFS_HOME=${HADOOP_PREFIX}
export JAVA_HOME=/usr
groupadd hadoop
useradd -g hadoop hadoop
mkdir -pv /data/hadoop/hdfs/{nn,dn,snn}
chown -R hadoop:hadoop /data/hadoop/hdfs/
cd /bdapps/hadoop/
mkdir logs
chown -R hadoop:hadoop ./*
PS: if your MapReduce job fails, grab its applicationId and look at the error messages:
yarn logs -applicationId application_1551852706740_0001    # view the job's execution logs
The secret to taming Hadoop: first get the program's run logs -------> then analyze the cause of the errors in the logs ------> then fix the problem.
8. Hadoop's main configuration files (/bdapps/hadoop/etc/hadoop)
8.0.core-site.xml
Sets the NameNode's IP address and port (8020 by default).
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://0.0.0.0:8020</value>
        <final>false</final>
    </property>
</configuration>
8.1.hdfs-site.xml
HDFS-related properties: the number of replicas of each data block, and the directories where the NN and DN store data (the directories created in step 6).
<configuration>
    <property>
        <name>dfs.http.address</name>
        <value>0.0.0.0:50070</value>
    </property>
    <property>
        <name>dfs.datanode.http.address</name>
        <value>0.0.0.0:50075</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///data/hadoop/hdfs/nn</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///data/hadoop/hdfs/dn</value>
    </property>
    <property>
        <name>fs.checkpoint.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
    <property>
        <name>fs.checkpoint.edits.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
</configuration>
8.2. mapred-site.xml (tell MapReduce to use YARN)
Specifies whether MapReduce runs standalone or on top of YARN; in Hadoop 2 it always runs on YARN. See Section 2, Hadoop's runtime model.
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
8.3.yarn-site.xml
yarn-site.xml configures the YARN daemons and related properties. First specify the host and port the ResourceManager daemon listens on; for the pseudo-distributed model the host is localhost and the default port is 8032. Then specify the scheduler the ResourceManager should use, plus the NodeManager's auxiliary services. A brief example configuration looks like this:
<configuration>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>0.0.0.0:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>0.0.0.0:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>0.0.0.0:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>0.0.0.0:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>0.0.0.0:8088</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
</configuration>
8.4. The slaves file
The slaves file lists all of the cluster's slave nodes. For the pseudo-distributed model its content should be just localhost, which happens to be the file's default value, so in the pseudo-distributed model the file can simply be left as it is.
PS:
If the daemons on the server/VM won't start, check whether the machine's local IP still matches the IPs in the configuration files!
8.5. Format HDFS
Before the HDFS NameNode starts, the directory it uses to store data must be initialized.
If the directory specified by dfs.namenode.name.dir in hdfs-site.xml does not exist, the format command creates it automatically;
if it already exists, make sure its permissions are set correctly; in that case formatting wipes everything inside it and builds a new file system. Run the following command as the hdfs user:
[hdfs@localhost hadoop]$ hdfs namenode -format
19/03/01 11:31:22 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1276811871-127.0.0.1-1551411082356
19/03/01 11:31:22 INFO common.Storage: Storage directory /data/hadoop/hdfs/nn has been successfully formatted.
19/03/01 11:31:22 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
19/03/01 11:31:22 INFO util.ExitUtil: Exiting with status 0
19/03/01 11:31:22 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/127.0.0.1
************************************************************/
9. Start Hadoop
Once HDFS has been formatted, go to the /bdapps/hadoop/etc/hadoop directory and start Hadoop's five daemons.
9.1. Start the HDFS cluster
HDFS has three daemons: namenode, datanode and secondarynamenode; all of them are started and stopped via the hadoop-daemon.sh script. Run the commands as the hadoop user:
hadoop-daemon.sh start namenode
hadoop-daemon.sh start secondarynamenode
hadoop-daemon.sh start datanode
jps    # jps lists the currently running Java processes (it even works remotely; does Python have anything like it?)
61392 NameNode
61602 Jps
61480 SecondaryNameNode
61532 DataNode
HDFS集羣web訪問接口:
http://127.0.0.1:50070/dfshealth.html#tab-overview
9.2. Start the YARN cluster
YARN has two daemons: resourcemanager and nodemanager; they are started and stopped via the yarn-daemon.sh script. Run the commands as the hadoop user.
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
jps
61803 ResourceManager
62043 NodeManager
62142 Jps
yarn集羣web訪問接口:
http://127.0.0.1:8088/cluster
10. Test
Run a MapReduce job with the bundled hadoop-mapreduce-examples-2.6.2.jar to check that it executes normally; if it does, the installation succeeded.
Switch to the hdfs user before running the job.
[hdfs@localhost mapreduce]$ yarn jar hadoop-mapreduce-examples-2.6.2.jar wordcount /test/a.txt /test/a.out
[hdfs@localhost mapreduce]$ hdfs dfs -ls /test/a.out
Found 2 items
-rw-r--r--   1 hdfs supergroup          0 2019-03-01 14:09 /test/a.out/_SUCCESS
-rw-r--r--   1 hdfs supergroup         54 2019-03-01 14:09 /test/a.out/part-r-00000
[hdfs@localhost mapreduce]$ hdfs dfs -cat /test/a.out/part-r-00000
aaaaaaaaa	1
aaaaaaaaaaaaaaaaa	1
aaaaaaaaaaaaaaaaaaa	1
11. Calling the HDFS cluster API from Python 3
Hadoop is installed; (it's only pseudo-distributed; for a real distributed setup, just configure passwordless SSH and push the configuration files out to the other nodes.)
I saw online that Python's pyhdfs module can call the HDFS cluster's API to upload, download and look up files..., so I'm stashing it away; it may come in handy later for a Hadoop automation project.
Note: before using pyhdfs, make sure the Hadoop services are listening on the external interface and edit the hosts file.
192.168.226.142 localhost    # hosts file path: Windows C:\WINDOWS\system32\drivers\etc\hosts, Linux /etc/hosts
pip install pyhdfs -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
import pyhdfs

fs = pyhdfs.HdfsClient(hosts='192.168.226.142:50070', user_name='hdfs')
fs.get_home_directory()     # return this user's home directory
fs.get_active_namenode()    # return an available namenode

path = '/zhanggen/'
file = 'myfile.txt'
file_name = path + file
# before uploading, add "192.168.226.142 localhost" to the local hosts file (C:\WINDOWS\system32\drivers\etc\hosts)
print('path already exists') if fs.exists(path) else fs.mkdirs(path)
print('file already exists') if fs.exists(path + file) else fs.copy_from_local('c.txt', path + file)  # upload a local file to the HDFS cluster

fs.copy_to_local(path + file, 'zhanggen.txt')   # copy a file from the HDFS cluster to the local machine
fs.listdir(path)                                # list all files in the directory, e.g. ['a.out', 'a.txt']
response = fs.open(path + file)                 # read the file's content
print(response.read())

fs.append(file_name, 'Thanks myself for fighting ')  # append content to the file on the HDFS cluster
response = fs.open(file_name)                   # read the file's content again
print(response.read())

print(fs.get_file_checksum(file_name))          # get the file's checksum
print(fs.list_status(path))                     # status of a single path
print(fs.list_status(file_name))                # status of a single file
pip3 install mrjob -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
hadoop fs -chown -R hadoop:hadoop /tmp    # while a MapReduce job runs, the hadoop user creates sockets here and accesses them via JDBC, so set these permissions before running your own MapReduce job
How a MapReduce job works (suppose we want to count word frequencies over the following 3 lines of data):
a b c
a c
a
Step 1 (map): turn each word into a (key, value) pair
(a,1)(b,1)(c,1)
(a,1)(c,1)
(a,1)
Automatic shuffle & sort:
shuffle: group the values that share the same key into one list (like sorting a hand of cards: you stack the cards with the same number or letter together, the three aces in one pile, the two 2s in another!)
sort: then sort by key;
(a,[1,1,1])
(b,[1])
(c,[1,1])
Step 2 (reduce): after shuffle and sort, the values sharing a key sit in one list, which makes it easy for reduce to compute aggregations over them (sum, mean, max)!
(a,3)
(b,1)
(c,2)
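To make shuffle & sort concrete, here is a minimal pure-Python sketch of those three lines (my own illustration of the idea, not how Hadoop implements it):

from itertools import groupby
from operator import itemgetter

# map output of the three input lines: (key, value) pairs
mapped = [('a', 1), ('b', 1), ('c', 1),
          ('a', 1), ('c', 1),
          ('a', 1)]

# shuffle & sort: sort by key, then group the values that share a key into one list
mapped.sort(key=itemgetter(0))
grouped = [(k, [v for _, v in pairs]) for k, pairs in groupby(mapped, key=itemgetter(0))]
# grouped == [('a', [1, 1, 1]), ('b', [1]), ('c', [1, 1])]

# reduce: aggregate each value list
reduced = [(k, sum(vs)) for k, vs in grouped]
print(reduced)    # [('a', 3), ('b', 1), ('c', 2)]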
#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re


class MRwordCount(MRJob):
    '''
    line: one line of input data
    map output: (a,1)(b,1)(c,1) / (a,1)(c,1) / (a,1)
    '''
    def mapper(self, _, line):
        pattern = re.compile(r'(\W+)')
        for word in re.split(pattern=pattern, string=line):
            if word.isalpha():
                yield (word.lower(), 1)

    def reducer(self, word, count):  # after shuffle and sort
        '''
        (a,[1,1,1])
        (b,[1])
        (c,[1,1])
        '''
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()  # run() starts the MapReduce job
python /MyMapReduce.py /a.txt -r hadoop    # run the Python MapReduce job on the Hadoop cluster
[hdfs@localhost hadoop]$ python /MyMapReduce.py /a.txt -r hadoop
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /bdapps/hadoop/bin...
Found hadoop binary: /bdapps/hadoop/bin/hadoop
Using Hadoop version 2.6.2
Looking for Hadoop streaming jar in /bdapps/hadoop...
Found Hadoop streaming jar: /bdapps/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar
Creating temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477
Copying local files to hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar1053011439569578237/] [] /tmp/streamjob2611643769127644921.jar tmpDir=null
  Connecting to ResourceManager at /192.168.226.142:8032
  Connecting to ResourceManager at /192.168.226.142:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1551427459997_0003
  Submitted application application_1551427459997_0003
  The url to track the job: http://192.168.226.142:8088/proxy/application_1551427459997_0003/
  Running job: job_1551427459997_0003
  Job job_1551427459997_0003 running in uber mode : false
  map 0% reduce 0%
  map 50% reduce 0%
  map 100% reduce 0%
  map 100% reduce 100%
  Job job_1551427459997_0003 completed successfully
  Output directory: hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Counters: 49
	File Input Format Counters
		Bytes Read=18
	File Output Format Counters
		Bytes Written=18
	File System Counters
		FILE: Number of bytes read=54
		FILE: Number of bytes written=331118
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=320
		HDFS: Number of bytes written=18
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=9
		HDFS: Number of write operations=2
	Job Counters
		Data-local map tasks=2
		Launched map tasks=2
		Launched reduce tasks=1
		Total megabyte-seconds taken by all map tasks=20077568
		Total megabyte-seconds taken by all reduce tasks=5390336
		Total time spent by all map tasks (ms)=19607
		Total time spent by all maps in occupied slots (ms)=19607
		Total time spent by all reduce tasks (ms)=5264
		Total time spent by all reduces in occupied slots (ms)=5264
		Total vcore-seconds taken by all map tasks=19607
		Total vcore-seconds taken by all reduce tasks=5264
	Map-Reduce Framework
		CPU time spent (ms)=1990
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=352
		Input split bytes=302
		Map input records=3
		Map output bytes=36
		Map output materialized bytes=60
		Map output records=6
		Merged Map outputs=2
		Physical memory (bytes) snapshot=501116928
		Reduce input groups=3
		Reduce input records=6
		Reduce output records=3
		Reduce shuffle bytes=60
		Shuffled Maps =2
		Spilled Records=12
		Total committed heap usage (bytes)=319430656
		Virtual memory (bytes) snapshot=6355677184
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
job output is in hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Streaming final output from hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output...
"a"	3
"b"	1
"c"	2
Removing HDFS temp directory hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477...
Removing temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477...
[hdfs@localhost hadoop]$
1. MapReduce examples
Count this week's alerts
Because of the legacy problem that Zabbix alerts were never categorized, each zabbix alert -----> gets turned into a ticket in the ops platform ---------> and ends up as one single text field!
#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re, csv

key_list = ['Free disk space', 'Zabbix agent', 'Alive ecerpdb.com', 'Oracle', 'FTP service',
            'No data received from Orabbix', 'Alive ecpim']


class MRwordCount(MRJob):
    def mapper(self, _, line):  # mapper runs once per input line
        row = csv.reader([line]).__next__()  # parse each CSV line into a list
        for key in key_list:
            if key in row[-1]:
                yield (key, 1)

    # automatic shuffle & sort happens here

    def reducer(self, word, count):  # reducer runs once per key yielded by the mapper
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()  # run() starts the MapReduce job
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import re, csv, sys


class University_top10(MRJob):
    def mapper(self, _, line):
        row = csv.reader([line]).__next__()  # parse each CSV line into a list
        if not row[0].isdigit():  # skip the header row (rank, name, total score, type, province, city, orientation, authority)
            return
        yield ('top', (float(row[2]), row[1]))  # (total score, university name)

    def reducer(self, top_key, score_and_university_name):
        top10 = []
        for key in list(score_and_university_name):
            top10.append(key)
        top10.sort()
        top10 = top10[-10:]
        top10.reverse()
        for key in top10:
            yield key[1], key[0]


if __name__ == '__main__':
    University_top10.run()
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import re, csv, sys


class University_top10(MRJob):
    def mapper(self, _, line):
        row = csv.reader([line]).__next__()  # parse each CSV line into a list
        if not row[0].isdigit():  # skip the header row (rank, name, total score, ..., per-capita spend)
            return
        yield ('top', (float(row[2]), row[1]))        # yield ('top', (total score, university name))
        if row[-1].isdigit():
            yield ('cost', (float(row[-1]), row[1]))  # yield ('cost', (per-capita spend, university name))

    def reducer(self, key, value):  # the mapper yields 2 keys ('top' and 'cost'), so the reducer runs twice
        top10 = []
        for list_item in list(value):
            top10.append(list_item)
        top10.sort()
        top10 = top10[-10:]
        top10.reverse() if key == 'top' else top10.sort()  # top 10 universities by score, and top 10 by spend
        for list_item in top10:
            yield list_item[1], list_item[0]


if __name__ == '__main__':
    University_top10.run()
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob


class Max_Mix_Temperature(MRJob):
    def mapper(self, _, line):
        row = line.split(',')
        if row[2] == 'min':
            yield 'min', (float(row[3]), row[1])
        if row[2] == 'max':
            yield 'max', (float(row[3]), row[1])

    def reducer(self, key, value):
        l = list(value)
        if key == 'max':
            yield key, max(l)
        elif key == 'min':
            yield key, min(l)


if __name__ == '__main__':
    Max_Mix_Temperature.run()
#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob, MRStep


class Top3_Mean_Friends(MRJob):
    def mapper1(self, _, line):
        row = line.split(',')
        if row[2].isdigit() and row[3].isdigit():
            yield (row[2], int(row[3]))  # emit (age, number of friends)

    def reducer1(self, age, friends):
        friends_count = list(friends)
        yield (age, sum(friends_count) / len(friends_count))  # average number of friends per age

    def mapper2(self, age, average_coun):
        yield (None, (average_coun, str(age) + 'year'))

    def reducer2(self, _, average_list):  # from the averages, pick the top 3 ages by friend count
        l = list(average_list)
        l.sort()
        top3 = l[-3:]
        top3.reverse()
        for i in top3:
            yield (i[0], i[1])

    def steps(self):  # chain multiple mapper/reducer steps
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]


if __name__ == '__main__':
    Top3_Mean_Friends.run()
from mrjob.job import MRJob, MRStep


class Top_AnnualSalary_Job(MRJob):
    # columns: ID,Name,JobTitle,AnnualSalary,GrossSpend
    def mapper1(self, _, line):
        row = line.split(',')
        if row[0] == 'ID':  # skip the header row
            return
        yield (row[2], int(row[3]))

    def reducer1(self, jobtitle, annualsalary):
        AnnualSalary = list(annualsalary)
        yield ('job_annualsalary', (sum(AnnualSalary) / len(AnnualSalary), jobtitle))

    def mapper2(self, key, job_annualsalary):
        yield key, job_annualsalary

    def reducer2(self, key, values):
        l = list(values)
        print('old', l)  # debug output
        new_l = []
        for i in l:
            new_l.append(i)
        new_l.sort(reverse=True)
        new_l = new_l[0:3]
        for k in new_l:
            yield k[1], k[0]

    def steps(self):  # chain multiple mapper/reducer steps
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]


if __name__ == '__main__':
    Top_AnnualSalary_Job.run()
2. map + combine + reduce
If the map nodes ship all of the aggregation work over TCP to the reduce node, the reduce node becomes a single overloaded hot spot, which is why combine exists;
a combiner is a small reducer: before the map side sends its data to the reducer, it does a preliminary aggregation right on the map node, easing the load on the reduce node and speeding up the MapReduce job; see the sketch below.
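With mrjob you declare a combiner the same way as a mapper or reducer; here is a minimal word-count sketch using mrjob's combiner hook (the class name MRWordCountWithCombiner is my own):

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordCountWithCombiner(MRJob):
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # runs on the map side: pre-sums the counts of one map task before they are shuffled to the reducer
        yield word, sum(counts)

    def reducer(self, word, counts):
        # receives far fewer records because each map task already aggregated its own output
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCountWithCombiner.run()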