前言:html
HADOOP的核心組成部分:HDFS文件系統和Mapreduce。在構建這個大數據分佈式應用框架的過程當中,解決了不少了共性問題而且都封裝爲開源的框架,這些框架徹底能夠拿來用《Hadoop API》。java
Pig 是一個基於Hadoop的大規模數據分析平臺,Pig爲複雜的海量數據並行計算提供了一個簡易的操做和編程接口。
Chukwa 是基於Hadoop的集羣監控系統,由yahoo貢獻。用於管理大型分佈式系統的數據收集系統(2000+以上的節點, 系統天天產生的監控數據量在T級別)。
Hive 是基於Hadoop的一個工具,提供完整的sql查詢功能,能夠將sql語句轉換爲MapReduce任務進行運行。
ZooKeeper 高效的,可擴展的協調系統,存儲和協調關鍵共享狀態。
HBase 是一個開源的,基於列存儲模型的分佈式數據庫。
HDFS 是一個分佈式文件系統。有着高容錯性的特色,而且設計用來部署在低廉的硬件上,適合那些有着超大數據集的應用程序。
MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算。
Hadoop RPC 遠程過程調用RPC。
Hadoop YARN 任務調度和集羣資源管理。
其餘相關:
Avro 數據序列化系統,由Doug Cutting牽頭開發,是一個數據序列化系統。
Cassandra 可擴展的多主數據庫,沒有單點故障。是一套開源分佈式NoSQL數據庫系統。
Mahout Apache旗下的一個開源項目,提供一些可擴展的機器學習領域經典算法的實現,旨在幫助開發人員更加方便快捷地建立智能應用程序。
Tez 用於構建高性能批處理和交互式數據處理應用程序的可擴展框架,由Apache Hadoop中的YARN協調。node
一、HDFS文件系統linux
1.1 HDFS有不少特色:摘自http://www.javashuo.com/article/p-fmckmuuk-eo.html 源碼分析git
① 保存多個副本,且提供容錯機制,副本丟失或宕機自動恢復。默認存3份。github
② 運行在廉價的機器上。算法
③ 適合大數據的處理。多大?多小?HDFS默認會將文件分割成block,64M爲1個block。而後將block按鍵值對存儲在HDFS上,並將鍵值對的映射存到內存中。若是小文件太多,那內存的負擔會很重。sql
NameNode元數據管理機制(非HA模式):數據庫
如上圖所示,HDFS也是按照Master和Slave的結構。分NameNode、SecondaryNameNode、DataNode這幾個角色。 NameNode:是Master節點,是大領導。管理數據塊映射;處理客戶端的讀寫請求;配置副本策略;管理HDFS的名稱空間; SecondaryNameNode:是一個小弟,分擔大哥namenode的工做量;是NameNode的冷備份;合併fsimage和fsedits而後再發給namenode。 DataNode:Slave節點,奴隸,幹活的。負責存儲client發來的數據塊block;執行數據塊的讀寫操做。 熱備份:b是a的熱備份,若是a壞掉。那麼b立刻運行代替a的工做。 冷備份:b是a的冷備份,若是a壞掉。那麼b不能立刻代替a工做。可是b上存儲a的一些信息,減小a壞掉以後的損失。 fsimage:元數據鏡像文件(文件系統的目錄樹。) edits:元數據的操做日誌(針對文件系統作的修改操做記錄) namenode內存中存儲的是=fsimage+edits。 SecondaryNameNode負責定時默認1小時,從namenode上,獲取fsimage和edits來進行合併,而後再發送給namenode。減小namenode的工做量。
1.2 工做原理apache
寫操做
有一個文件FileA,100M大小。Client將FileA寫入到HDFS上。 HDFS按默認配置。 HDFS分佈在三個機架上Rack1,Rack2,Rack3。 a. Client將FileA按64M分塊。分紅兩塊,block1和Block2; b. Client向nameNode發送寫數據請求,如圖藍色虛線①------>。 c. NameNode節點,記錄block信息。並返回可用的DataNode,如粉色虛線②--------->。 Block1: host2,host1,host3 Block2: host7,host8,host4 原理: NameNode具備RackAware機架感知功能,這個能夠配置。 若client爲DataNode節點,那存儲block時,規則爲:副本1,同client的節點上;副本2,不一樣機架節點上;副本3,同第二個副本機架的另外一個節點上;其餘副本隨機挑選。 若client不爲DataNode節點,那存儲block時,規則爲:副本1,隨機選擇一個節點上;副本2,不一樣副本1,機架上;副本3,同副本2相同的另外一個節點上;其餘副本隨機挑選。 d. client向DataNode發送block1;發送過程是以流式寫入。 流式寫入過程, 1>將64M的block1按64k的package劃分; 2>而後將第一個package發送給host2; 3>host2接收完後,將第一個package發送給host1,同時client想host2發送第二個package; 4>host1接收完第一個package後,發送給host3,同時接收host2發來的第二個package。 5>以此類推,如圖紅線實線所示,直到將block1發送完畢。 6>host2,host1,host3向NameNode,host2向Client發送通知,說「消息發送完了」。如圖粉紅顏色實線所示。 7>client收到host2發來的消息後,向namenode發送消息,說我寫完了。這樣就真完成了。如圖黃色粗實線 8>發送完block1後,再向host7,host8,host4發送block2,如圖藍色實線所示。 9>發送完block2後,host7,host8,host4向NameNode,host7向Client發送通知,如圖淺綠色實線所示。 10>client向NameNode發送消息,說我寫完了,如圖黃色粗實線。。。這樣就完畢了。 分析,經過寫過程,咱們能夠了解到: ①寫1T文件,咱們須要3T的存儲,3T的網絡流量貸款。 ②在執行讀或寫的過程當中,NameNode和DataNode經過HeartBeat進行保存通訊,肯定DataNode活着。若是發現DataNode死掉了,就將死掉的DataNode上的數據,放到其餘節點去。讀取時,要讀其餘節點去。 ③掛掉一個節點,不要緊,還有其餘節點能夠備份;甚至,掛掉某一個機架,也不要緊;其餘機架上,也有備份。
讀操做
操做就簡單一些了,如圖所示,client要從datanode上,讀取FileA。而FileA由block1和block2組成。
那麼,讀操做流程爲:
a. client向namenode發送讀請求。
b. namenode查看Metadata信息,返回fileA的block的位置。
block1:host2,host1,host3
block2:host7,host8,host4
c. block的位置是有前後順序的,先讀block1,再讀block2。並且block1去host2上讀取;而後block2,去host7上讀取;
上面例子中,client位於機架外,那麼若是client位於機架內某個DataNode上,例如,client是host6。那麼讀取的時候,遵循的規律是:
優選讀取本機架上的數據。
二、MapReduce
MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想。MapReduce極大地方便了編程人員在不會分佈式並行編程的狀況下,將本身的程序運行在分佈式系統上。摘自:http://www.cnblogs.com/wangxin37/p/6501495.html
補充: 多個job在同一個main方法中提交:使用Shell腳本組織並執行。
2.1 單詞分詞實例
WCMapper.java
import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型 //map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的 //默認狀況下,框架傳遞給咱們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容做爲value public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ //mapreduce框架每讀一行數據就調用一次該方法 @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value //key 是這一行數據的起始偏移量 value 是這一行的文本內容 //將這一行的內容轉換成string類型 String line = value.toString(); //對這一行的文本按特定分隔符切分 String[] words = StringUtils.split(line, " "); //遍歷這個單詞數組輸出爲kv形式 k:單詞 v : 1 for(String word : words){ context.write(new Text(word), new LongWritable(1)); } } }
WCReducer.java
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ //框架在map處理完成以後,將全部kv對緩存起來,進行分組,而後傳遞一個組<key,valus{}>,調用一次reduce方法 //<hello,{1,1,1,1,1,1.....}> @Override protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException { long count = 0; //遍歷value的list,進行累加求和 for(LongWritable value:values){ count += value.get(); } //輸出這一個單詞的統計結果 context.write(key, new LongWritable(count)); } }
WCRunner.java
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 用來描述一個特定的做業 * 好比,該做業使用哪一個類做爲邏輯處理中的map,哪一個做爲reduce * 還能夠指定該做業要處理的數據所在的路徑 * 還能夠指定改做業輸出的結果放到哪一個路徑 * .... * @author duanhaitao@itcast.cn * */ public class WCRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //設置整個job所用的那些類在哪一個jar包 wcjob.setJarByClass(WCRunner.class); //本job使用的mapper和reducer的類 wcjob.setMapperClass(WCMapper.class); wcjob.setReducerClass(WCReducer.class); //指定reduce的輸出數據kv類型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(LongWritable.class); //指定mapper的輸出數據kv類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(LongWritable.class); //指定要處理的輸入數據存放路徑 FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend110:9000/wc/srcdata/")); //指定處理結果的輸出數據存放路徑 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend110:9000/wc/output3/")); //將job提交給集羣運行 wcjob.waitForCompletion(true); } }
MR程序的幾種提交運行模式:
本地模型運行 1/在windows的eclipse裏面直接運行main方法,就會將job提交給本地執行器localjobrunner執行 ----輸入輸出數據能夠放在本地路徑下(c:/wc/srcdata/) ----輸入輸出數據也能夠放在hdfs中(hdfs://weekend110:9000/wc/srcdata) 2/在linux的eclipse裏面直接運行main方法,可是不要添加yarn相關的配置,也會提交給localjobrunner執行 ----輸入輸出數據能夠放在本地路徑下(/home/hadoop/wc/srcdata/) ----輸入輸出數據也能夠放在hdfs中(hdfs://weekend110:9000/wc/srcdata) 集羣模式運行 1/將工程打成jar包,上傳到服務器,而後用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner 2/在linux的eclipse中直接運行main方法,也能夠提交到集羣中去運行,可是,必須採起如下措施: ----在工程src目錄下加入 mapred-site.xml 和 yarn-site.xml ----將工程打成jar包(wc.jar),同時在main方法中添加一個conf的配置參數 conf.set("mapreduce.job.jar","wc.jar"); 3/在windows的eclipse中直接運行main方法,也能夠提交給集羣中運行,可是由於平臺不兼容,須要作不少的設置修改 ----要在windows中存放一份hadoop的安裝包(解壓好的) ----要將其中的lib和bin目錄替換成根據你的windows版本從新編譯出的文件 ----再要配置系統環境變量 HADOOP_HOME 和 PATH ----修改YarnRunner這個類的源碼
2.2 MapReduce執行過程
MR的分發和管理依賴YARN,MR被放在Yarn上運行,實際是放在Yarn的ApplicationMaster中,由ApplicationMaster管理,ApplicationMaster在運行起來後,先向Yarn中ResourceManager註冊並所要資源,ResourceManager就通知每一個節點上的小弟:NodeManager,NodeManager接到通知,馬上執行,最終給ApplicationMaster提供一個運行的環境:Container。而後在Container中MapReduce(也能夠是Spark/Storm其餘計算)中就開始了本身計算。
MapReduce運行的時候,會經過Mapper運行的任務讀取HDFS中的數據文件(InputFormat),而後調用本身的方法,處理數據(Shuffle),最後輸出(OutputFormat)。Reducer任務會接收Mapper任務輸出的數據,做爲本身的輸入數據,調用本身的方法,最後輸出到HDFS的文件中。在這個過程當中,用戶能夠自定義Reduce的併發數,Map的併發數依賴切片Spilt數,切片數依賴每一個切片的大小,能夠設置。整個流程如圖:
1.執行MR的命令:
hadoop jar <jar在linux的路徑> <main方法所在的類的全類名> <參數> 例子:
hadoop jar /root/wc1.jar cn.itcast.d3.hadoop.mr.WordCount hdfs://itcast:9000/words /out2
2.MR執行流程
(1).客戶端提交一個mr的jar包給JobClient(提交方式:hadoop jar ...) (2).JobClient經過RPC和JobTracker進行通訊,返回一個存放jar包的地址(HDFS)和jobId (3).client將jar包寫入到HDFS當中(path = hdfs上的地址 + jobId) (4).開始提交任務(任務的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等) (5).JobTracker進行初始化任務 (6).讀取HDFS上的要處理的文件,開始計算輸入分片,每個分片對應一個MapperTask (7).TaskTracker經過心跳機制領取任務(任務的描述信息) (8).下載所需的jar,配置文件等 (9).TaskTracker啓動一個java child子進程,用來執行具體的任務(MapperTask或ReducerTask) (10).將結果寫入到HDFS當中
2.3 Mapper任務詳解
每一個Mapper任務是一個java進程,它會讀取HDFS中的文件,解析成不少的鍵值對,通過咱們覆蓋的map方法處理後,轉換爲不少的鍵值對再輸出。整個Mapper任務的處理過程又能夠分爲如下幾個階段,如圖所示:
第一階段是把輸入文件按照必定的標準進行分片(InputSplit),每一個輸入片的大小是固定的。默認狀況下,輸入片(InputSplit)的大小與數據塊(Block)的大小是相同的。若是數據塊(Block)的大小是默認值64MB,輸入文件有兩個,一個是32MB,一個是72MB。那麼小的文件是一個輸入片,大文件會分爲兩個數據塊,那麼是兩個輸入片。一共產生三個輸入片。每個輸入片由一個Mapper進程處理。這裏的三個輸入片,會有三個Mapper進程處理。 第二階段是對輸入片中的記錄按照必定的規則解析成鍵值對。有個默認規則是把每一行文本內容解析成鍵值對。「鍵」是每一行的起始位置(單位是字節),「值」是本行的文本內容。 第三階段是調用Mapper類中的map方法。第二階段中解析出來的每個鍵值對,調用一次map方法。若是有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。 第四階段是按照必定的規則對第三階段輸出的鍵值對進行分區。比較是基於鍵進行的。好比咱們的鍵表示省份(如北京、上海、山東等),那麼就能夠按照不一樣省份進行分區,同一個省份的鍵值對劃分到一個區中。默認是隻有一個區。分區的數量就是Reducer任務運行的數量。默認只有一個Reducer任務。 第五階段是對每一個分區中的鍵值對進行排序。首先,按照鍵進行排序,對於鍵相同的鍵值對,按照值進行排序。好比三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。若是有第六階段,那麼進入第六階段;若是沒有,直接輸出到本地的linux文件中。 第六階段是對數據進行歸約處理,也就是reduce處理。鍵相等的鍵值對會調用一次reduce方法。通過這一階段,數據量會減小。歸約後的數據輸出到本地的linxu文件中。本階段默認是沒有的,須要用戶本身增長這一階段的代碼。
2.4 Reducer任務詳解
每一個Reducer任務是一個java進程。Reducer任務接收Mapper任務的輸出,歸約處理後寫入到HDFS中,能夠分爲以下圖所示的幾個階段:
第一階段是Reducer任務會主動從Mapper任務複製其輸出的鍵值對。Mapper任務可能會有不少,所以Reducer會複製多個Mapper的輸出。
第二階段是把複製到Reducer本地數據,所有進行合併,即把分散的數據合併成一個大的數據。再對合並後的數據排序。
第三階段是對排序後的鍵值對調用reduce方法。鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到HDFS文件中
2.5 Shuffle
Shuffle過程是MapReduce的核心,也被稱爲奇蹟發生的地方。摘自:http://blog.csdn.net/github_36444580/article/details/75208992
一、MapReduce中,Map階段處理的數據如何傳遞給Reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫Shuffle;
二、Shuffle: 洗牌、發牌(核心機制:數據分區、排序、緩存);
三、具體來講:就是將map task輸出的處理結果數據,分發給reduce task,並在分發的過程當中,對數據按key進行了分區和排序,分區和排序能夠分別根據WritableComparable接口、Partitioner接口( 見下圖:AreaPartitioner.java)實現自定義。
Shuffle是MR處理流程中的一個過程,它的每個處理步驟是分散在各個map task和reduce task節點上完成的。
Shuffle緩存流程
Shuffle運行機制
上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,以下: 1)maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中 2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件 3)多個溢出文件會被合併成大的溢出文件 4)在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序 5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據 6)reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序) 7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法) 注意 Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。 緩衝區的大小能夠經過參數調整,參數:io.sort.mb 默認100M
Partition分區
若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數爲5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
改變分組數(Partition):直接影響就是最終Reduce輸出文件個數,Reduce的併發個數=Partintion個數。
AreaPartitioner.java
package cn.itcast.hadoop.mr.areapartition; import java.util.HashMap; import org.apache.hadoop.mapreduce.Partitioner; public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{ private static HashMap<String,Integer> areaMap = new HashMap<>(); static{ areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); } @Override public int getPartition(KEY key, VALUE value, int numPartitions) { //從key中拿到手機號,查詢手機歸屬地字典,不一樣的省份返回不一樣的組號 int areaCoder = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3)); return areaCoder; } }
FlowSumArea.java
package cn.itcast.hadoop.mr.areapartition; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import cn.itcast.hadoop.mr.flowsum.FlowBean; /** * 對流量原始日誌進行流量統計,將不一樣省份的用戶統計結果輸出到不一樣文件 * 須要自定義改造兩個機制: * 一、改造分區的邏輯,自定義一個partitioner * 二、自定義reduer task的併發任務數 * * @author duanhaitao@itcast.cn * */ public class FlowSumArea { public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //拿一行數據 String line = value.toString(); //切分紅各個字段 String[] fields = StringUtils.split(line, "\t"); //拿到咱們須要的字段 String phoneNB = fields[1]; long u_flow = Long.parseLong(fields[7]); long d_flow = Long.parseLong(fields[8]); //封裝數據爲kv並輸出 context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow)); } } public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException { long up_flow_counter = 0; long d_flow_counter = 0; for(FlowBean bean: values){ up_flow_counter += bean.getUp_flow(); d_flow_counter += bean.getD_flow(); } context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumArea.class); job.setMapperClass(FlowSumAreaMapper.class); job.setReducerClass(FlowSumAreaReducer.class); //設置咱們自定義的分組邏輯定義 job.setPartitionerClass(AreaPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //設置reduce的任務併發數,應該跟分組的數量保持一致 job.setNumReduceTasks(1); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
Combiner合併
1)combiner是MR程序中Mapper和Reducer以外的一種組件;
2)combiner組件的父類就是Reducer;
3)combiner和reducer的區別在於運行的位置:Combiner是在每個maptask所在的節點運行,Reducer是接收全局全部Mapper的輸出結果;
4)combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量;
6)combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來Mapper;