Hadoop大數據通用處理平臺

1.簡介node

Hadoop是一款開源的大數據通用處理平臺,其提供了分佈式存儲和分佈式離線計算,適合大規模數據、流式數據(寫一次,讀屢次),不適合低延時的訪問、大量的小文件以及頻繁修改的文件。數據庫

*Hadoop由HDFS、YARN、MapReduce組成。服務器

Hadoop的特色:網絡

1.高擴展(動態擴容):可以存儲和處理千兆字節數據(PB),可以動態的增長和卸載節點,提高存儲能力(可以達到上千個節點)框架

2.低成本:只須要普通的PC機就能實現,不依賴高端存儲設備和服務器。ssh

3.高效率:經過在Hadoop集羣中分化數據並行處理,使得處理速度很是快。分佈式

4.可靠性:數據有多份副本,而且在任務失敗後能自動從新部署。ide

Hadoop的使用場景:oop

1.日誌分析,將數據分片並行計算處理。大數據

2.基於海量數據的在線應用。

3.推薦系統,精準營銷。

4.搜索引擎。

Hadoop生態圈:

Hive:利用Hive能夠不須要編寫複雜的Hadoop程序,只須要寫一個SQL語句,Hive就會把SQL語句轉換成Hadoop的任務去執行,下降使用Hadoop離線計算的門檻。

HBase:海量數據存儲的非關係型數據庫,單個表中的數據可以容納百億行x百萬列。

ZooKeeper:監控Hadoop集羣中每一個節點的狀態,管理整個集羣的配置,維護節點間數據的一致性。

Flume:海量日誌採集系統。

2.內部結構

2.1 HDFS

HDFS是分佈式文件系統,存儲海量的文件,其中HDFS中包含NameNode、DataNode、SecondaryNameNode組件等。

Block數據塊

1.HDFS中基本的存儲單元,1.X版本中每一個Block默認是64M,2.X版本中每一個Block默認是128M。

2.一個大文件會被拆分紅多個Block進行存儲,若是一個文件少於Block的大小,那麼其實際佔用的空間爲文件自身大小。

3.每一個Block都會在不一樣的DataNode節點中存在備份(默認備份數是3)

DataNode

1.保存具體的Blocks數據。

2.負責數據的讀寫操做和複製操做。

3.DataNode啓動時會向NameNode彙報當前存儲的數據塊信息。

NameNode

1.存儲文件的元信息和文件與Block、DataNode的關係,NameNode運行時全部數據都保存在內存中,所以整個HDFS可存儲的文件數受限於NameNode的內存大小。

2.每一個Block在NameNode中都對應一條記錄,若是是大量的小文件將會消耗大量內存,所以HDFS適合存儲大文件。

3.NameNode中的數據會定時保存到本地磁盤中(只有元數據),但不保存文件與Block、DataNode的位置信息,這部分數據由DataNode啓動時上報和運行時維護。

*NameNode不容許DataNode具備同一個Block的多個副本,因此建立的最大副本數量是當時DataNode的總數。

*DataNode會按期向NameNode發送心跳信息,一旦在必定時間內NameNode沒有接收到DataNode發送的心跳則認爲其已經宕機,所以不會再給它任何IO請求。

*若是DataNode失效形成副本數量降低而且低於預先設置的閾值或者動態增長副本數量,則NameNode會在合適的時機從新調度DataNode進行復制。

SecondaryNameNode

1.定時與NameNode進行同步,合併HDFS中系統鏡像,定時替換NameNode中的鏡像。

HDFS寫入文件的流程

1.HDFS Client向NameNode申請寫入文件。

2.NameNode根據文件大小,返回文件要寫入的DataNode列表以及Block id (此時NameNode已存儲文件的元信息、文件與DataNode、Block之間的關係)

3.HDFS Client收到響應後,將文件寫入第一個DataNode中,第一個DataNode接收到數據後將其寫入本地磁盤,同時把數據傳遞給第二個DataNode,直到寫入備份數個DataNode。

4.每一個DataNode接收完數據後都會向前一個DataNode返回寫入成功的響應,最終第一個DataNode將返回HDFS Client客戶端寫入成功的響應。

5.當HDFS Client接收到整個DataNodes的確認請求後會向NameNode發送最終確認請求,此時NameNode纔會提交文件。

*當寫入某個DataNode失敗時,數據會繼續寫入其餘的DataNode,NameNode會從新尋找DataNode繼續複製,以保證數據的可靠性。

