hadoop核心組件(一)

  hadoop的核心組件:hdfs(分佈式文件系統)、mapreduce(分佈式計算框架)、Hive(基於hadoop的數據倉庫)、HBase(分佈式列存數據庫)、Zookeeper(分佈式協做服務)、Sqoop(數據同步工具)和Flume(日誌手機工具)
 
hdfs(分佈式文件系統):
由client、NameNode、DataNode組成
  • client負責切分文件,並與NameNode交互,獲取文件位置;與DataNode交互,讀取和寫入數據
  • NameNode是Master節點,管理HDFS的名稱空間和數據塊映射信息,配置副本策略,處理客戶端請求
  • DataNode是Slave節點,存儲實際數據,彙報存儲信息給NameNode
  • DataNode與NameNode保持心跳,提交block列表
在hadoop1.x的時候還有Secondary NameNode,負責輔助NameNode,分擔其工做量;按期合併fsimage和fsedits,推送給NameNode;緊急狀況下,可輔助恢復NameNode
 
存儲模型
(1)文件線性切割成Block  offset
(2)Block分散存儲在集羣節點中,Block是HDFS的基本存儲單元,默認大小是64M
(3)單一文件Block大小一致,文件與文件能夠不一致
(4)Block可設置副本數(小於節點數),分散在不一樣節點
(5)文件上傳能夠設置Block大小和副本數
(6)已上傳的文件Block副本數能夠調整,大小不變
(7)只支持一次寫入屢次讀取,同一時刻只有一個寫入者
(8)能夠append追加數據
 
架構模型
(1)NameNode節點保存文件元數據
(2)DataNode節點保存文件Block數據
(3)DataNode與NameNode保持心跳,提交Block列表
(4)HdfsClient與NameNode交互元數據信息
(5)HdfsClient與DataNode交互文件Block數據
 
hdfs結構
1、NameNode(不會與磁盤發生交換)
    (1)基於內存存儲
  • 只存在內存中
  • 持久化
    • 啓動後, 元數據(metadate)信息加載到內存
    • metadata的磁盤文件名爲」fsimage」
    • Block的位置信息不會保存到fsimage
    • (journalNode的做用是存放EditLog的)edits記錄對metadata的操做日誌
 
    (2)功能
  • 接收客戶端讀寫
  • 收集DataNode彙報的block列表信息
    (3) metadata
  • 文件ownership, permissions(文件全部權、權限)
  • 文件大小, 時間
  • (block列表,block偏移量)--->會持久化, 位置信息--->不會持久化(啓動時候由DataNode彙報過來)
  • block每一個副本位置(dataNode上報)
2、DataNode
    (1)本地文件形式存儲block
    (2)存儲Block的元數據信息文件
    (3)啓動DN時會向NN彙報block信息
    (4)經過向NameNode發送心跳(3秒一次),若是NameNode 10分鐘沒有收到,則認爲已經lost,並copy其上的block到其它DN
3、SecondaryNameNode/Qurom Journal Manager
    合併時機
        fs.checkpoint.period  3600s
        fs.checkpoint.size  64MB
4、ZooKeeper Failover Controller(HDFS 2.0 HA)
    (1)監控NameNode健康狀態
    (2)向Zookeeper註冊NameNode
    (3)NameNode掛掉後,ZKFC爲NameNode競爭鎖,得到ZKFC 鎖的NameNode變爲active
 
五、Block副本放置位置
(1)第一個副本:放置在上傳文件的DN;若是是集羣外提交,則隨機挑選一臺磁盤不太滿,CPU不太忙的節點
(2)第二個副本:放置在於第一個副本不一樣的機架的節點上
(3)第三個副本:與第二個副本相同機架的節點
(4)更多副本:隨機節點
 
六、安全模式
(1)NameNode啓動, fsimage載入內存, 執行edits
(2)成功創建元數據映射後, 建立新的fsimage文件(無需SNN)和空的edits
(3)檢查副本數, 數量正常後,過若干時間, 解除安全模式
 
七、優缺點
優勢:
    高容錯性(多副本, 自動恢復)
    適合批處理(計算移動, 數據位置暴露給計算框架(block))
    適合大數據處理(GB TB PB級數據)
    可構建在廉價機器上
    高吞吐
缺點:
    高延遲
    小文件存取(佔用namenode內存, 尋道時間超過讀取時間)
    併發寫入、文件隨機修改(一個文件一個寫入者, 只能append)
 
hdfs寫流程

 

client切分文件與NanmeNode交互,獲取DataNode列表,驗證DataNode後鏈接DataNode,各節點之間兩兩交互,肯定可用後,
client以更小單位流式傳輸數據;
Block傳輸數據結束後,DataNode向NameNode彙報Block信息,DataNode向Client彙報完成,Client向NameNode彙報完成,獲
取去下一個Block存放的DataNode列表,循環以上步驟,最終client彙報完成,NameNode會在寫流程更新文件狀態。
 
