首先說明下,hadoop的各類搭建方式再也不介紹,相信各位玩hadoop的同窗隨便都能搭出來。java
樓主的環境:node
下圖描述了Client向HDFS上傳一個200M大小的日誌文件的大體過程:apache
首先,Client發起文件上傳請求,即經過RPC與NameNode創建通信。分佈式
NameNode與各DataNode使用心跳機制來獲取DataNode信息。NameNode收到Client請求後,獲取DataNode信息,並將可存儲文件的節點信息返回給Client。ide
Client收到NameNode返回的信息,與對應的DataNode節點取得聯繫,並向該節點寫文件。oop
文件寫入到DataNode後,以流水線的方式複製到其餘DataNode(固然,這裏面也有DataNode向NameNode申請block,這裏不詳細介紹),至於複製多少份,與所配置的hdfs-default.xml中的dfs.replication相關。源碼分析
先明確幾個概念:fetch
fsimage:元數據鏡像文件。存儲某一時段NameNode內存元數據信息。
edits:操做日誌文件。
fstime:保存最近一次checkpoint的時間this
checkpoint可在hdfs-default.xml中具體配置,默認爲3600秒:spa
1 <property> 2 <name>dfs.namenode.checkpoint.period</name> 3 <value>3600</value> 4 <description>The number of seconds between two periodic checkpoints. 5 </description> 6 </property>
fsimage和edits文件在namenode目錄能夠看到:
NameNode中的元數據信息:
test.log文件上傳後,Namenode始終在內存中保存metedata,用於處理「讀請求」。metedata主要存儲了文件名稱(FileName),副本數量(replicas),分多少block存儲(block-ids),分別存儲在哪一個節點上(id2host)等。
到有「寫請求」到來時,namenode會首先寫editlog到磁盤,即向edits文件中寫日誌,成功返回後,纔會修改內存,而且向客戶端返回
hadoop會維護一個fsimage文件,也就是namenode中metedata的鏡像,可是fsimage不會隨時與namenode內存中的metedata保持一致,而是每隔一段時間經過合併edits文件來更新內容。此時Secondary namenode就派上用場了,合併fsimage和edits文件並更新NameNode的metedata。
Secondary namenode工做流程:
經過一張圖能夠表示爲:
文件下載相對來講就簡單一些了,如圖所示,Client要從DataNode上,讀取test.log文件。而test.log由block1和block2組成。
文件下載的主要流程爲:
Block1: h0,h1,h3
Block2: h0,h2,h4
咱們先簡單使用hadoop提供的API來實現文件的上傳下載(文件刪除、更名等操做比較簡單,這裏不演示):
1 package cn.jon.hadoop.hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileOutputStream; 5 import java.io.IOException; 6 import java.io.InputStream; 7 import java.io.OutputStream; 8 import java.net.URI; 9 import java.net.URISyntaxException; 10 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.IOUtils; 15 import org.junit.Before; 16 import org.junit.Test; 17 18 public class HDFSDemo { 19 FileSystem fs = null; 20 @Before 21 public void init(){ 22 try { 23 //初始化文件系統 24 fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root"); 25 } catch (IOException e) { 26 e.printStackTrace(); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } catch (URISyntaxException e) { 30 e.printStackTrace(); 31 } 32 } 33 public static void main(String[] args) { 34 35 } 36 @Test 37 /** 38 * 文件上傳 39 */ 40 public void testFileUpload(){ 41 try { 42 OutputStream os = fs.create(new Path("/test.log")); 43 FileInputStream fis = new FileInputStream("I://test.log"); 44 IOUtils.copyBytes(fis, os, 2048,true); 45 //可使用hadoop提供的簡單方式 46 fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log")); 47 } catch (IllegalArgumentException | IOException e) { 48 e.printStackTrace(); 49 } 50 } 51 @Test 52 /** 53 * 文件下載 54 */ 55 public void testFileDownload(){ 56 try { 57 InputStream is = fs.open(new Path("/test.log")); 58 FileOutputStream fos = new FileOutputStream("E://test.log"); 59 IOUtils.copyBytes(is, fos, 2048); 60 //可使用hadoop提供的簡單方式 61 fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log")); 62 } catch (IllegalArgumentException | IOException e) { 63 e.printStackTrace(); 64 } 65 } 66 67 }
顯而易見,只要是對hdfs上的文件進行操做,必須對FileSystem進行初始化,咱們先來分析FileSystem的初始化:
1 public static FileSystem get(URI uri, Configuration conf) throws IOException { 2 return CACHE.get(uri, conf);//部分方法我只截取了部分代碼,這裏進入get()方法 3 }
1 FileSystem get(URI uri, Configuration conf) throws IOException{ 2 Key key = new Key(uri, conf); 3 return getInternal(uri, conf, key);//調用getInternal() 4 }
1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{ 2 //使用單例模式建立FileSystem,這是因爲FS的初始化須要大量的時間,使用單例保證只是第一次加載慢一些,返回FileSystem的子類實現DistributedFileSystem 3 FileSystem fs; 4 synchronized (this) { 5 fs = map.get(key); 6 } 7 if (fs != null) { 8 return fs; 9 } 10 11 fs = createFileSystem(uri, conf); 12 synchronized (this) { // refetch the lock again 13 FileSystem oldfs = map.get(key); 14 if (oldfs != null) { // a file system is created while lock is releasing 15 fs.close(); // close the new file system 16 return oldfs; // return the old file system 17 } 18 19 // now insert the new file system into the map 20 if (map.isEmpty() 21 && !ShutdownHookManager.get().isShutdownInProgress()) { 22 ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY); 23 } 24 fs.key = key; 25 map.put(key, fs); 26 if (conf.getBoolean("fs.automatic.close", true)) { 27 toAutoClose.add(key); 28 } 29 return fs; 30 } 31 }
1 public void initialize(URI uri, Configuration conf) throws IOException { 2 super.initialize(uri, conf); 3 setConf(conf); 4 5 String host = uri.getHost(); 6 if (host == null) { 7 throw new IOException("Incomplete HDFS URI, no host: "+ uri); 8 } 9 homeDirPrefix = conf.get( 10 DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY, 11 DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT); 12 13 this.dfs = new DFSClient(uri, conf, statistics);//實例化DFSClient,並將它做爲DistributedFileSystem的引用,下面咱們跟進去 14 this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); 15 this.workingDir = getHomeDirectory(); 16 }
1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, 2 Configuration conf, FileSystem.Statistics stats) 3 throws IOException { 4 //該構造太長,樓主只截取了重要部分給你們展現,有感興趣的同窗能夠親手進源碼瞧瞧 5 NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null; 6 //這裏聲明瞭NameNode的代理對象,跟咱們前面討論的rpc就息息相關了 7 if (proxyInfo != null) { 8 this.dtService = proxyInfo.getDelegationTokenService(); 9 this.namenode = proxyInfo.getProxy(); 10 } else if (rpcNamenode != null) { 11 Preconditions.checkArgument(nameNodeUri == null); 12 this.namenode = rpcNamenode; 13 dtService = null; 14 } else { 15 Preconditions.checkArgument(nameNodeUri != null, 16 "null URI"); 17 proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, 18 ClientProtocol.class, nnFallbackToSimpleAuth); 19 this.dtService = proxyInfo.getDelegationTokenService(); 20 this.namenode = proxyInfo.getProxy();//獲取NameNode代理對象引用並本身持有,this.namenode類型爲ClientProtocol,它是一個接口,咱們看下這個接口 21 } 22 }
1 public interface ClientProtocol{ 2 public static final long versionID = 69L; 3 //還有不少對NameNode操做的方法申明,包括對文件上傳,下載,刪除等 4 //樓主特地把versionID貼出來了,這就跟咱們寫的RPCDemo中的MyBizable接口徹底相似,因此說Client一旦拿到該接口實現類的代理對象(NameNodeRpcServer),Client就能夠實現與NameNode的RPC通訊,咱們繼續跟進 5 }
1 public static <T> ProxyAndInfo<T> createProxy(Configuration conf, 2 URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) 3 throws IOException { 4 AbstractNNFailoverProxyProvider<T> failoverProxyProvider = 5 createFailoverProxyProvider(conf, nameNodeUri, xface, true, 6 fallbackToSimpleAuth); 7 if (failoverProxyProvider == null) { 8 // 若是不是HA的建立方式,樓主環境是僞分佈式,因此走這裏,咱們跟進去 9 return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, 10 UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); 11 } else { 12 // 若是有HA的建立方式 13 Conf config = new Conf(conf); 14 T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, 15 RetryPolicies.failoverOnNetworkException( 16 RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, 17 config.maxRetryAttempts, config.failoverSleepBaseMillis, 18 config.failoverSleepMaxMillis)); 19 return new ProxyAndInfo<T>(proxy, dtService, 20 NameNode.getAddress(nameNodeUri)); 21 } 22 }
最終返回的爲ClientProtocol接口的子類代理對象,而NameNodeRpcServer類實現了ClientProtocol接口,所以返回的爲NameNode的代理對象,當客戶端拿到了NameNode的代理對象後,即與NameNode創建了RPC通訊:
1 private static ClientProtocol createNNProxyWithClientProtocol( 2 InetSocketAddress address, Configuration conf, UserGroupInformation ugi, 3 boolean withRetries, AtomicBoolean fallbackToSimpleAuth) 4 throws IOException { 5 RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是否是感受愈來愈像咱們前面說到的RPC 6 7 final RetryPolicy defaultPolicy = 8 RetryUtils.getDefaultRetryPolicy(//加載默認策虐 9 conf, 10 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 11 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 12 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, 13 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT, 14 SafeModeException.class); 15 16 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); 17 //看到versionId了嗎?這下明白了rpc的使用中目標接口必需要有這個字段了吧 18 ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( 19 ClientNamenodeProtocolPB.class, version, address, ugi, conf, 20 NetUtils.getDefaultSocketFactory(conf), 21 org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, 22 fallbackToSimpleAuth).getProxy(); 23 //看到沒?這裏使用 RPC.getProtocolProxy()來建立ClientNamenodeProtocolPB對象,調試時能夠清楚的看見,該對象引用的是一個代理對象,值爲$Proxy12,由JDK的動態代理來實現。 24 //前面咱們寫RPCDemo程序時,用的是RPC.getProxy(),可是各位你們能夠去看RPC源碼,RPC.getProtocolProxy()最終仍是調用的getProxy() 25 if (withRetries) { 26 Map<String, RetryPolicy> methodNameToPolicyMap 27 = new HashMap<String, RetryPolicy>(); 28 ClientProtocol translatorProxy = 29 new ClientNamenodeProtocolTranslatorPB(proxy); 30 return (ClientProtocol) RetryProxy.create(//這裏再次使用代理模式對代理對象進行包裝,也能夠理解爲裝飾者模式 31 ClientProtocol.class, 32 new DefaultFailoverProxyProvider<ClientProtocol>( 33 ClientProtocol.class, translatorProxy), 34 methodNameToPolicyMap, 35 defaultPolicy); 36 } else { 37 return new ClientNamenodeProtocolTranslatorPB(proxy); 38 } 39 }
整個FileSystem的初始化用時序圖表示爲:
到此,FileSystem的初始化就基本完成。因爲文章篇幅過大的問題,因此樓主把HDFS原理及源碼分析拆分紅了兩部分,上半部分主要是HDFS原理與FileSystem的初始化介紹,那在下半部分將會具體介紹HDFS文件上傳、下載的源碼解析。
另外,文章用到的一些示例代碼,將會在下半部分發布後,樓主一塊兒上傳到GitHub。