*每一個Block都會有一個校驗碼並存放在獨立的文件中,以便讀的時候來驗證數據的完整性。

*文件寫入完畢後,向NameNode發送確認請求,此時文件纔可見,若是發送確認請求以前NameNode宕機,那麼文件將會丟失,HDFS客戶端沒法進行讀取。

HDFS讀取文件的流程

1.HDFS Client向NameNode申請讀取指定文件。

2.NameNode返回文件全部的Block以及這些Block所在的DataNodes中(包括複製節點)

3.HDFS Client根據NameNode的返回,優先從與HDFS Client同節點的DataNode中直接讀取(若HDFS Client不在集羣範圍內則隨機選擇),若是從DataNode中讀取失敗則經過網絡從複製節點中進行讀取。

機架感知

分佈式集羣中一般包含很是多的機器,因爲受到機架槽位和交換機網口的限制,一般大型的分佈式集羣都會跨好幾個機架,由多個機架上的機器共同組成一個分佈式集羣。

機架內的機器之間的網絡速度一般都會高於跨機架機器之間的網絡速度,而且機架之間機器的網絡通訊一般受到上層交換機間網絡帶寬的限制。

Hadoop默認沒有開啓機架感知功能,默認狀況下每一個Block都是隨機分配DataNode,須要進行相關的配置,那麼在NameNode啓動時,會將機器與機架的對應信息保存在內存中,用於在HDFS Client申請寫文件時,可以根據預先定義的機架關係合理的分配DataNode。

Hadoop機架感知默認對3個副本的存放策略爲:

第1個Block副本存放在和HDFS Client所在的節點中(若HDFS Client不在集羣範圍內則隨機選取)

第2個Block副本存放在與第一個節點不一樣機架下的節點中(隨機選擇)

第3個Block副本存放在與第2個副本所在節點的機架下的另外一個節點中,若是還有更多的副本則隨機存放在集羣的節點中。

*使用此策略能夠保證對文件的訪問可以優先在本機架下找到,而且若是整個機架上發生了異常也能夠在另外的機架上找到該Block的副本。

2.2 YARN

YARN是分佈式資源調度框架(任務計算框架的資源調度框架),主要負責集羣中的資源管理以及任務調度而且監控各個節點。

ResourceManager

1.是整個集羣的資源管理者,管理並監控各個NodeManager。

2.處理客戶端的任務請求。

3.啓動和監控ApplicationMaster。

4.負責資源的分配以及調度。

NodeManager

1.是每一個節點的管理者,負責任務的執行。

2.處理來自ResourceManager的命令。

3.處理來自ApplicationMaster的命令。

ApplicationMaster

1.數據切分,用於並行計算處理。

2.計算任務所須要的資源。

3.負責任務的監控與容錯。

任務運行在YARN的流程

1.客戶端提交任務請求到ResourceManager。

2.ResourceManager生成一個ApplicationManager進程,用於任務的管理。

3.ApplicationManager建立一個Container容器用於存聽任務所須要的資源。

4.ApplicationManager尋找其中一個NodeManager,在此NodeManager中啓動一個ApplicationMaster,用於任務的管理以及監控。

5.ApplicationMaster向ResourceManager進行註冊,並計算任務所需的資源彙報給ResourceManager(CPU與內存)

6.ResourceManager爲此任務分配資源,資源封裝在Container容器中。

7.ApplicationMaster通知集羣中相關的NodeManager進行任務的執行。

8.各個NodeManager從Container容器中獲取資源並執行Map、Reduce任務。

2.3 MapReduce

MapReduce是分佈式離線並行計算框架,高吞吐量,高延時,原理是將分析的數據拆分紅多份,經過多臺節點並行處理,相對於Storm、Spark任務計算框架而言,MapReduce是最先出現的計算框架。

MapReduce、Storm、Spark任務計算框架對比:

MapReduce執行流程

MapReduce將程序劃分爲Map任務以及Reduce任務兩部分。

Map任務處理流程

1.讀取文件中的內容,解析成Key-Value的形式 (Key爲偏移量,Value爲每行的數據)

2.重寫map方法,編寫業務邏輯,生成新的Key和Value。

3.對輸出的Key、Value進行分區(Partitioner類)

4.對數據按照Key進行排序、分組,相同key的value放到一個集合中(數據彙總)

*處理的文件必需要在HDFS中。

Reduce任務處理流程

1.對多個Map任務的輸出,按照不一樣的分區,經過網絡複製到不一樣的reduce節點。

2.對多個Map任務的輸出進行合併、排序。