hdfs讀流程
 
client與NameNode交互,獲取Block存放的DataNode列表(Block副本的位置信息),線性和DataNode交互,獲取Block,最終
合併爲一個文件,其中,在Block副本列表中按距離擇優選取DataNode節點獲取Block塊。
 
mapreduce(分佈式計算框架)

 MR運行原理:java

    一、客戶端提交做業以前,檢查輸入輸出路徑,首先建立切片列表
            反射出做業中設置的input對象,默認是TextInputFormat類
            經過input類獲得切片列表(getSpilits()方法)
                        最小值 minSize 默認爲1,若是設置就取設置的值
                        最大值 maxSize 默認爲long的最大值
                        根據輸入路徑取出文件,獲取每一個文件的全部block列表,接着建立splits列表(包含文件名,偏移量,長度和位置信息)
                        切片大小根據最大最小值取,默認爲block的大小
                   一個split對應一個map
        提交做業到集羣(submitJob()方法)
    二、mapInput:
            input.initialize    輸入初始化
                    拿到taskContext(上下文)
                    建立mapper(默認爲Mapper類,通常取用戶設置的)
                    獲取InputFomat類(輸入格式化的類)
                    獲取split
                    根據以上信息建立input(NewTrackingRecordReader)
                        input初始化
                           獲取split的開始和結束位置和文件,開啓對文件的IO流,將起始偏移量個IO設置一下
                               若是不是第一個切片(split),每次讀取放棄第一行(跳過第一行數據),只有第一個切片纔會讀取第一行數據
            mapper.run
    三、output:
            MapOutputBuffer初始化
                    環形緩衝區的閾值0.八、大小(100M)    默認值
                    sorter :QuickSort算法
                    反射獲取比較器 OutputKeyComparator
                    排序,溢寫,一些一次觸發一次combiner
                    溢寫達到3次的時候還會觸發一次combiner
            經過反射獲取Partitioner類,默認爲HashPartitoner
            write(k,v)
                collector.collect(key,value,partition)
            output.close()
                merger
                若是numSplits<minSpillsForCombiner 判斷溢寫的次數是否是小於設置的合併的溢寫次數(默認是3),成立的話combiner
    四、reduce:
            shuffle:copy
            sort:SecondarySort
            reduce
 
一、mapreduce shuffle
(1)maptask的輸入是hdfs上的block塊,maptask只讀取split,block與split的對應關係默認是一對一
(2)進過map端的運行後,輸出的格式爲key/value,Mapreduce提供接口partition,他的做用是根據maptask輸出的key hash後與
reduce數量取模,來決定當前的輸出對應到哪一個reduce處理,也能夠自定義partition
(3)map運行後的數據序列化到緩衝區,默認這個緩衝區大小爲100M,做用是收集這個map的結果,當數據達到溢寫比例
(默認是spill.percent=0.8)後,所定這80M的內存,對這80M內存中的key作排序(sort),maptask的輸出結果還能夠往剩下的20M內
存中寫,互不影響。以後執行溢寫的線程會往磁盤中寫數據。每次溢寫都會產生一個溢寫小文件,map執行完後,會合並這些溢寫小文件,
這個過程叫Merge。
(4)若是客戶端設置了Combiner,那麼會優化MapReduce的中間結果,合併map端的數據(至關於reduce端的預處理),Combiner
不能改變最終的計算結果。
(5)reduce在執行以前就是從各個maptask執行完後的溢寫文件中拿到所對應的數據,而後作合併(Merge),最終造成的文件做爲
reduce的輸入文件,這個過程是歸併排序。最後就是reduce計算,把結果放到hdfs上面。
 
hdfs參數調優
io.file.buffer.size:4096 (core-default.xml) SequenceFiles在讀寫中可使用緩存大小,可減小I/O次數;在大型Hadoop cluster,建議可設定爲65536-131072
dfs.blockes:134217728( hdfs-default.xml ) hdfs中一個文件的Block塊的大小,CDH5中默認爲128M;設置太大影響map同時計算的數量,設置較少會浪費map個數資源
mapred.reduce.tasks(mapreduce.job.reduces):1 默認啓動的reduce數
mapreduce.task.io.sort.factor:10 reduce task中合併文件時,一次合併的文件數據
mapred.child.java.opts:-Xmx200m jvm啓動子線程可使用的最大內存
mapred.reduce.parallel.copies:5 Reduce copy數據的線程數量,默認值是5
 
