Hadoop淺度學習指南(HDFS、YARN、MapReduce)

大數據

  1. 概念:big data
  2. 5V特徵:java

    1. Volume:量大
    2. Value:價值高,價值密度低
    3. Variety:多樣性
    4. Velocity:速度快
    5. Veracity:準確性

hadoop

主要組成

GFS --> HDFS
MapReduce --> MapReduce
BigTable -- > HBasenode

模塊

  • Hadoop Common: The common utilities that support the other Hadoop modules.支持hadoop其餘模塊的通常工具
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.高吞吐分佈式文件系統
  • Hadoop YARN: A framework for job scheduling and cluster resource management. 資源調度和任務管理
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.基於yarn的大數據並行處理系統

HDFS

組成

  • namenode:管理元數據,處理來自客戶端的請求
    元數據:描述數據屬性的數據、描述數據的數據
  • secondarynamenode:元數據的合併
  • datanode:具體數據的讀寫
  • client:文件讀寫請求的發起

HDFS機制

namenode

  • 負責元數據的管理,DataNode負責處理文件內容的讀寫請求
  • 處理client的讀寫的請求,負責管理文件系統的名字空間(namespace)以及客戶端對文件的訪問
  • 副本存放在哪些DataNode上由 NameNode來控制,根據全局狀況作出塊放置決定,讀取文件時NameNode儘可能讓用戶先讀取最近的副本,下降帶塊消耗和讀取時延
  • 全權管理數據塊的複製、它週期性地從集羣中的每一個Datanode接收心跳信號和塊狀態報告(Blockreport),塊狀態報告包含了一個該Datanode上全部數據塊的列表

datanode

  • 一個數據塊在DataNode以文件存儲在磁盤上,包括數據塊自己、數據塊的元數據(數據塊的長度,塊數據的校驗和,以及時間戳)
  • DataNode啓動後向NameNode註冊,經過後,週期性(1小時)的向NameNode上報全部的塊信息
  • 心跳是每3秒一次,心跳返回結果帶有NameNode給該DataNode的命令如複製塊數據到另外一臺機器,或刪除某個數據塊。若是超過10分鐘沒有收到某個DataNode 的心跳,則認爲該節點不可用。

文件

  • block 默認128M,每一個塊有多個副本存儲在不一樣的機器上
  • NameNode 是主節點,存儲文件的元數據如文件名,文件目錄結構,文件屬性(生成時間,副本數,文件權限),以及每一個文件的塊列表以及塊所在的DataNode等等
  • DataNode 在本地文件系統存儲文件塊數據,以及塊數據的校驗和
  • 能夠建立、刪除、移動或重命名文件,當文件建立、寫入和關閉以後不能修改文件內容

namenode從datanode接受心跳和塊報告

  • namenode啓動後,datanode向namenode進行註冊
  • 心跳

    心跳是每3秒一次,linux

    心跳返回結果帶有NameNode給該DataNode的命令如刪除塊,
    複製塊等apache

    若是超過10分鐘沒有收到某個DataNode 的心跳,則認爲該
    節點不可用編程

  • 塊報告

    DataNode啓動後向NameNode註冊,windows

    經過後,週期性(1小時)的向NameNode上報全部的塊信息centos

  • 塊損壞

    當DataNode讀取block的時候,從新計算checksum,和建立
    時的對比緩存

    DataNode 在其文件建立後三週驗證其checksum安全

  • HDFS有哪些進程

    NameNode服務器

    DataNode

    NodeManager

    ResourceManager

NameNode啓動過程

  • NameNode元數據/命名空間持久化fsimage與edits
  • NameNode格式化,具體作什麼事

    建立fsimage文件,存儲fsimage信息

    建立edits文件

  • NameNode 啓動過程

    加載fsimage和edits文件

    生成新的fsimage和edits文件

    等待DataNode註冊與發送Block Report

  • DataNode 啓動過程

    向NameNode註冊、發送Block Report

  • NameNode SafeMode 安全模式

    namenode啓動時會進入安全模式,此時只可讀不可寫


  1. Name啓動的時候首先將fsimage(鏡像)載入內存,並執行(replay)編輯日誌editlog的的各項操做;
  2. 一旦在內存中創建文件系統元數據映射,則建立一個新的fsimage文件(這個過程不需SecondaryNameNode) 和一個空的editlog;
  3. 在安全模式下,各個datanode會向namenode發送塊列表的最新狀況;
  4. 此刻namenode運行在安全模式。即NameNode的文件系統對於客服端來講是隻讀的。(顯示目錄,顯示文件內容等。寫、刪除、重命名都會失敗);
  5. NameNode開始監聽RPC和HTTP請求
    解釋RPC:RPC(Remote Procedure Call Protocol)——遠程過程經過協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議;
  6. 系統中數據塊的位置並非由namenode維護的,而是以塊列表形式存儲在datanode中;
  7. 在系統的正常操做期間,namenode會在內存中保留全部塊信息的映射信息。

