Hadoop要點總結

Hadoop是一個由Apache基金會所開發的分佈式系統基礎架構,主要解決,海量數據的存儲和海量數據的分析計算問題。java

HDFS

NameNode工做機制

  1. 加載fsimage(鏡像文件)和edits.001(編輯日誌)到內存中
  2. 客戶端向namenode發起增刪改查請求
  3. namenode記錄操做日誌,更新滾動日誌
  4. namenode在內存中對數據進行增刪改查

Secondary NameNode工做機制

  1. 向namenode詢問是否須要checkpointnode

    checkpoint觸發條件:web

    1. 定時時間到了
    2. edits中數據滿了
  2. 請求執行checkpoint
  3. namenode滾動正在寫的edits
  4. 拷貝編輯日誌、鏡像文件到secondary namenode中
  5. secondary namenode把拷貝來的鏡像文件和編輯日誌合併
  6. 生成新的fsimage命名爲fsimage.chkpoint
  7. 將fsimage.chkpoint拷貝到namenode
  8. 重命名生成fsimage

HDFS寫數據流程

  1. 客戶端向namenode請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在。
  2. namenode返回是否能夠上傳
  3. 客戶端請求第一個block上傳到哪幾個datanode服務器上
  4. namenode返回三個datanode節點,分別爲dn1,dn2,dn3
  5. 客戶端請求dn1上傳數據,dn1收到請求會繼續調用dn2,而後dn2調用dn3,將這個通訊管道創建完成
  6. dn一、dn二、dn3逐級應答客戶端
  7. 客戶端開始往dn1上傳第一個block,以packet爲單位,dn1收到一個packet就會傳給dn2,dn2傳給dn3,dn1每傳一個packet會放入一個應答隊列等待應答。
  8. 當一個block傳輸完成以後,客戶端再次請求namenode上傳第二個block的服務器。(重複執行3-7步)

HDFS讀數據流程

  1. 客戶端向namenode請求下載文件,namenode經過查詢元數據,找到文件塊所在的datanode地址
  2. 挑選一臺datanode(就近原則,而後隨機)服務器,請求讀取數據
  3. datanode開始傳輸數據給客戶端(從磁盤裏讀取數據放入流,以packet爲單位作校驗)
  4. 客戶端以packet單位接收,先緩存在本地,而後寫入目標文件中

SecondaryNameNode目錄結構

$HADOOP_HOME/data/tmp/dfs/namesecondary/current這個目錄中查看SecondaryNameNode目錄結構。
在主namenode發生故障時(假設沒有及時備份數據),能夠從SecondaryNameNode恢復數據:
方案一:將SecondaryNameNode中數據拷貝到namenode存儲數據的目錄。
方案二:使用-importCheckpoint選項啓動namenode守護進程,從而將SecondaryNameNode中數據拷貝到namenode目錄中。(極慢)數據庫

DataNode工做機制

  1. 一個數據塊在datanode上以文件形式存儲在磁盤上,包括兩個文件,一個是數據自己,一個是元數據包括數據塊長度,塊數據校驗和以及時間戳
  2. datanode啓動後向namenode註冊,經過後,週期性(1小時)的向namenode上報全部的塊信息。
  3. 心跳是3秒一次,心跳返回結果帶有namenode給該datanode的命令如複製數據塊到另外一臺機器,或刪除某個數據塊。若是超過10分鐘沒有收到某個datanode的心跳,則認爲該節點不可用。
  4. 集羣運行中能夠安全加入和退出一些機器。

服役新節點

1.在namenode主機的$HADOOP_HOME/etc/hadoop目錄下建立dfs.hosts文件,在該文件中添加全部主機名(包括新節點)e.g.apache

hadoop102
hadoop103
hadoop104
hadoop105

2.在namenode的hdfs-site.xml配置文件中增長dfs.hosts屬性瀏覽器

<property>
    <name>dfs.hosts</name>
    <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
</property>

3.刷新namenode:hdfs dfsadmin -refreshNodes
4.更新resourcemanager節點:yarn rmadmin -refreshNodes
5.在namenode的slaves文件中增長新主機名稱
6.單獨命令啓動新的數據節點和節點管理器
hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager緩存

退役舊節點

1.在namenode的/opt/module/hadoop-2.7.2/etc/hadoop目錄下建立dfs.hosts.exclude文件,向其中添加要退役的主機名稱
2.在namenode的hdfs-site.xml配置文件中增長dfs.hosts.exclude屬性安全

<property>
    <name>dfs.hosts.exclude</name>
    <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
</property>