mapreduce.tasktracker.http.threads:40 map和reduce是經過http進行傳輸的,這個設置傳輸的並行線程數
mapreduce.map.output.compress:flase map輸出是否進行壓縮,若是壓縮就會多耗cpu,可是減小傳輸時間,若是不壓縮,就須要較多的傳輸帶寬。配合 mapreduce.map.output.compress.codec使用,默認是 org.apache.hadoop.io.compress.DefaultCodec,能夠根據須要設定數據壓縮方式。
mapreduce.tasktracker.tasks.reduce.maximum:2 一個tasktracker併發執行的reduce數,建議爲cpu核數
mapreduce.map.sort.spill.percent:0.8 溢寫比例
min.num.spill.for.combine:3 spill的文件達到設置的參數進行combiner
 
避免推測執行
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
 
自定義partition
適當添加combiner
 
自定義reduce端的grouping Comparator
- mapred.reduce.tasks:手動設置reduce個數
- mapreduce.map.output.compress:map輸出結果是否壓縮
  - mapreduce.map.output.compress.codec
- mapreduce.output.fileoutputformat.compress:job輸出結果是否壓縮
  - mapreduce.output.fileoutputformat.compress.type
  - mapreduce.output.fileoutputformat.compress.codec
 
九、調優文件以及參數
 
1、調優的目的
    充分的利用機器的性能,更快的完成mr程序的計算任務。甚至是在有限的機器條件下,可以支持運行足夠多的mr程序。
 
2、調優的整體概述
    從mr程序的內部運行機制,咱們能夠了解到一個mr程序由mapper和reducer兩個階段組成,其中mapper階段包括數據的讀取、map處理以及寫出操做(排序和合並/sort&merge),而reducer階段包含mapper輸出數據的獲取、數據合併(sort&merge)、reduce處理以及寫出操做。那麼在這七個子階段中,可以進行較大力度的進行調優的就是map輸出、reducer數據合併以及reducer個數這三個方面的調優操做。也就是說雖然性能調優包括cpu、內存、磁盤io以及網絡這四個大方面,可是從mr程序的執行流程中,咱們能夠知道主要有調優的是內存、磁盤io以及網絡。在mr程序中調優,主要考慮的就是減小網絡傳輸和減小磁盤IO操做,故本次課程的mr調優主要包括服務器調優、代碼調優、mapper調優、reducer調優以及runner調優這五個方面。
 
3、服務器調優
    服務器調優主要包括服務器參數調優和jvm調優。在本次項目中,因爲咱們使用hbase做爲咱們分析數據的原始數據存儲表,因此對於hbase咱們也須要進行一些調優操做。除了參數調優以外,和其餘通常的java程序同樣,還須要進行一些jvm調優。
    hdfs調優
    1. dfs.datanode.failed.volumes.tolerated: 容許發生磁盤錯誤的磁盤數量,默認爲0,表示不容許datanode發生磁盤異常。當掛載多個磁盤的時候,能夠修改該值。
    2. dfs.replication: 複製因子,默認3
    3. dfs.namenode.handler.count: namenode節點併發線程量,默認10
    4. dfs.datanode.handler.count:datanode之間的併發線程量,默認10。
    5. dfs.datanode.max.transfer.threads:datanode提供的數據流操做的併發線程量,默認4096。
        通常將其設置爲linux系統的文件句柄數的85%~90%之間,查看文件句柄數語句ulimit -a,修改vim /etc/security/limits.conf, 不能設置太大文件末尾,添加
            * soft nofile 65535
            * hard nofile 65535
            注意:句柄數不可以太大,能夠設置爲1000000如下的全部數值,通常不設置爲-1。
            異常處理:當設置句柄數較大的時候,從新登陸可能出現unable load session的提示信息,這個時候採用單用戶模式進行修改操做便可。
                單用戶模式:
                    啓動的時候按'a'鍵,進入選擇界面,而後按'e'鍵進入kernel修改界面,而後選擇第二行'kernel...',按'e'鍵進行修改,在最後添加空格+single便可,按回車鍵回到修改界面,最後按'b'鍵進行單用戶模式啓動,當啓動成功後,還原文件後保存,最後退出(exit)重啓系統便可。
    6. io.file.buffer.size: 讀取/寫出數據的buffer大小,默認4096,通常不用設置,推薦設置爲4096的整數倍(物理頁面的整數倍大小)。
    mapreduce調優
    1. mapreduce.task.io.sort.factor: mr程序進行合併排序的時候,打開的文件數量,默認爲10個.
    2. mapreduce.task.io.sort.mb: mr程序進行合併排序操做的時候或者mapper寫數據的時候,內存大小,默認100M
    3. mapreduce.map.sort.spill.percent: mr程序進行flush操做的閥值,默認0.80。
    4. mapreduce.reduce.shuffle.parallelcopies:mr程序reducer copy數據的線程數,默認5。
    5. mapreduce.reduce.shuffle.input.buffer.percent: reduce複製map數據的時候指定的內存堆大小百分比,默認爲0.70,適當的增長該值能夠減小map數據的磁盤溢出,可以提升系統性能。
    6. mapreduce.reduce.shuffle.merge.percent:reduce進行shuffle的時候,用於啓動合併輸出和磁盤溢寫的過程的閥值,默認爲0.66。若是容許,適當增大其比例可以減小磁盤溢寫次數,提升系統性能。同mapreduce.reduce.shuffle.input.buffer.percent一塊兒使用。
    7. mapreduce.task.timeout:mr程序的task執行狀況彙報過時時間,默認600000(10分鐘),設置爲0表示不進行該值的判斷。
    
