當數據集的大小超過一臺獨立的物理計算機的存儲能力時,就有必要對它進行分區並存儲到若干臺單獨的計算機上。HDFS是hadoop的主要分佈式存儲系統,一個HDFS集羣主要包括NameNode用來管理文件系統的metadata,DataNode用來存儲實際的數據。下面是HDFS的一些特色java
HDFS採用master/slave架構。一個HDFS集羣通常有一個Namenode和必定數目的Datanode組成。Namenode是一箇中心服務器,負責管理文件系統的namespace和客戶端對文件的訪問。Datanode在集羣中通常是一個節點,負責管理節點上它們附帶的存儲。在內部,一個文件其實分紅一個或多個block,這些block存儲在Datanode集合裏。Namenode執行文件系統的namespace操做,例如打開、關閉、重命名文件和目錄,同時決定block到具體Datanode節點的映射。Datanode在Namenode的指揮下進行block的建立、刪除和複製
node
NameNode
是整個系統的管理節點,它維護這整個文件系統的文件目錄樹,文件/目錄的元信息和每一個文件對應的數據塊列表。接收用戶的操做請求
維護文件包括:web
鏡像
,可是fsimage不會隨時與NN內存中的metadata保持一致,而是每隔一段時間經過合併edits文件來更新內容(secondary NN是用來合併fsimage和edits文件來更新NN的metadata)當NameNode啓動時,它首先須要從fsimage
讀取HDFS的狀態,而後從edit log file讀取edits
的信息。NameNode再將新的HDFS的狀態寫入到fsimage
和新建一個空的edits file。在啓動階段,NameNode須要合併fsimage
和edits file
,edits log file在一個忙碌的集羣中文件大小會變得很大,這會致使在重啓NameNode時候耗費較長的時間。
Secondary NameNode按期的合併fsimage
和edits log
並保持edits log文件在一個適度的大小,它每每運行在不一樣的機器上面。在啓動Secondary NameNode上checkpoint
過程主要收到兩個參數的 影響shell
dfs.namenode.checkpoint.period
:默認的是1 hour,兩次連續的checkpoint的最大時間間隔dfs.namenode.checkpoint.period
:默認爲1 million 超過這個大小就督促進行checkpointed transactions,即便時間間隔還沒到
小程序
5.讀取結束。客戶端直接從DataNode上讀取文件,在此過程當中NN不參與文件的傳輸slave經過RPC請求master,master的方法會被調用服務器
下圖是數據塊的分佈
上傳成功後NN始終在內存中保存metadata,用於處理讀請求
,metadata主要存儲文件的名稱FileName
,副本數量replicas
,分多少block存儲block-ids
,分別存儲在哪幾個節點上id2host
。架構
上面講了那麼多不一樣角色之間的交互,這些進程間的交互都是經過RPC(Remote Procdure Call,遠程過程調用)來進行的,它容許一個進程去訪問另外一個進程的方法,這些對於用戶都是透明的,能夠說Hadoop的運行是創建在RPC基礎之上的,在瞭解RPC以前咱們須要先了解一項技術動態代理
,動態代理能夠提供對另外一個對象的訪問,同時隱藏實際對象的具體事實,代理對象對客戶隱藏了實際對象。在Hadoop中DataNode端是經過得到NameNode的代理,經過該代理和NameNode進行通訊的,爲了更好的分析hadoop的RPC機制我想先分析一下動態代理是怎麼實現。
目前Java開發包中提供了對動態代理的支持,但如今只支持對接口的實現,咱們須要定義一個接口。併發
interface DynamicService { public void show(); }
當實現動態代理的時候,須要實現InvocationHandler類,而且覆寫其Invoker方法分佈式
class ClassA implements DynamicService { @Override public void show() { System.out.println("this is class A"); } } class ClassB implements DynamicService { @Override public void show() { System.out.println("this is class B"); } } class Invoker implements InvocationHandler { DynamicService ds; public Invoker(DynamicService ds) { this.ds = ds; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //add some dynamic methods here method.invoke(ds, args); //after return null; } }
下面是例子的測試ide
public static void main(String[] args) { Invoker inv1 = new Invoker(new ClassA()); DynamicService ds = (DynamicService) Proxy.newProxyInstance(DynamicService.class.getClassLoader(), new Class[] {DynamicService.class}, inv1); ds.show(); //添加另一個 Invoker inv2 = new Invoker(new ClassB()); DynamicService dss = (DynamicService) Proxy.newProxyInstance(DynamicService.class.getClassLoader(), new Class[] {DynamicService.class}, inv2); dss.show(); }
如今咱們就須要去實現Hadoop RPC,主要分爲如下幾步
4.構造RPC Client 併發送請求:使用RPC.getProxy()構造客戶端代理對象,經過代理對象訪問遠程端的方法
有了上面的幾步,咱們如今就能夠本身寫一個RPC小程序了
public interface RPCService { public static final long versionID = 10010L;//版本號,不一樣版本號的RPC Client和Server之間不能相互通訊 public String sayHi(String name); }
public class RPCServer implements RPCService { @Override public String sayHi(String name) { return "server response"+name; } public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Configuration conf = new Configuration(); Server server = new RPC.Builder(conf)// .setProtocol(RPCService.class)// .setBindAddress("10.30.100.11")// 服務器地址 .setPort(1234)//端口 .setInstance(new RPCServer())// 設置託管對象 .build(); server.start(); } }
啓動服務器以後,服務器就在指定端口監聽客戶端的請求。服務器就處於監聽狀態等待客戶端請求到達。
public class RPCClient { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); //接口類型,就能夠調用接口中的方法.在客戶端獲取代理對象,有了代理對象就能夠調用目標對象(RPCServer)的方法了 RPCService proxy = RPC.getProxy(RPCService.class, 10010, new InetSocketAddress("10.30.100.11", 1234), conf);//server的代理對象,server必須實現這個接口 String result = proxy.sayHi("boyaa"); System.out.println(result); RPC.stopProxy(proxy); } }