Hadoop之HDFS原理及文件上傳下載源碼分析(上)

HDFS原理

  首先說明下,hadoop的各類搭建方式再也不介紹,相信各位玩hadoop的同窗隨便都能搭出來。java

  樓主的環境:node

  •   操做系統:Ubuntu 15.10
  •   hadoop版本:2.7.3
  •   HA:否(隨便搭了個僞分佈式)

文件上傳

下圖描述了Client向HDFS上傳一個200M大小的日誌文件的大體過程:apache

  首先,Client發起文件上傳請求,即經過RPC與NameNode創建通信。分佈式

  NameNode與各DataNode使用心跳機制來獲取DataNode信息。NameNode收到Client請求後,獲取DataNode信息,並將可存儲文件的節點信息返回給Client。ide

  Client收到NameNode返回的信息,與對應的DataNode節點取得聯繫,並向該節點寫文件。oop

  文件寫入到DataNode後,以流水線的方式複製到其餘DataNode(固然,這裏面也有DataNode向NameNode申請block,這裏不詳細介紹),至於複製多少份,與所配置的hdfs-default.xml中的dfs.replication相關。源碼分析

  元數據存儲

  先明確幾個概念:fetch

  fsimage:元數據鏡像文件。存儲某一時段NameNode內存元數據信息。
  edits:操做日誌文件。
  fstime:保存最近一次checkpoint的時間this

  checkpoint可在hdfs-default.xml中具體配置,默認爲3600秒:spa

1 <property>
2   <name>dfs.namenode.checkpoint.period</name>
3   <value>3600</value>
4   <description>The number of seconds between two periodic checkpoints.
5   </description>
6 </property>

 

  fsimage和edits文件在namenode目錄能夠看到:

NameNode中的元數據信息:

 

  

  test.log文件上傳後,Namenode始終在內存中保存metedata,用於處理「讀請求」。metedata主要存儲了文件名稱(FileName),副本數量(replicas),分多少block存儲(block-ids),分別存儲在哪一個節點上(id2host)等。

  到有「寫請求」到來時,namenode會首先寫editlog到磁盤,即向edits文件中寫日誌,成功返回後,纔會修改內存,而且向客戶端返回
  hadoop會維護一個fsimage文件,也就是namenode中metedata的鏡像,可是fsimage不會隨時與namenode內存中的metedata保持一致,而是每隔一段時間經過合併edits文件來更新內容。此時Secondary namenode就派上用場了,合併fsimage和edits文件並更新NameNode的metedata。
  Secondary namenode工做流程:

  1. secondary通知namenode切換edits文件
  2. secondary經過http請求從namenode得到fsimage和edits文件
  3. secondary將fsimage載入內存,而後開始合併edits
  4. secondary將新的fsimage發回給namenode
  5. namenode用新的fsimage替換舊的fsimage

  經過一張圖能夠表示爲:

 文件下載

  文件下載相對來講就簡單一些了,如圖所示,Client要從DataNode上,讀取test.log文件。而test.log由block1和block2組成。

  



  文件下載的主要流程爲:

  • client向namenode發送請求。
  • namenode查看Metadata信息,返回test.log的block的位置。     

    Block1: h0,h1,h3
    Block2: h0,h2,h4

  • 開始從h0節點下載block1,block2。

源碼分析

  咱們先簡單使用hadoop提供的API來實現文件的上傳下載(文件刪除、更名等操做比較簡單,這裏不演示):

 

  

 1 package cn.jon.hadoop.hdfs;
 2 
 3 import java.io.FileInputStream;
 4 import java.io.FileOutputStream;
 5 import java.io.IOException;
 6 import java.io.InputStream;
 7 import java.io.OutputStream;
 8 import java.net.URI;
 9 import java.net.URISyntaxException;