4、代碼調優
    代碼調優,主要是mapper和reducer中,針對屢次建立的對象,進行代碼提出操做。這個和通常的java程序的代碼調優同樣。
    
5、mapper調優
    mapper調優主要就是就一個目標:減小輸出量。咱們能夠經過增長combine階段以及對輸出進行壓縮設置進行mapper調優。
    combine介紹:
        實現自定義combine要求繼承reducer類,特色:
        以map的輸出key/value鍵值對做爲輸入輸出鍵值對,做用是減小網絡輸出,在map節點上就合併一部分數據。
        比較適合,map的輸出是數值型的,方便進行統計。
    壓縮設置:
        在提交job的時候分別設置啓動壓縮和指定壓縮方式。
 
6、reducer調優
    reducer調優主要是經過參數調優和設置reducer的個數來完成。
    reducer個數調優:
        要求:一個reducer和多個reducer的執行結果一致,不能由於多個reducer致使執行結果異常。
        規則:通常要求在hadoop集羣中的執行mr程序,map執行完成100%後,儘可能早的看到reducer執行到33%,能夠經過命令hadoop job -status job_id或者web頁面來查看。
            緣由: map的執行process數是經過inputformat返回recordread來定義的;而reducer是有三部分構成的,分別爲讀取mapper輸出數據、合併全部輸出數據以及reduce處理,其中第一步要依賴map的執行,因此在數據量比較大的狀況下,一個reducer沒法知足性能要求的狀況下,咱們能夠經過調高reducer的個數來解決該問題。
        優勢:充分利用集羣的優點。
        缺點:有些mr程序無法利用多reducer的優勢,好比獲取top n的mr程序。
 
7、runner調優
    runner調優其實就是在提交job的時候設置job參數,通常均可以經過代碼和xml文件兩種方式進行設置。
    1~8詳見ActiveUserRunner(before和configure方法),9詳解TransformerBaseRunner(initScans方法)
    1. mapred.child.java.opts: 修改childyard進程執行的jvm參數,針對map和reducer均有效,默認:-Xmx200m
    2. mapreduce.map.java.opts: 需改map階段的childyard進程執行jvm參數,默認爲空,當爲空的時候,使用mapred.child.java.opts。
    3. mapreduce.reduce.java.opts:修改reducer階段的childyard進程執行jvm參數,默認爲空,當爲空的時候,使用mapred.child.java.opts。
    4. mapreduce.job.reduces: 修改reducer的個數,默認爲1。能夠經過job.setNumReduceTasks方法來進行更改。
    5. mapreduce.map.speculative:是否啓動map階段的推測執行,默認爲true。其實通常狀況設置爲false比較好。可經過方法job.setMapSpeculativeExecution來設置。
    6. mapreduce.reduce.speculative:是否須要啓動reduce階段的推測執行,默認爲true,其實通常狀況設置爲fase比較好。可經過方法job.setReduceSpeculativeExecution來設置。
    7. mapreduce.map.output.compress:設置是否啓動map輸出的壓縮機制,默認爲false。在須要減小網絡傳輸的時候,能夠設置爲true。
    8. mapreduce.map.output.compress.codec:設置map輸出壓縮機制,默認爲org.apache.hadoop.io.compress.DefaultCodec,推薦使用SnappyCodec
    9. hbase參數設置
        因爲hbase默認是一條一條數據拿取的,在mapper節點上執行的時候是每處理一條數據後就從hbase中獲取下一條數據,經過設置cache值能夠一次獲取多條數據,減小網絡數據傳輸。
 
源碼:
一、設置map端的數量:mapreduce.input.fileinputformat.split.minsize
位置FileInputFormat.getSplits()方法
(1)輸入文件size巨大,但不是小文件
減少map的數量:增大mapred.min.split.size的值
(2)輸入文件數量巨大,且都是小文件
使用FileInputFormat衍生的CombineFileInputFormat將多個input path合併成一個InputSplit送給mapper處理,從而減小mapper的數量
 
二、增長Map-Reduce job 啓動時建立的Mapper數量
能夠經過減少每一個mapper的輸入作到,即減少blockSize或者減少mapred.min.split.size的值,設置blockSize通常不可行
相關文章
相關標籤/搜索