HDFS啓動流程及元數據的同步

  • 元數據的同步
    流程圖:
    觸發的閾值(hdfs-default.xml)
    dfs.namenode.checkpoint.period 3600
    dfs.namenode.checkpoint.txns 1百萬個事務
  • NameNode 啓動過程

    1. 加載fsimage和edits文件
    2. 合併生成新的fsimage,並生成edits文件
    3. 等待DataNode註冊與發送心跳和Block Report
    4. NameNode 啓動過程當中會進入SafeMode(安全模式)

安全模式

在安全模式下,文件系統不容許修改
目的,是在系統啓動時檢查各個datanode數據的有效性

進入安全模式的三種方式

  1. 手動進入

    $ bin/hdfs dfsadmin -safemode enter

    $ bin/hdfs dfsadmin -safemode leave

  2. namenode啓動會自動進入
  3. 正常塊的個數/總的塊個數<0.999 也會進入安全模式
<property>
        <name>dfs.namenode.safemode.threshold-pct</name>
        <value>0.999f</value>
    </property>

HDFS特色

  • 優勢

    1. 處理超大文件
    2. 一次寫入,屢次讀取
    3. 運行與廉價服務器
    4. 不移動數據到計算點,而是就地計算,減小網絡阻塞
  • 缺點:

    1. 高延遲,不適合接入前臺業務
    2. 不支持任意的修改

HDFS API

Java API

package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {
    
    FileSystem fs = null;
    
//    public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
//        
////        FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"),
////                new Configuration(),
////                "chen");
////        
////        boolean success = fs.mkdirs(new Path("/test"));
////        
////        System.out.println(success);
////        test.setUp();
////        test.testMkdir();
////        test.testDelete();
//        
//        
//        
//        
//    }
    @Before
    //獲取文件對象
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), 
                    conf, 
                    "chen");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //建立文件夾
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }
    
    
    //刪除文件夾
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }
    
    @Test
    //上傳文件
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        
        IOUtils.copy(input, out, 1024);
    }
    
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        
        IOUtils.copy(input, out, 1024);

    }
    
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        
        while(ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            next.getBlockLocations();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            
            System.out.println(permission+"\t"+owner+"\t"+group);
            System.out.println(len+"\t"+blockSize+"\t"+rep);

            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:"+blktn.getLength());
                System.out.println("offset:"+blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
            
        }
    }

}

HDFS讀流程

read

  1. 打開分佈式文件調用 分佈式文件DistributedFileSystem.open()方法
  2. 從 NameNode 得到 DataNode 地址DistributedFileSystem 使用 RPC 調用 NameNode,NameNode返回存有該副本的 DataNode 地址,DistributedFileSystem 返回一個輸入流 FSDataInputStream對象,該對象封存了輸入流DFSInputStream
  3. 鏈接到DataNode調用 輸入流 FSDataInputStream 的 read() 方法,從而 輸入流DFSInputStream 鏈接 DataNodes
  4. 讀取DataNode反覆調用 read()方法,從而將數據從 DataNode 傳輸到客戶端
  5. 讀取另外的DataNode直到完成到達塊的末端時候,輸入流 DFSInputStream 關閉與DataNode鏈接, 尋找下一個 DataNode
  6. 完成讀取,關閉鏈接,即調用輸入流 FSDataInputStream.close()

HDFS寫流程