10 
11 import org.apache.hadoop.conf.Configuration;
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14 import org.apache.hadoop.io.IOUtils;
15 import org.junit.Before;
16 import org.junit.Test;
17 
18 public class HDFSDemo {
19     FileSystem fs = null;    
20     @Before
21     public void init(){
22         try {
23             //初始化文件系統
24             fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
25         } catch (IOException e) {
26             e.printStackTrace();
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         } catch (URISyntaxException e) {
30             e.printStackTrace();
31         }
32     }
33     public static void main(String[] args) {
34         
35     }
36     @Test
37     /**
38      * 文件上傳
39      */
40     public void testFileUpload(){
41         try {
42             OutputStream os = fs.create(new Path("/test.log"));
43             FileInputStream fis = new FileInputStream("I://test.log");
44             IOUtils.copyBytes(fis, os, 2048,true);
45             //可使用hadoop提供的簡單方式
46             fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
47         } catch (IllegalArgumentException | IOException e) {
48             e.printStackTrace();
49         }
50     }
51     @Test    
52     /**
53      * 文件下載
54      */
55     public void testFileDownload(){
56         try {
57             InputStream is = fs.open(new Path("/test.log"));
58             FileOutputStream fos = new FileOutputStream("E://test.log");            
59             IOUtils.copyBytes(is, fos, 2048);
60             //可使用hadoop提供的簡單方式
61             fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
62         } catch (IllegalArgumentException | IOException e) {
63             e.printStackTrace();
64         }
65     }
66 
67 }

  顯而易見,只要是對hdfs上的文件進行操做,必須對FileSystem進行初始化,咱們先來分析FileSystem的初始化:

  

1  public static FileSystem get(URI uri, Configuration conf) throws IOException {
2     return CACHE.get(uri, conf);//部分方法我只截取了部分代碼,這裏進入get()方法
3   }
1    FileSystem get(URI uri, Configuration conf) throws IOException{
2       Key key = new Key(uri, conf);
3       return getInternal(uri, conf, key);//調用getInternal()
4     }
 1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
 2      //使用單例模式建立FileSystem,這是因爲FS的初始化須要大量的時間,使用單例保證只是第一次加載慢一些,返回FileSystem的子類實現DistributedFileSystem
 3       FileSystem fs;
 4       synchronized (this) {
 5         fs = map.get(key);
 6       }
 7       if (fs != null) {
 8         return fs;
 9       }
10 
11       fs = createFileSystem(uri, conf);
12       synchronized (this) { // refetch the lock again
13         FileSystem oldfs = map.get(key);
14         if (oldfs != null) { // a file system is created while lock is releasing
15           fs.close(); // close the new file system
16           return oldfs;  // return the old file system
17         }
18         
19         // now insert the new file system into the map
20         if (map.isEmpty()
21                 && !ShutdownHookManager.get().isShutdownInProgress()) {
22           ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
23         }
24         fs.key = key;
25         map.put(key, fs);
26         if (conf.getBoolean("fs.automatic.close", true)) {
27           toAutoClose.add(key);
28         }
29         return fs;
30       }
31     }

 

 1 public void initialize(URI uri, Configuration conf) throws IOException {
 2     super.initialize(uri, conf);
 3     setConf(conf);
 4 
 5     String host = uri.getHost();
 6     if (host == null) {
 7       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
 8     }
 9     homeDirPrefix = conf.get(
10         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
11         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
12     
13     this.dfs = new DFSClient(uri, conf, statistics);//實例化DFSClient,並將它做爲DistributedFileSystem的引用,下面咱們跟進去
14     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
15     this.workingDir = getHomeDirectory();
16   }

 

 1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
 2       Configuration conf, FileSystem.Statistics stats)
 3     throws IOException {
 4     //該構造太長,樓主只截取了重要部分給你們展現,有感興趣的同窗能夠親手進源碼瞧瞧     
 5     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
 6     //這裏聲明瞭NameNode的代理對象,跟咱們前面討論的rpc就息息相關了
 7     if (proxyInfo != null) {
 8       this.dtService = proxyInfo.getDelegationTokenService();
 9       this.namenode = proxyInfo.getProxy();
10     } else if (rpcNamenode != null) {   
11       Preconditions.checkArgument(nameNodeUri == null);
12       this.namenode = rpcNamenode;
13       dtService = null;
14     } else {
15       Preconditions.checkArgument(nameNodeUri != null,
16           "null URI");
17       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
18           ClientProtocol.class, nnFallbackToSimpleAuth);
19       this.dtService = proxyInfo.getDelegationTokenService();
20       this.namenode = proxyInfo.getProxy();//獲取NameNode代理對象引用並本身持有,this.namenode類型爲ClientProtocol,它是一個接口,咱們看下這個接口
21     }
22   }

 