3.刷新namenode、刷新resourcemanager
hdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.在web瀏覽器端檢查,退役節點的狀態爲decommission in progress(退役中),說明數據節點正在複製塊到其餘節點。
5.等待退役節點狀態爲decommissioned(全部塊已經複製完成),中止該節點及節點資源管理器。
hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager性能優化

PS:若是副本數是3,服役的節點小於等於3,是不能退役成功的,須要修改副本數後才能退役。

6.從include文件中刪除退役節點,再運行刷新節點的命令
7.從namenode的slave文件中刪除退役節點服務器

MapReduce

Writable序列化

Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。

經常使用的數據類型對應的hadoop數據序列化類型

boolean --> BooleanWritable
byte --> ByteWritable
int --> IntWritable
float --> FloatWritable
long --> LongWritable
double --> DoubleWritable
string --> Text
map --> MapWritable
array --> ArrayWritable

自定義bean對象實現序列化接口

  • 必須實現Writable接口
  • 反序列化時,須要反射調用空參構造函數,因此必須有空參構造
  • 重寫序列化方法
  • 重寫反序列化方法(注意反序列化的順序和序列化的順序徹底一致)
  • 要想把結果顯示在文件中,須要重寫toString()
  • 若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序

MapTask並行度決定機制

  • 一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
  • 每個切片分配一個MapTask並行實例
  • 切片大小默認=blocksize
  • 切片時針對每個文件單獨切片,不考慮數據集總體

MapTask工做機制

Read階段

maptask調用InputFormat,InputFormat又調用RecordReader從輸入文件中解析出一個個K/V。

Map階段

將解析出的K/V交給客戶端map()方法處理,併產生新的K/V。

Collect階段

當map()方法處理完數據後,通常會調用OutputCollector.collect()輸出結果,在該函數內部調用Partitioner對K/V進行分區,且根據K進行分區內排序,並寫入一個環形緩衝區中。

溢寫階段

當環形緩衝區達到80%時,會將數據寫到本地磁盤上生成一個臨時文件。

將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。

Combine階段

當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

自定義InputFormat

編寫一個類繼承FileInputFormat

public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // 自定義recordreader
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }    
}

自定義一個RecordReader

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit split;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();

    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        this.split = (FileSplit) split;
        this.conf = context.getConfiguration();

    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) split.getLength()];
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            FSDataInputStream fis = null;

            try {
                fis = fs.open(path);
                IOUtils.readFully(fis, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {

    }
}

自定義RecordReader重點是nextKeyValue()方法,它定義瞭如何封裝向maptask輸入的鍵值對,本例中是將每一個文件的全部內容做爲value輸入到maptask中。

大量小文件的切片優化(CombineTextInputFormat)

// 若是不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

通過以上設置,可有效減小切片數
number of splits

Shuffle機制

MapReduce保證每一個reducer的輸入都是按鍵有序排列的,系統執行排序的過程(即將map輸出做爲輸入傳給reducer)稱爲shuffle。
shuffle

Partition分區

默認partition分區

public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

默認採用的是org.apache.hadoop.mapreduce.lib.partition.HashPartitioner,當中的getPartition()方法是根據key的hashCode對reduceTasks個數取模獲得的。

用戶自定義分區

1.繼承Partitioner,重寫getPartition()方法

public class MyPartitioner extends Partitioner<K, V> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        int partition=xxx;

        if (exp0) {
            partition = 0;
        }else if (exp1) {
            partition = 1;
        }else if (exp2) {
            partition = 2;
        }else if (exp3) {
            partition = 3;
        }
        return partition;
    }
}

2.在job驅動中,設置自定義partitioner

job.setPartitionerClass(MyPartitioner.class)

3.根據自定義partitioner的邏輯設置相應數量的reduce task

job.setNumReduceTasks(5);
PS:
若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會產生異常;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000。

WritableComparable排序

bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序

@Override
public int compareTo(Bean o) {
    // 倒序排列,從大到小
    return this.xxx > o.getXxx() ? -1 : 1;
}

GroupingComparator分組

對reduce階段的數據根據某一個或幾個字段進行分組。

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // 記得此處設置true,不然會報空指針異常
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        return abean.getOrder_id().compareTo(bbean.getOrder_id());
    }
}

Combiner合併

combiner是MR程序中Mapper和Reducer以外的一種組件,combiner組件的父類就是Reducer,combiner和reducer的區別在於運行的位置:Combiner是在每個maptask所在的節點運行,Reducer是接收全局全部Mapper的輸出結果。combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量。

自定義Combiner實現步驟

1.自定義一個combiner繼承Reducer,重寫reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }
        context.write(key, new IntWritable(count));
    }
}

2.在job驅動類中設置

job.setCombinerClass(WordcountCombiner.class);
combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來。

從控制檯觀察使用combiner先後變化:
使用combiner前使用combiner後