write

  1. 發送建立文件請求:調用分佈式文件系統DistributedFileSystem.create()方法
  2. NameNode中建立文件記錄:分佈式文件系統DistributedFileSystem 發送 RPC 請求給namenode,namenode 檢查權限後建立一條記錄,返回輸出流 FSDataOutputStream,封裝了輸出流 DFSOutputDtream
  3. 客戶端寫入數據:輸出流 DFSOutputDtream 將數據分紅一個個的數據包,並寫入內部隊列。DataStreamer 根據 DataNode 列表來要求 namenode 分配適合的新塊來存儲數據備份。一組DataNode 構成管線(管線的 DataNode 之間使用 Socket 流式通訊)
  4. 使用管線傳輸數據:DataStreamer 將數據包流式傳輸到管線第一個DataNode,第一個DataNode 再傳到第二個DataNode ,直到完成。
  5. 確認隊列:DataNode 收到數據後發送確認,管線的DataNode全部的確認組成一個確認隊列。全部DataNode 都確認,管線數據包刪除。
  6. 關閉:客戶端對數據量調用close()方法。將剩餘全部數據寫入DataNode管線,並聯系NameNode且發送文件寫入完成信息以前等待確認。
  7. NameNode確認
  8. 故障處理:若過程當中發生故障,則先關閉管線, 把隊列中全部數據包添加回去隊列,確保數據包不漏。爲另外一個正常DataNode的當前數據塊指定一個新的標識,並將該標識傳送給NameNode, 一遍故障DataNode在恢復後刪除上面的不完整數據塊. 從管線中刪除故障DataNode 並把餘下的數據塊寫入餘下正常的DataNode。NameNode發現複本兩不足時,會在另外一個節點建立一個新的複本

YARN

組成

  • resourcemanger:負責全局的任務調度和資源管理(內存、CPU)、啓動/監控applicationMaster 、監控NodeManager
  • nodemanger:單個節點的資源管理、處理來自resourcemanger和applicationmaster的任務請求
  • client:發起任務的請求
  • container:對環境的抽象,封裝了CPU、內存、環境變量
  • applicationmaster:負責管理應用,爲應用申請資源,任務的監控和容錯

服務功能

  • ResourceManager

    • 處理客戶端請求
    • 啓動/監控ApplicationMaster
    • 監控NodeManager
    • 資源分配與調度
  • NodeManager

    • 單個節點上的資源管理
    • 處理來自ResourceManager的命令
    • 處理來自ApplicationMaster的命令
  • ApplicationMaster

    • 數據切分
    • 爲應用程序申請資源,並分配給內部任務
    • 任務監控與容錯
  • Container

    • 對任務運行環境的抽象,封裝了CPU、內存等多維資源以及環境變量、啓動命令等任務運行相關的信息

YARN工做流程

  1. 客戶端向ResourceManager提交應用程序,其中包括ApplicationMaster、啓動ApplicationMaster的命令、用戶程序等;
  2. ResourceManager爲該應用程序分配第一個Container,並與對應NodeManager通訊,要求它在這個Container中啓動應用程序的ApplicationMaster;
  3. ApplicationMaster向ResourceManager註冊本身,啓動成功後與ResourceManager保持心跳;
  4. ApplicationMaster向ResourceManager申請資源;
  5. 申請資源成功後,由ApplicationMaster進行初始化,而後與NodeManager通訊,要求NodeManager啓動Container。而後ApplicationMaster與NodeManager保持心跳,從而對NodeManager上運行的任務進行監控和管理;
  6. Container運行期間,向ApplicationMaster彙報本身的進度和狀態信息,以便ApplicationMaster掌握任務運行狀態,從而在任務失敗是能夠從新啓動;
  7. 應用運行結束後,ApplicationMaster向ResourceManager註銷本身,容許其所屬的Container回收。

MapReduce

Map和Reduce 計算框架,編程模型 「分而治之」的思想, 分佈式並行計算

Mapper

對一些獨立元素組成的列表的每個元素進行制定的操做,可高度並行