1 public interface ClientProtocol{
2       public static final long versionID = 69L;
3       //還有不少對NameNode操做的方法申明,包括對文件上傳,下載,刪除等
4       //樓主特地把versionID貼出來了,這就跟咱們寫的RPCDemo中的MyBizable接口徹底相似,因此說Client一旦拿到該接口實現類的代理對象(NameNodeRpcServer),Client就能夠實現與NameNode的RPC通訊,咱們繼續跟進
5 }

 

 1  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
 2       URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
 3       throws IOException {
 4     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
 5         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
 6           fallbackToSimpleAuth);  
 7     if (failoverProxyProvider == null) {
 8       // 若是不是HA的建立方式,樓主環境是僞分佈式,因此走這裏,咱們跟進去
 9       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
10           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
11     } else {
12       // 若是有HA的建立方式
13       Conf config = new Conf(conf);
14       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
15           RetryPolicies.failoverOnNetworkException(
16               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
17               config.maxRetryAttempts, config.failoverSleepBaseMillis,
18               config.failoverSleepMaxMillis));
19       return new ProxyAndInfo<T>(proxy, dtService,
20           NameNode.getAddress(nameNodeUri));
21     }
22   }

   最終返回的爲ClientProtocol接口的子類代理對象,而NameNodeRpcServer類實現了ClientProtocol接口,所以返回的爲NameNode的代理對象,當客戶端拿到了NameNode的代理對象後,即與NameNode創建了RPC通訊:

 1 private static ClientProtocol createNNProxyWithClientProtocol(
 2       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
 3       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
 4       throws IOException {
 5     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是否是感受愈來愈像咱們前面說到的RPC
 6 
 7     final RetryPolicy defaultPolicy = 
 8         RetryUtils.getDefaultRetryPolicy(//加載默認策虐
 9             conf, 
10             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
11             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
12             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
13             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
14             SafeModeException.class);
15     
16     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
17     //看到versionId了嗎?這下明白了rpc的使用中目標接口必需要有這個字段了吧
18     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
19         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
20         NetUtils.getDefaultSocketFactory(conf),
21         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
22         fallbackToSimpleAuth).getProxy();
23     //看到沒?這裏使用 RPC.getProtocolProxy()來建立ClientNamenodeProtocolPB對象,調試時能夠清楚的看見,該對象引用的是一個代理對象,值爲$Proxy12,由JDK的動態代理來實現。
24     //前面咱們寫RPCDemo程序時,用的是RPC.getProxy(),可是各位你們能夠去看RPC源碼,RPC.getProtocolProxy()最終仍是調用的getProxy()
25     if (withRetries) {
26       Map<String, RetryPolicy> methodNameToPolicyMap 
27                  = new HashMap<String, RetryPolicy>();    
28       ClientProtocol translatorProxy =
29         new ClientNamenodeProtocolTranslatorPB(proxy);
30       return (ClientProtocol) RetryProxy.create(//這裏再次使用代理模式對代理對象進行包裝,也能夠理解爲裝飾者模式
31           ClientProtocol.class,
32           new DefaultFailoverProxyProvider<ClientProtocol>(
33               ClientProtocol.class, translatorProxy),
34           methodNameToPolicyMap,
35           defaultPolicy);
36     } else {
37       return new ClientNamenodeProtocolTranslatorPB(proxy);
38     }
39   }

  整個FileSystem的初始化用時序圖表示爲:

 

  到此,FileSystem的初始化就基本完成。因爲文章篇幅過大的問題,因此樓主把HDFS原理及源碼分析拆分紅了兩部分,上半部分主要是HDFS原理與FileSystem的初始化介紹,那在下半部分將會具體介紹HDFS文件上傳、下載的源碼解析。

  另外,文章用到的一些示例代碼,將會在下半部分發布後,樓主一塊兒上傳到GitHub。

相關文章
相關標籤/搜索