3.將reduce的輸出保存到文件,存放在HDFS中。

3.Hadoop的使用

3.1 安裝

因爲Hadoop使用Java語言進行編寫,所以須要安裝JDK。

從CDH中下載Hadoop 2.X並進行解壓,CDH是Cloudrea公司對各類開源框架的整合與優化(較穩定)

etc目錄:Hadoop配置文件存放目錄。

logs目錄:Hadoop日誌存放目錄。

bin目錄、sbin目錄:Hadoop可執行命令存放目錄。

etc目錄

bin目錄

sbin目錄

3.2 Hadoop配置

1.配置環境

編輯etc/hadoop/hadoop-env.sh文件,修改JAVA_HOME配置項爲本地JAVA的HOME目錄,此文件是Hadoop啓動時加載的環境變量。

編輯/etc/hosts文件,添加主機名與IP的映射關係。

2.配置Hadoop公共屬性(core-site.xml)

複製代碼
<configuration>
<!-- Hadoop工做目錄,用於存放Hadoop運行時產生的臨時數據 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop/hadoop-2.9.0/data</value>
</property>
<!-- NameNode的通訊地址,1.x默認9000,2.x可使用8020 -->
<property>
<name>fs.default.name</name>
<value>hdfs://192.168.1.80:8020</value>
</property>
</configuration>
複製代碼

3.配置HDFS(hdfs-site.xml)

複製代碼
<configuration>
<!--指定block的副本數量(將block複製到集羣中備份數-1個節點的DataNode中)-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- 關閉HDFS的訪問權限 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
</configuration>
複製代碼

4.配置YARN(yarn-site.xml)

複製代碼
<configuration>
<!-- 配置Reduce取數據的方式是shuffle(隨機) -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
複製代碼

5.配置MapReduce(mapred-site.xml)

複製代碼
<configuration>
<!-- 讓MapReduce任務使用YARN進行調度 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
複製代碼

6.配置SSH

因爲在啓動hdfs、yarn時都須要對用戶的身份進行驗證,所以能夠配置SSH設置免密碼登陸。

//生成祕鑰
ssh-keygen -t rsa

//複製祕鑰到本機
ssh-copy-id 192.168.1.80

3.3 啓動HDFS

1.格式化NameNode

2.啓動HDFS,將會啓動NameNode、DataNode、SecondaryNameNode三個進程,能夠經過jps命令進行查看。

*若啓動時出現錯誤,則能夠進入logs目錄查看相應的日誌文件。

當HDFS啓動完畢後,能夠訪問http://localhost:50070進入HDFS的可視化管理界面,能夠在此頁面中監控整個HDFS集羣的情況而且進行文件的上傳以及下載。

*進入HDFS監控頁面下載文件時,會將請求重定向,重定向後的地址的主機名爲NameNode的主機名,所以客戶端本地的host文件中須要配置NameNode主機名與IP的映射關係。

3.4 啓動YARN

啓動YARN後,將會啓動ResourceManager以及NodeManager進程,能夠經過jps命令進行查看。

當YARN啓動完畢後,能夠訪問http://localhost:8088進入YARN的可視化管理界面,能夠在此頁面中查看任務的執行狀況以及資源的分配。

3.5 使用Shell命令操做HDFS

HDFS中的文件系統與Linux相似,由/表明根目錄。

複製代碼
hadoop fs -cat <src>:顯示文件中的內容。

hadoop fs -copyFromLocal <localsrc> <dst>:將本地中的文件上傳到HDFS。

hadoop fs -copyToLocal <src> <localdst>:將HDFS中的文件下載到本地。

hadoop fs -count <path>:查詢指定路徑下文件的個數。

hadoop fs -cp <src> <dst>:在HDFS內對文件進行復制。

hadoop fs -get <src> <localdst>:將HDFS中的文件下載到本地。

hadoop fs -ls <path>:顯示指定目錄下的內容。

hadoop fs -mkdir <path>:建立目錄。

hadoop fs -moveFromLocal <localsrc> <dst>:將本地中的文件剪切到HDFS中。

hadoop fs -moveToLocal <src> <localdst> :將HDFS中的文件剪切到本地中。

hadoop fs -mv <src> <dst> :在HDFS內對文件進行移動。

hadoop fs -put <localsrc> <dst>:將本地中的文件上傳到HDFS。

hadoop fs -rm <src>:刪除HDFS中的文件。
複製代碼

3.6 JAVA中操做HDFS