// step 1: Map Class
    /**
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO update paragram
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
 
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }

Reducer

對一個列表元素進行合併

// step 2: Reduce Class
    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO
    public static class ModuleReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
 
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }
    }

Job

// step 3: Driver ,component job, implements Tool
    public int run(String[] args) throws Exception {
        // 1: get configration
        Configuration configuration = getConf();
 
        // 2: create Job
        Job job = Job.getInstance(configuration, this.getClass()
                .getSimpleName());
        // run jar
        job.setJarByClass(this.getClass());
 
        // 3: set job
        // input -> map -> reduce -> output
        // 3.1 input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
 
        // 3.2: map
        job.setMapperClass(ModuleMapper.class);
        //TODO update paragram
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
 
        // 3.3: reduce
        job.setReducerClass(ModuleReducer.class);
        //TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
 
        // 4: submit job
        boolean isSuccess = job.waitForCompletion(true);
 
        return isSuccess ? 0 : 1;
    }

WordCount

package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * map 任務的定義
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    偏移量                    LongWritable
     * VALUEIN    一行文本                    Text
     * KEYOUT    單詞                        Text
     * VALUEOUT    1                        IntWritable
     * 
     * map任務
     * 將一行文本拆分紅單詞
     * 
     *
     */
    
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            System.out.println("keyIn:"+key+"\t\t"+"valueIn:"+value);
            //1. 單詞拆分
            String[] vals = value.toString().split(" ");
            
            //2. 遍歷輸出
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                
                System.out.println("keyOut:"+keyOut+"\t\t"+"valueOut:"+valueOut);
            }
        }
    }
    
    
    /**
     * 
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    單詞                        Text
     * VALUEIN    單詞次數的集合                list的元素    IntWritable
     * KEYOUT    單詞                        Text
     * VALUEOUT    總次數                    IntWritable
     *
     */
    
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>    {
        
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            
            System.out.print("keyIn:"+key+"\t\t[");
            //1. 求次數綜合
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                
                System.out.print(value+",\t");
            }
            System.out.println("]");
            //2. 輸出
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }
    
    
    
    @Override
    public int run(String[] args) throws Exception {
        //1 設置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        
        //2. 設置map類和reduce類
        job.setMapperClass(WCMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //3 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }
    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run==1?"成功":"失敗");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

MapReduce實現表的join

map join

適合大小表join,將小表緩存在內存中,join發生在map端

只緩存一次,在Mapper子類中重寫setup方法,在setup方法中將小表文件裝入內存中

Mapper子類中map方法讀取大表

package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {
    

    
    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {
        
        HashMap<String, String> cacheMap = new HashMap<String, String>();
        
        // 首相將小表讀入內存
        // 該方法只在每次任務開始時加載一次
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            
            String line = null;
            while((line=br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]);
            }
            
            br.close();
            fr.close();
        }
        
        // map端根據兩張表的key進行合併
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            String deptno = cacheMap.get(vals[2]);
            String dname = cacheMap.get(deptno);
            
            context.write(new Text(deptno), new Text(dname+"\t"+vals[0]+vals[1]));
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        //1 設置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 設置map類和reduce
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        
        //3 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run==1?"成功":"失敗");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

reduce join

適合兩張大表join

package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {
    /*
     * 1    技術部
     * 1002    rose    1
     */
    
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text>{
        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            if(vals.length == 2) {
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            }else {
                keyOut.set(vals[2]);
                valueOut.set(vals[0]+"\t"+vals[1]);
            }
            context.write(keyOut, valueOut);
            
        }
    }
    
    /*
     * keyIn:1
     * valueIn    List{[1007    lily], [1002    rose], [1001    jack], [技術部]}
     */
     
    // reduce端合併是依靠MapReduce shuffle過程當中將相同key的行放入同一臺機器
    
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {
        ArrayList<String> employees = new ArrayList<String>();
        
        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear();    //這裏要注意清空list
            
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // 根據length判斷這是張什麼表
                if(vals.length == 1) {
                    department = vals[0];
                }else if(vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            
            for (String employee : employees) {
                context.write(keyIn, new Text(employee+"\t"+department));
            }
            
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //1 設置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 設置map類和reduce
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //3 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run==1?"成功":"失敗");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

Hadoop的安裝模式

  1. 單機模式
  2. 僞分佈模式(pseudo)
  3. 徹底分佈模式

hadoop 開發環境搭建

maven環境搭建

  1. 安裝maven

    1. 解壓apache-maven-3.0.5.tar.gz
    2. 配置maven環境變量
      MAVEN_HOME=[maven的解壓目錄]
      %MAVEN_HOME%/bin;
    3. 命令提示符 mvn -version
  2. 解壓repository.tar.gz到windows磁盤(如 E:toolsrepository)
  3. 修改settings.xml配置文件中指定的repository(修改apache-maven-3.0.5confsettings.xml)

    <localRepository>D:/repository</localRepository>
  4. 配置eclipse的maven環境
    windows->preferences->maven->

    ->installations->add->勾選本身安裝的maven
    ->user settings->選擇mave家目錄/conf/settings
  5. 建立maven工程
  6. 將${hadoop_Home}/ect/hadoop/log4j.properties拷貝到項目的src目錄
  7. 修改pom.xml

windows下搭建 hadoop開發環境

  1. Windows安裝hadoop

    1. 解壓hadoop-2.5.0.tar.gz到本地windows磁盤
    2. 配置hadoop的環境變量

      添加環境變量            HADOOP_HOME=hadoop解壓目錄
      在PATH環境變量中追加    %HADOOP_HOME%/bin;
    3. 測試

      hadoop -h
  2. eclipse安裝插件

    1. 解壓eclipse
    2. 將hadoop-eclipse-plugin-2.6.0.jar拷貝到${MyEclispe_HOME}/plugins
    3. 打開(重啓)eclispe,菜單欄->windows->Preferneces->Hadoop MapReduce
  3. eclipse配置插件參數,鏈接HDFS

    1. 在linux中的hadoop安裝目錄下的etc/hadoop/hdfs-site.xml添加以下配置,重啓HDFS的進程
    <!--關閉hdfs的文件權限控制-->
     <property>
         <name>dfs.permissions</name>
             <value>false</value>        
     </property>

    eclipse->windows->show views->other->輸入MapReduce->點擊map reduce locations
    右擊->new hadoop locations

    1. Map/Reduce Master

      Mapreduce(V2) 
      host:[hostname]
      port:8032            //resourcemanager 的默認端口號
    2. DFS Master

      DFS Master
      host:[hostname]
      port:8020
  4. 拷貝winutils.exe 和hadoop.dll到${hadoop_HOME}/bin
  5. 單獨拷貝hadoop.dll到C:WindowsSystem32
  6. 建立maven工程,經過pom.xml導包

    將lo4j.perperties文件拷貝到src/main/resources

打jar包,提交集羣運行

  1. jar包時,指定主類

    yarn jar pv.jar /input/2015082818 /output

  2. jar包時,不指定主類

    yarn jar pv.jar 類的全限定名 /input/2015082818 /output
    不一樣包中可能有相同類名,因此要指定類的全限定名

Shuffle

shuffle

MapReduce框架核心部分(設計精髓):內核

shuffle 定義

​ map() 輸出開始 到 reduce()輸入開始 此階段是shuffle
​ input -> map -> shuffle -> reduce -> output

shuffle分爲兩個階段

​ map shuffle phase

​ reduce shuffle phase

shuffle主要操做

​ partitioner - map

​ sorter - map & reduce

​ combiner: map phase局部聚合操做 不是全部的MapReduce程序均可以進行局部聚合的

​ compress:map phase的輸出數據壓縮 針對全部MapReduce程序均可以進行設置

​ group - reduce

shuffle詳解

全部操做都是針對map()輸出的<key, value>數據進行的

map shuffle phase

  1. 進入環形緩衝區(默認100MB)

    當達到環形緩衝區內存的80%默認狀況下,將會將緩衝區中的數據spill到本地磁盤中(溢出到MapTask所運行的NodeManager機器的本地磁盤中)

  2. 溢寫

    並非當即將緩衝區中的數據溢寫到本地磁盤,而是須要通過一些操做

    1. 分區paritioner

      依據此MapReduce Job中Reduce Task個數進行分區決定map輸出的數據被哪一個reduce任務進行處理分析默認狀況下,依據key採用HashPartitioner

// 經過取餘將數據分配到哪一個reduce處理
HashPartitioner
    int getParitition(key, value, numreducetask) {
        return ( key.hashCode&Integer.maxValue)%numreducetask;
    }
    1. 排序sorter

      會對每一個分區中的數據進行排序,默認狀況下依據key進行排序

    2. spill溢寫

      將分區排序後的數據寫到本地磁盤的一個文件中

      反覆上述的操做,產生多個小文件

    1. 當溢寫結束後

      • 此時將spill到本地磁盤的小文件進行一次合併。
      • combiner: (可選)map端的reduce
      • compress:(可配置) 數據減小了, 減小網絡IO; 但壓縮消耗CPU性能,也須要時間

    reduce shuffle phase

    • merge 合併

      各個分區的數據合併在一塊兒(當MapTask處理數據完成之後,告知AppMaster,而後AppMaster通知全部的ReduceTask,各個ReduceTask主動到已經完成的MapTask的本地磁盤,去拉取屬於本身要處理的數據(分區中))

    • 排序 對各個分區中的數據進行排序

      最後每一個分區造成一個文件(map輸出的數據最後在個文件中),分區的,而且各個分區的數據已經進行了排序。

    • 分組group

      將相同key的value值存入到list集合,造成新的key, list(value),將key/value對數據傳遞給reduce()函數進行處理。

    最後將(key, list(value))傳給 reduce()

    map個數及reduce個數肯定

    map個數肯定

    FileInputFormat.setMaxInputSplitSize(job, size);        設置切片最大值
    FileInputFormat.setMinInputSplitSize(job, size);        設置切片最小值

    FileInputFormat
            public List<InputSplit> getSplits(JobContext job){。。。}
                
            protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                        return Math.max(minSize, Math.min(maxSize, blockSize));
            }
        
            // minSize<=maxSize<blockSize    提升併發
            // minSize>blockSize            下降併發

    reduce個數肯定

    job.setNumReduceTasks(2);
    HashParitioner 決定map輸出的類被哪一個reduce處理

    自定義shuffle

    自定義key

    • key 和 value 均可以使用自定義類
    • 自定義的類不使用 Java 自帶的 serializable 接口,改用hadoop 提供的Writable 接口
    • 注意重寫 toString、 write、readFields
    package com.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * 不用serializable
     * 
     * 用Hadoop的Writable
     *
     */
    
    public class Flow implements Writable {
        
        private long up;
        private long down;
        private long sum;
        
        
        public long getUp() {
            return up;
        }
        public void setUp(long up) {
            this.up = up;
        }
        public long getDown() {
            return down;
        }
        public void setDown(long down) {
            this.down = down;
        }
        public long getSum() {
            return sum;
        }
        public void setSum(long sum) {
            this.sum = sum;
        }
        
        
    
        @Override
        public String toString() {
            return up + "\t" + down + "\t" + sum;
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(up);
            out.writeLong(down);
            out.writeLong(sum);
    
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            up = in.readLong();
            down = in.readLong();
            sum = in.readLong();
        }
    }

    自定義分區

    • 調用 job 的 setNumReduceTasks 方法設置reduce 個數
    • setPartitionerClass 設置分區
    public static class MyPartitioner extends Partitioner<Text, Flow> {
    
            @Override
            public int getPartition(Text key, Flow value, int numPartitions) {
                if(value.getSum()<1024) {
                    return 0;
                }else if(value.getSum()<10*1024) {
                    return 1;
                }
                return 2;
            }    
        }

    排序

    只能按照key排序,若是須要多重排序,須要自定義key
    在shuffle過程當中自動排序,無需手動調用方法

    public class MyKey implements WritableComparable<MyKey>
    //要排序的類要實現WritableComparable接口
    
        @Override
        public int compareTo(MyKey o) {
            long result = o.getSum() - this.getSum();
            if(result>0) {
                return 1;
            }else if(result<0) {
                return -1;
            }
            return o.getPhone().compareTo(this.getPhone());
        }

    combiner

    map端的小reduce,對每一個map後的value進行reduce,減小數據傳輸

    能夠經過設置job.setCombinerClass(WCReducer.class);設置combiner

    先後效果對比

    原始數據
    hello world
    hello hadoop
    
    hello world
    hello java
    
    keyIn:hadoop        [1,    ]
    keyIn:hello        [1,    1,    1,    1,    ]
    keyIn:java        [1,    ]
    keyIn:world        [1,    1,    ]
    
    
    
    keyIn:hadoop        [1,    ]
    keyIn:hello        [2,    2,    ]
    keyIn:java        [1,    ]
    keyIn:world        [1,    1,    ]

    分組

    根據需求將key中相同的字段做爲同一個key以減小鍵值對,做爲一種優化的手段

    重寫 RawComparator 方法合併key中相同字段

    經過 job.setGroupingComparatorClass(Mygroup.class); 調用

    public static class Mygroup implements RawComparator<Person> {
    
            @Override
            public int compare(Person o1, Person o2) {
                // TODO Auto-generated method stub
                return 0;
            }
    
            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);
            }
            
        }

    hadoop優化

    1. 能夠設置block默認大小
    2. 設置map個數
    3. 調整環形緩衝區大小
    4. 自定義分區 --> 解決數據清傾斜問題
    5. 自定義 combiner --> map端的小reduce,減小網絡傳輸損耗
    6. 自定義分組 --> 減小鍵值對
    7. 設置reduce個數 --> 加快處理速度
    8. CombinerFileInputFormat --> 合併小文件
    9. 根據業務自定義key和value

    Java MapReduce編程錯誤

    • org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable

      map方法把文件的行號當成key,因此要用LongWritable。
    相關文章
    相關標籤/搜索