RPC是Remote Procedure Call(遠程過程調用)的簡稱,這一機制都要面對兩個問題
java
對象調用方式;node
序列/反序列化機制web
在此以前,咱們有必要了解什麼是架構層次的協議。通俗一點說,就是我把某些接口和接口中的方法稱爲協議,客戶端和服務端只要實現這些接口中的方法就能夠進行通訊了,從這個角度來講,架構層次協議的說法就能夠成立了。Hadoop的RPC機制正是採用了這種「架構層次的協議」,有一整套做爲協議的接口,以下圖
sql
主要用來作方法的加強,讓你能夠在不修改源碼的狀況下,加強一些方法,在方法執行先後作任何你想作的事情(甚至根本不去執行這個方法),由於在InvocationHandler的invoke方法中,你能夠直接獲取正在調用方法對應的Method對象,具體應用的話,好比能夠添加調用日誌,作事務控制等。apache
這個接口的實現部署在其它服務器上,在編寫客戶端代碼的時候,沒辦法直接調用接口方法,由於接口是不能直接生成對象的,這個時候就能夠考慮代理模式(動態代理)了,經過Proxy.newProxyInstance代理一個該接口對應的InvocationHandler對象,而後在InvocationHandler的invoke方法內封裝通信細節就能夠了。具體的應用,最經典的固然是Java標準庫的RMI,其它好比hessian,各類webservice框架中的遠程調用,大體都是這麼實現的。緩存
VersionedProtocol是全部RPC協議接口的父接口,其中只有一個方法:getProtocolVersion()服務器
ClientDatanodeProtocol:一個客戶端和datanode之間的協議接口,用於數據塊恢復。markdown
ClientProtocol:client與Namenode交互的接口,全部控制流的請求均在這裏,如:建立文件、刪除文件等;網絡
DatanodeProtocol : Datanode與Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol:SecondaryNode與Namenode交互的接口。架構
InterDatanodeProtocol:Datanode內部交互的接口,用來更新block的元數據;
InnerTrackerProtocol:TaskTracker與JobTracker交互的接口,功能與DatanodeProtocol類似;
JobSubmissionProtocol:JobClient與JobTracker交互的接口,用來提交Job、得到Job等與Job相關的操做;
TaskUmbilicalProtocol:Task中子進程與母進程交互的接口,子進程即map、reduce等操做,母進程即TaskTracker,該接口能夠回報子進程的運行狀態(詞彙掃盲: umbilical 臍帶的, 關係親密的) 。
簡單來講,Hadoop RPC=動態代理+定製的二進制流。分佈式對象通常都會要求根據接口生成存根和框架。如 CORBA,能夠經過 IDL,生成存根和框架。在ipc.RPC類中有一些內部類,下邊簡單介紹下
Invocation:用於封裝方法名和參數,做爲數據傳輸層,至關於VO吧。
ClientCache:用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>
。
Invoker:是動態代理中的調用實現類,繼承了InvocationHandler.
Server:是ipc.Server的實現類。咱們就須要這樣的步驟了。
上類圖
Dynamic Proxy 是由兩個 class 實現的:java.lang.reflect.Proxy
和 java.lang.reflect.InvocationHandler
,後者是一個接口。
所謂 Dynamic Proxy 是這樣一種 class:它是在運行時生成的 class,在生成它時你必須提供一組 interface 給它,而後該 class就宣稱它實現了這些 interface。
這個 Dynamic Proxy 其實就是一個典型的 Proxy 模式,它丌會替你做實質性的工做,在生成它的實例時你必須提供一個handler,由它接管實際的工做。
這個 handler,在 Hadoop 的 RPC 中,就是 Invoker 對象。
咱們能夠簡單地理解:就是你能夠經過一個接口來生成一個類,這個類上的全部方法調用,都會傳遞到你生成類時傳遞的
InvocationHandler 實現中。
在 Hadoop 的 RPC 中,Invoker 實現了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的惟一方法)。 Invoker 會把全部跟此次調用相關的調用方法名,參數類型列表,參數列表打包,而後利用前面咱們分析過的 Client,經過 socket 傳遞到服務器端。就是說,你在 proxy 類上的任何調用,都經過 Client 發送到遠方的服務器上。
Invoker 使用 Invocation。 Invocation 封裝了一個過程調用的全部相關信息,它的主要屬性有: methodName,調用方法名,parameterClasses,調用方法參數的類型列表和 parameters,調用方法參數。注意,它實現了 Writable 接口,能夠串行化。
RPC.Server 實現了 org.apache.hadoop.ipc.Server
,你能夠把一個對象,經過 RPC,升級成爲一個服務器。服務器接收到的請求(經過 Invocation),解串行化之後,就發成了方法名,方法參數列表和參數列表。調用 Java 反射,咱們就能夠調用對應的對象的方法。調用的結果再經過 socket,迒回給客戶端,客戶端把結果解包後,就能夠返回給Dynamic Proxy 的使用者了。
咱們接下來去研究的就是RPC.Invoker類中的invoke()方法了,代碼以下
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { …… ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); …… return value.get(); }
通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg); 這句代碼。而上面代碼中卻沒有。其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
Invocation類在這裏封裝了方法名和參數,充當VO。其實這裏網絡通訊只是調用了Client類的call()方法。
接下來分析一下ipc.Client源碼,在此以前咱們得明確下咱們的目標,總結出瞭如下幾個問題
客戶端和服務端的鏈接是怎樣創建的?
客戶端是怎樣給服務端發送數據的?
客戶端是怎樣獲取服務端的返回數據的?
基於這三個問題,咱們開始分析ipc.Client源碼,主要包含如下幾個類
Call:用於封裝Invocation對象,做爲VO,寫到服務端,同時也用於存儲從服務端返回的數據。
Connection:用以處理遠程鏈接對象。繼承了Thread
ConnectionId:惟一肯定一個鏈接
Question1:客戶端和服務端的鏈接是怎樣創建的?
Client類中的cal()方法以下
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); //將傳入的數據封裝成call對象 Connection connection = getConnection(remoteId, call); //得到一個鏈接
connection.sendParam(call); // 向服務端發送call對象 boolean interrupted = false;
synchronized (call) { while (!call.done) { try { call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程 } catch (InterruptedException ie) { // 因中斷異常而終止,設置標誌interrupted爲true interrupted = true;
}
}
if (interrupted)
{
Thread.currentThread().interrupt();
}
if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace();
throw call.error;
}
else // 本地異常
{
throw wrapException(remoteId.getAddress(), call.error);
}
}
else
{
return call.value; //返回結果數據
}
}
}
具體代碼的做用我已作了註釋,因此這裏再也不贅述。分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:
Connection connection = getConnection(remoteId, call); //得到一個鏈接
connection.sendParam(call); // 向服務端發送call對象
先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。
private Connection getConnection(ConnectionId remoteId,
Call call)
throws IOException, InterruptedException
{
if (!running.get())
{
// 若是client關閉了
throw new IOException("The client is stopped");
}
Connection connection;
//若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。
//但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。
do
{
synchronized (connections)
{
connection = connections.get(remoteId);
if (connection == null)
{
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
}
while (!connection.addCall(call)); //將call對象放入對應鏈接中的calls池,就不貼出源碼了
//這句代碼纔是真正的完成了和服務端創建鏈接哦~
connection.setupIOstreams();
return connection;
}
Client.Connection類中的setupIOstreams()方法以下:
private synchronized void setupIOstreams() throws InterruptedException
{
……
try
{
……
while (true)
{
setupConnection(); //創建鏈接
InputStream inStream = NetUtils.getInputStream(socket); //得到輸入流
OutputStream outStream = NetUtils.getOutputStream(socket); //得到輸出流
writeRpcHeader(outStream);
……
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(inStream))); //將輸入流裝飾成DataInputStream
this.out = new DataOutputStream
(new BufferedOutputStream(outStream)); //將輸出流裝飾成DataOutputStream
writeHeader();
// 跟新活動時間
touch();
//當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
start();
return;
}
}
catch (IOException e)
{
markClosed(e);
close();
}
}
再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket(); //終於看到建立socket的方法了
this.socket.setTcpNoDelay(tcpNoDelay);
……
// 設置鏈接超時爲20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
/* 設置最多鏈接重試爲45次。 * 總共有20s*45 = 15 分鐘的重試時間。 */
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
不難看出客戶端的鏈接是建立一個普通的socket進行通訊的。
Question2:客戶端是怎樣給服務端發送數據的?
Client.Connection類的sendParam()方法以下
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//建立一個緩衝區
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //首先寫出數據的長度
out.write(data, 0, dataLength); //向服務端寫數據
out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
IOUtils.closeStream(d);
}
}
Question3:客戶端是怎樣獲取服務端的返回數據的?
Client.Connection類和Client.Call類中的相關方法以下
Method1:
public void run() {
……
while (waitForWork()) {
receiveResponse(); //具體的處理方法
}
close();
……
}
Method2:
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int id = in.readInt(); // 阻塞讀取id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.get(id); //在calls池中找到發送時的那個對象
int state = in.readInt(); // 阻塞讀取call對象的狀態
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // 讀取數據
//將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼Method3
call.setValue(value);
calls.remove(id); //刪除已處理的call
} else if (state == Status.ERROR.state) {
……
} else if (state == Status.FATAL.state) {
……
}
} catch (IOException e) {
markClosed(e);
}
}
Method3:
public synchronized void setValue(Writable value) {
this.value = value;
callComplete(); //具體實現
}
protected synchronized void callComplete() {
this.done = true;
notify(); // 喚醒client等待線程
}
啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據。客戶端的源碼分析暫時到這,下面咱們來分析Server端的源碼
內部類以下
Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。
hadoop是怎樣初始化RPC的Server端的呢?
Namenode初始化時必定初始化了RPC的Sever端,那咱們去看看Namenode的初始化源碼
private void initialize(Configuration conf) throws IOException {
……
// 建立 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//得到serviceRpcServer
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//得到server
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
……
this.server.start(); //啓動 RPC server Clients只容許鏈接該server
if (serviceRpcServer != null) {
serviceRpcServer.start(); //啓動 RPC serviceRpcServer 爲HDFS服務的server
}
startTrashEmptier(conf);
}
RPC的server對象是經過ipc.RPC類的getServer()方法得到的。ipc.RPC類中的getServer()源碼以下
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager <? extends TokenIdentifier > secretManager)
throws IOException
{
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。
初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼
/** 啓動服務 */
public synchronized void start()
{
responder.start(); //啓動responder
listener.start(); //啓動listener
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++)
{
handlers[i] = new Handler(i);
handlers[i].start(); //逐個啓動Handler
}
}
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// 建立ServerSocketChannel,並設置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// 將server socket綁定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 得到一個selector
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//啓動多個reader線程,爲了防止請求多時服務端響應延時的問題
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 註冊鏈接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法
public void run()
{
……
while (running)
{
SelectionKey key = null;
try
{
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext())
{
key = iter.next();
iter.remove();
try
{
if (key.isValid())
{
if (key.isAcceptable())
doAccept(key); //具體的鏈接方法
}
}
catch (IOException e)
{
}
key = null;
}
}
catch (OutOfMemoryError e)
{
……
}
Server.Listener類中doAccept ()方法中的關鍵源碼以下:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { //創建鏈接
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader(); //從readers池中得到一個reader
try {
reader.startAdd(); // 激活readSelector,設置adding爲true
SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件
c = new Connection(readKey, channel, System.currentTimeMillis());//建立一個鏈接對象
readKey.attach(c); //將connection對象注入readKey
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
……
} finally {
//設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使
//用了wait()方法等待。因篇幅有限,就不貼出源碼了。
reader.finishAdd();
}
}
}
當reader被喚醒,reader接着執行doRead()方法。
Method1:
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment(); //得到connection對象
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess(); // 接受並處理請求
} catch (InterruptedException ieo) {
……
}
……
}
Method2:
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
……
if (!rpcHeaderRead) {
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
//讀取請求頭
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
// 讀取請求版本號
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
……
data = ByteBuffer.allocate(dataLength);
}
// 讀取請求
count = channelRead(channel, data);
if (data.remaining() == 0) {
……
if (useSasl) {
……
} else {
processOneRpc(data.array());//處理請求
}
……
}
}
return count;
}
}
Method1:
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
Method2:
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // 嘗試讀取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數
param.readFields(dis);
Call call = new Call(id, param, this); //封裝成call
callQueue.put(call); // 將call存入callQueue
incRpcCount(); // 增長rpc請求的計數
}
while (running) {
try {
final Call call = callQueue.take(); //彈出call,可能會阻塞
……
//調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實如今RPC.Server類中
value = call(call.connection.protocol, call.param, call.timestamp);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
……
//給客戶端響應請求
responder.doRespond(call);
}
}
void doRespond(Call call) throws IOException
{
synchronized (call.connection.responseQueue)
{
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1)
{
// 返回響應結果,並激活writeSelector
processResponse(call.connection.responseQueue, true);
}
}
}