複製代碼
/**

  • @Auther: ZHUANGHAOTANG
  • @Date: 2018/11/6 11:49
  • @Description:
    */
    public class HDFSUtils {

    /**

    • HDFS NamenNode URL
      */
      private static final String NAMENODE_URL = "hdfs://hadoop1:8020";

    /**

    • 配置項
      */
      private static Configuration conf = null;

    static {
    conf = new Configuration();
    }

    /**

    • 建立目錄
      */
      public static void mkdir(String dir) throws Exception {
      if (StringUtils.isBlank(dir)) {
      throw new Exception("Parameter Is NULL");
      }
      dir = NAMENODE_URL + dir;
      FileSystem fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
      if (!fs.exists(new Path(dir))) {
      fs.mkdirs(new Path(dir));
      }
      fs.close();
      }

    /**

    • 刪除目錄或文件
      */
      public static void delete(String dir) throws Exception {
      if (StringUtils.isBlank(dir)) {
      throw new Exception("Parameter Is NULL");
      }
      dir = NAMENODE_URL + dir;
      FileSystem fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
      fs.delete(new Path(dir), true);
      fs.close();
      }

    /**

    • 遍歷指定路徑下的目錄和文件
      */
      public static List<String> listAll(String dir) throws Exception {
      List<String> names = new ArrayList<>();
      if (StringUtils.isBlank(dir)) {
      throw new Exception("Parameter Is NULL");
      }
      dir = NAMENODE_URL + dir;
      FileSystem fs = FileSystem.get(URI.create(dir), conf);
      FileStatus[] files = fs.listStatus(new Path(dir));
      for (int i = 0, len = files.length; i < len; i++) {
      if (files[i].isFile()) { //文件
      names.add(files[i].getPath().toString());
      } else if (files[i].isDirectory()) { //目錄
      names.add(files[i].getPath().toString());
      } else if (files[i].isSymlink()) { //軟或硬連接
      names.add(files[i].getPath().toString());
      }
      }
      fs.close();
      return names;
      }

    /**

    • 上傳當前服務器的文件到HDFS中
      */
      public static void uploadLocalFileToHDFS(String localFile, String hdfsFile) throws Exception {
      if (StringUtils.isBlank(localFile) || StringUtils.isBlank(hdfsFile)) {
      throw new Exception("Parameter Is NULL");
      }
      hdfsFile = NAMENODE_URL + hdfsFile;
      FileSystem fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
      Path src = new Path(localFile);
      Path dst = new Path(hdfsFile);
      fs.copyFromLocalFile(src, dst);
      fs.close();
      }

    /**

    • 經過流上傳文件
      */
      public static void uploadFile(String hdfsPath, InputStream inputStream) throws Exception {
      if (StringUtils.isBlank(hdfsPath)) {
      throw new Exception("Parameter Is NULL");
      }
      hdfsPath = NAMENODE_URL + hdfsPath;
      FileSystem fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
      FSDataOutputStream os = fs.create(new Path(hdfsPath));
      BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
      byte[] data = new byte[1024];
      while (bufferedInputStream.read(data) != -1) {
      os.write(data);
      }
      os.close();
      fs.close();
      }

    /**

    • 從HDFS中下載文件
      */
      public static byte[] readFile(String hdfsFile) throws Exception {
      if (StringUtils.isBlank(hdfsFile)) {
      throw new Exception("Parameter Is NULL");
      }
      hdfsFile = NAMENODE_URL + hdfsFile;
      FileSystem fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
      Path path = new Path(hdfsFile);
      if (fs.exists(path)) {
      FSDataInputStream is = fs.open(path);
      FileStatus stat = fs.getFileStatus(path);
      byte[] data = new byte[(int) stat.getLen()];
      is.readFully(0, data);
      is.close();
      fs.close();
      return data;
      } else {
      throw new Exception("File Not Found In HDFS");
      }
      }

}
複製代碼

3.7 執行一個MapReduce任務

Hadoop中提供了hadoop-mapreduce-examples-2.9.0.jar,其封裝了一些任務計算方法,能夠直接進行調用。

*使用hadoop jar命令執行JAR包。

1.建立一個文件,將此文件上傳到HDFS中。

2.使用Hadoop提供的hadoop-mapreduce-examples-2.9.0.jar執行wordcount詞頻統計功能,而後在YARN管理頁面中進行查看。

YARN管理頁面中能夠查看任務的執行進度:

3.當任務執行完畢後,能夠查看任務的執行結果。

*任務的執行結果將會放到HDFS的文件中。

相關文章
相關標籤/搜索