網絡通訊模塊是分佈式系統中最底層的模塊,他直接支撐了上層分佈式環境下複雜的進程間通訊邏輯,是全部分佈式系統的基礎。遠程過程調用(RPC)是一種經常使用的分佈式網絡通訊協議,他容許運行於一臺計算機的程序調用另外一臺計算機的子程序,同時將網絡的通訊細節隱藏起來,使得用戶無需額外地爲這個交互做用編程,大大的簡化了分佈式程序開發
做爲一個分佈式文件系統,Hadoop實現了本身的RPC通訊協議,他是上層多個分佈式子系統(MapReduce,Yarn,HDFS等)公用的網絡通訊模塊
目錄
一. RPC通訊模型
二. Hadoop RPC的特色概述
三. RPC整體架構
四. Hadoop RPC使用方法
五. Hadoop RPC類詳解
六. Hadoop RPC參數調優
一. RPC通訊模型
RPC是一種提供網絡從遠程計算機上請求服務,但不須要了解底層網絡技術的協議
RPC一般採用客戶機/服務器模型。請求程序是一個客戶機,而服務提供程序則是一個服務器。一個典型的RPC框架,主要包括如下幾個部分 :
-
通訊模塊。兩個相互協做的通訊模塊實現請求 - 應答協議,它們在客戶和服務器之間傳遞請求和應答消息,通常不會對數據包進行任何處理。請求 - 應答協議的實現通常有同步方式和異步方式兩種
-
Stub程序。客戶端和服務器端均包含Stub程序,能夠將之看做代理程序。它使得遠程函數調用表現的跟本地調用同樣,對用戶程序徹底透明。在客戶端,Stub程序像一個本地程序,但不直接執行本地調用,而是將請求信息提供網絡模塊發送給服務器端,服務器端給客戶端發送應答後,客戶端Stub程序會解碼對應結果。在服務器端,Stub程序依次進行解碼請求消息中的參數、調用相應的服務過程和編碼應答結果的返回值等處理
-
調度程序。調度程序接收來自通訊模塊的請求信息,並根據其中的標識選擇一個Stub程序進行處理。一般客戶端併發請求量比較大時,會採用線程池提升處理效率
-
客戶程序/服務過程。請求的發出者和請求的處理者
![](http://static.javashuo.com/static/loading.gif)
同步模式和異步模式對比
一個RPC請求從發送到獲取處理結果,所經歷的步驟 :
- 客戶程序以本地方式調用系統產生的Stub程序
- 該Stub程序將函數調用信息按照網絡通訊模塊的要求封裝成消息包,並交給通訊模塊發送給遠程服務器端
- 遠程服務器端接收此消息後,將此消息發送給相應的Stub程序
- Stub程序拆封消息,造成被調過程要求的形式,並調用對應函數
- 被調用函數按照所獲參數執行,並將結果返回給Stub程序
- Stub程序將此結果封裝成消息,經過網絡通訊模塊逐級地傳送給客戶程序
二. Hadoop RPC的特色概述
RPC實際上時分佈式計算中C/S(Client/Server)模型的一個應用案例
對於Hadoop而言,RPC具備如下幾個特色 :
- 透明性。這是全部RPC框架最根本的特色,即當用戶在一臺計算機的程序調用另一臺計算機上的子程序時,用戶自身不該感受到其間設計機器間的通訊,而是感受像是在執行一個本地調用
- 高性能。Hadoop各個系統(HDFS,YARN,MapReduce等)均採用了Master/Slave架構,其中,Master其實是一個RPC Server,它負責處理集羣中全部Slave發送的服務請求,爲了保證Master的併發處理能力,RPC Server應是一個高性能服務器,可以高效地處理來自多個Client的併發RPC請求
- 可控性。RPC是Hadoop最底層最核心的模塊之一,保證其輕量級,高性能和可控性顯得尤其重要
三. RPC整體架構
Hadoop RPC主要分爲四個部分,分別是序列化層、函數調用層、網絡傳輸層和服務器端處理框架
具體實現機制 :
- 序列化層。序列化主要做用是將結構化對象轉爲字節流以便於經過網絡進行傳輸或寫入持久存儲,在RPC框架中,它主要是用於將用戶請求中的參數或者應答轉換成字節流以便跨機器傳輸
- 函數調用層。函數調用層主要功能是定位要調用的而函數並執行該函數,Hadoop RPC採用了Java反射機制與動態代理實現了函數調用
- 網絡傳輸層。網絡傳輸層描述了Client與Server之間消息傳輸的方式,Hadoop RPC採用了基礎TCP/IP的Socket機制
- 服務器端處理框架。服務器端處理框架可被抽象爲網絡I/O模型,它描述了客戶端與服務器間信息的交互方式,它的設計直接決定這服務器端的併發處理能力,而Hadoop RPC採用了基於Reactor設計模式的事件驅動I/O模型
![](http://static.javashuo.com/static/loading.gif)
Hadoop RPC整體架構java
Hadoop RPC整體架構自下而上可分爲兩層,第一層是一個基於Java NIO實現的客戶機 - 服務器通訊模型。其中,客戶端將用戶的調用方法及其參數封裝成請求包後發送到服務器端。服務器端收到請求包後,經解包、調用參數、打包結果等一系列操做後,將結果返回給客戶端。爲了加強Server端的擴展性和併發處理能力,Hadoop RPC採用了基於事件驅動的Reactor設計模式,在具體實現時,用到了JDK提供的各類功能包,主要包括java.nio、java.lang.reflect(反射機制和動態代理)、java.net(網絡編程)等。第二層是供更上層程序直接調用的RPC接口,這些接口底層即爲C/S通訊模型
四. Hadoop RPC使用方法
Hadoop RPC對外主要提供了兩種接口(見類org.apache.hadoop.ipc.RPC),分別是 :
- public static <T>ProtocolProxy <T> getProxy/waitForProxy() : 構造一個客戶端代理對象,用於向服務器發送RPC請求
- public static Server RPC.Builder (Configuration).build() : 爲某個協議實例構造一個服務器對象,用於處理客戶端發送的請求
使用Hadoop RPC分爲如下4個步驟 :
(1) 定義RPC協議
RPC協議是客戶端和服務器端之間的通訊接口,它定義了服務器端對外提供的服務接口。以下所示,咱們定義一個ClientProtocol通訊接口,聲明瞭echo和add兩個方法。Hadoop中全部自定義RPC接口都須要繼承VersionedProtocol接口,它描述了協議的版本信息
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
public static final long versionID = 1L;
String echo(String value) throws IOException;
int add(int v1 , int v2) throws IOException;
}
(2) 實現RPC協議
Hadoop RPC協議一般是一個Java接口,用戶須要實現該接口
public static class ClientProtocolImpl implements ClientProtocol {
//重載的方法,用於獲取自定義的協議版本號
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
//重載的方法,用於獲取協議簽名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, inthashcode) {
return new ProtocolSignature(ClientProtocol.versionID, null);
}
public String echo(String value) throws IOException {
return value;
}
public int add(int v1, int v2) throws IOException {
return v1 + v2;
}
}
(3) 構造並啓動RPC Server
直接使用靜態類Builder構造一個RPC Server,並調用函數start()啓動該Server
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)
.setInstance(new ClientProtocolImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).build();
server.start();
// BindAddress : 服務器的HOST
// Port : 監聽端口號,0表明系統隨機選擇一個端口號
// NumHandlers : 服務端處理請求的線程數目
(4) 構造RPC Client併發送RPC請求
使用靜態方法getProxy構造客戶端代理對象
proxy = (ClientProtocol)RPC.getProxy(
ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result = proxy.add(5, 6);
String echoResult = proxy.echo("result");
五. Hadoop RPC類詳解
Hadoop RPC主要有三個大類組成,即RPC、Client、Server,分別對應對外編程接口、客戶端實現、服務器實現
ipc.RPC實現
RPC類其實是對底層客戶機 - 服務器網絡模型的封裝,以便爲程序員提供一套更方便簡潔的編程接口
RPC類定義了一系列構建和銷燬RPC客戶端的方法,構建方法分爲getProxy和waitForProxy兩類,銷燬方只有一個,即爲stopProxy。RPC服務器的構建則由靜態內部類RPC.Builder,該類提供了一些方法共用戶設置一些基本的參數,設置完成參數,可調用build()完成一個服務器對象的構建,調用start()方法啓動該服務器
ipc.Client
Client主要完成的功能是發送遠程過程調用信息並接收執行結果。
Client內部有兩個重要的內部類,分別是 :
- Call類 : 封裝了一個RPC請求,它包含5個成員變量,分別是惟一標識ID、函數調用信息param、函數執行返回值value、出錯或者異常信息error和執行完成標識符done。因爲Hadoop RPC Server採用異步方式處理客戶端請求,這使遠程過程調用的發生順序與結果返回順序無直接關係,而Client端正式提供ID識別不一樣的函數調用的。當客戶端向服務器端發送請求時,只需填充id和param兩個變量,而剩下的三個變量則由服務器根據函數執行狀況填充
- Connection類 : Client與每一個Server之間維護一個通訊鏈接,與該鏈接相關的基本信息及操做被封裝到Connection類中,基本信息主要包括通訊鏈接惟一標識、與Server端通訊的Socket、網絡輸入數據流(in)、網絡輸出數據流(out)、保存RPC請求的哈希表(calls)等。操做則包括 :
- addCall -- 將一個Call對象添加到哈希表中
- sendParam -- 向服務器端發送RPC請求
- receiveResponse -- 從服務器端接收已經處理完成的RPC請求
- run -- Connection是一個線程類,它的run方法調用了receiveResponse方法,會一直等待接收RPC返回結果
當調用Call函數執行某個遠程方法時,Client端須要及進行如下四個步驟 :
- 建立一個Connection對象,並將遠程方法調用信息封裝成Call對象,放到Connection對象中的哈希表中
- 調用Connection類中的sendRpcRequest()方法將當前Call對象發送給Server端
- Server端處理完RPC請求後,將結果經過網絡返回給Client端,Client端經過receiveRpcResponse()函數獲取結果
- Client檢查結果處理狀態,並將對應Call對象從哈希表中刪除
![](http://static.javashuo.com/static/loading.gif)
Hadoop RPC Client處理過程
ipc.Server
Hadoop採用了Master/Slave結構,其中Master是整個系統的單點,這是制約系統性能和可擴展性的最關鍵因素之一
ipc.Server採用了不少提升併發處理能力的技術,主要包括線程池、事件驅動和Reactor設計模式等
Reactor是併發編程中一種基於事件驅動的設計模式,它具備如下兩個特色 :
- 經過派發/分離IO操做事件提升系統的併發性能
- 提供了粗粒度的併發控制,使用單線程實現,避免了複雜的同步處理
ipc.Server實際上實現了一個典型的Reactor設計模式,典型的Reactor設計模式中主要包括如下幾個角色 :
- Reactor : I/O事件的派發者
- Acceptor : 接受來自Client的鏈接,創建與Client對應的Handler,並向Reactor註冊此Handler
- Handler : 與一個Client通訊的實體,並按必定的過程實現業務的處理
- Reader/Sender : 爲了加速處理速度,Reactor模式每每構建一個存放數據處理線程的線程池,這樣數據讀出後,當即扔到線程吃中等待後續處理便可。爲此,Reactor模式通常分離Handler中的讀和寫兩個過程,分別註冊成單獨的讀事件和寫事件,並由對應的Reader和Sender線程處理
![](http://static.javashuo.com/static/loading.gif)
Hadoop RPC Server處理過程
ipc.Server處理過程被劃分紅3個階段 : 接收請求、處理請求和返回結果
接收請求
該階段主要任務是接收來自各個客戶端的RPC請求,並將它們封裝成固定的格式(Call類)放到一個共享隊列(CallQueue)中,該階段內部又分爲創建鏈接和接收請求兩個子階段,分別由Listener和Reader兩種線程完成
整個Server只有一個Listener線程,統一負責監聽來自客戶端的鏈接請求,一旦由新的請求到達,它會採用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同時存在多個,它們分別負責接收一部分客戶端鏈接的RPC請求,至於每一個Reader線程負責哪些客戶端鏈接,徹底由Listener決定,當前Listener只是採用了簡單的輪詢分配機制
Listener和Reader線程內部各自包含一個Selector對象,分別用於監聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對於Listener線程,主循環的實現體是監聽是否有新的鏈接請求到達,並採用輪詢策略選擇一個Reader線程處理新鏈接;對於Reader線程,主循環的實現體是監聽客戶端鏈接中是否有新的RPC請求到達,並將新的RPC請求封裝成Call對象,放到共享隊列中
處理請求
該階段主要任務是從共享隊列中獲取call對象,執行對應的函數調用,並將結果返回給客戶端,這所有由Handler線程完成
Server端可同時存在多個Handler線程,它們並行從共享隊列中讀取Call對象,經執行對應的函數調用後,將嘗試着直接將結果返回給對應的客戶端。但考慮到某些函數調用返回結果很大或者網絡速度很慢,可能難以將結果一次性發送給客戶端,此時Handler將嘗試着將後續發送任務交給Responder線程
返回結果
Server僅存一個Responder線程,它的內部包含一個Selector對象,用於監聽SelectionKey.OP_WRITE事件。當Handler沒能將結果一次性發送到客戶端時,會向該Selector對象註冊SelectionKey.OP_WRITE事件,進而由Responder線程採用異步方式繼續發送未發送完成的結果
六. Hadoop RPC參數調優
HadoopRPC對外提供一些可配置參數,以便於用戶根據業務需求和硬件環境對其進行調優 :
- Reader線程數目。參數ipc.server.read.threadpool.size設置
- 每一個Handler線程對應的最大Call數目。參數ipc.server.handler.queue.size設置
- Handler線程數目。參數yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count設置
- 客戶端最大重試次數。參數ipc.client.connect.max.size設置
我天天會寫文章記錄大數據技術從入門到精通,能夠關注個人公衆號"SmallBird技術分享",咱們一塊兒學習分享,而且回覆'分享'會有大數據資源驚喜等着你~