數據傾斜&Distributedcache

數據傾斜緣由

若是是多張表的操做都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。

解決方案

在map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。

  1. 在mapper的setup階段,將文件讀取到緩存集合中
  2. 在驅動函數中加載緩存
job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 緩存普通文件到task運行節點

自定義OutputFormat

1.自定義一個outputformat繼承org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // 建立一個RecordWriter
        return new FilterRecordWriter(job);
    }
}

2.具體的寫數據RecordWriter

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream atguiguOut = null;
    FSDataOutputStream otherOut = null;

    public FilterRecordWriter(TaskAttemptContext job) {
        // 1 獲取文件系統
        FileSystem fs;
        try {
            fs = FileSystem.get(job.getConfiguration());
            // 2 建立輸出文件路徑
            Path atguiguPath = new Path("e:/atguigu.log");
            Path otherPath = new Path("e:/other.log");
            // 3 建立輸出流
            atguiguOut = fs.create(atguiguPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 判斷是否包含「atguigu」輸出到不一樣文件
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // 關閉資源
        if (atguiguOut != null) {
            atguiguOut.close();
        }
        
        if (otherOut != null) {
            otherOut.close();
        }
    }
}

數據壓縮

在Map輸出端採用壓縮

// 開啓map端輸出壓縮
configuration.setBoolean("mapreduce.map.output.compress", true);
// 設置map端輸出壓縮方式
configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);

在reduce輸出端壓縮

// 設置reduce端輸出壓縮開啓
FileOutputFormat.setCompressOutput(job, true);
//      設置壓縮的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

ReduceTask工做機制

Copy階段

ReduceTask從各個MapTask遠程拷貝一片數據,若是某一片數據大小超過閾值,則寫到磁盤上,不然直接放在內存中。

Merge階段

在遠程拷貝數據時,reducetask後臺啓動了兩個線程對內存和磁盤上的文件進行合併,以防止內存或硬盤使用過多。

Sort階段

用戶編寫的reduce()方法輸入數據是按照key進行彙集的,爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。

Reduce階段

reduce()函數將計算結果寫到HDFS上。

MapReduce參數優化

資源相關參數

用戶在mr應用程序中配置能夠生效:

  • mapreduce.map.memory.mb 一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。
  • mapreduce.reduce.memory.mb 一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。
  • mapreduce.map.java.opts Map Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@會被Hadoop框架自動換爲相應的taskid), 默認值: ""
  • mapreduce.reduce.java.opts Reduce Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc", 默認值: ""
  • mapreduce.map.cpu.vcores 每一個Map task可以使用的最多cpu core數目, 默認值: 1
  • mapreduce.reduce.cpu.vcores 每一個Reduce task可以使用的最多cpu core數目, 默認值: 1

應該在yarn啓動以前就配置在服務器的配置文件中才能生效:

  • yarn.scheduler.minimum-allocation-mb 1024 給應用程序container分配的最小內存
  • yarn.scheduler.maximum-allocation-mb 8192 給應用程序container分配的最大內存

shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好:

  • mapreduce.task.io.sort.mb 100 shuffle的環形緩衝區大小,默認100m
  • mapreduce.map.sort.spill.percent 0.8 環形緩衝區溢出的閾值,默認80%

容錯相關參數

  • mapreduce.map.maxattempts 每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
  • mapreduce.reduce.maxattempts 每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
  • mapreduce.map.failures.maxpercent 當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。
  • mapreduce.reduce.failures.maxpercent 當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0。
  • mapreduce.task.timeout Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。

Yarn

Yarn是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而mapreduce等運算程序則至關於運行於操做系統之上的應用程序。

yarn工做機制

  1. MR程序提交到客戶端所在節點
  2. yarnrunner向resourcemanager申請一個application
  3. rm將該應用程序的資源路徑返回給yarnrunner
  4. 將改程序所須要的資源提交到HDFS上
  5. 程序資源提交完畢後,申請運行MrAppMaster
  6. mr將用戶請求初始化成一個task
  7. 其中一個nodemanager領取到task任務
  8. 該nodemanager建立容器container併產生MRAppMaster
  9. container從HDFS上拷貝資源到本地
  10. MrAppMaster向RM申請運行maptask容器
  11. rm將運行maptask任務分配給另外兩個nodemanager,另外兩個nodemanager分別領取任務並建立容器
  12. MR向兩個接收到任務的nodemanager發送程序啓動腳本,這兩個nodemanager分別啓動maptask,maptask對數據分區排序
  13. MRAppMaster向rm申請兩個容器,運行reducetask
  14. reducetask向maptask獲取相應分區數據
  15. 程序運行結束後,MR向RM註銷本身
相關文章
相關標籤/搜索