Hadoop 中 RPC 機制詳解之 Client 端java
1. Server.Listenerapache
RPC Client 端的 RPC 請求發送到 Server 端後, 首先由 Server.Listener 接收數組
Server.Listener 類繼承自 Thread 類, 監聽了 OP_READ 和 OP_ACCEPT 事件app
Server.Listener 接收 RPC 請求, 在 Server.Listener.doRead() 方法中讀取數據, 在 doRead() 方法中又調用了Server.Connection.readAndProcess() 方法, ide
最後會調用 Server.Connection.processRpcRequest() 方法, 源碼以下:oop
private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, InterruptedException { ... Writable rpcRequest; // 從成員變量dis中反序列化出Client端發送來的RPC請求( WritableRpcEngine.Invocation對象 ) try { //Read the rpc request rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest.readFields(dis); } catch (Throwable t) { // includes runtime exception from newInstance ... } // 構造Server端Server.Call實例對象 Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header .getClientId().toByteArray()); // 將Server.Call實例對象放入調用隊列中 callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count }
調用隊列 callQueue 是 Server 的成員變量, Server.Listener 和 Server.Handler 是典型的生產者, 消費者模型, this
Server.Listener( 生產者 )的doRead()方法最終調用Server.Connection.processRpcRequest() 方法, spa
而Server.Handler( 消費者 )處理RPC請求.net
2. Server.Handler 繼承 Thread 類, 其主要工做是處理 callQueue 中的調用, 都在 run() 方法中完成. 在 run() 的主循環中, 每次處理一個從 callQueue 中出隊的請求, Server.call() 是一個抽象方法, 實際是調用了 RPC.Server.call()方法, 最後經過 WritableRPCEngine.call() 方法完成 Server 端方法調用code
/** Handles queued calls . */ private class Handler extends Thread { ... @Override public void run() { ... ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { ... final Call call = callQueue.take(); // 獲取一個RPC調用請求 ... Writable value = null; value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() { @Override public Writable run() throws Exception { // 調用RPC.Server.call()方法 // call.rpcKind : RPC調用請求的類型, 通常爲Writable // call.connection.protocolName : RPC協議接口的類名 // call.rpcRequest : Invocation實例對象, 包括方法名, 參數列表, 參數列表的Class對象數組 // call.timestamp : 調用時間戳 return call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); } }); } ... } }
RPC.Server.call() 方法以下:
@Override public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); }
最後經過 WritableRPCEngine.call() 方法完成 Server 端方法調用, 代碼以下:
@Override public Writable call(org.apache.hadoop.ipc.RPC.Server server, String protocolName, Writable rpcRequest, long receivedTime) throws IOException, RPC.VersionMismatch { Invocation call = (Invocation)rpcRequest; // 將RPC請求強制轉成WritableRpcEngine.Invocation對象 ... long clientVersion = call.getProtocolVersion(); final String protoName; ProtoClassProtoImpl protocolImpl; // Server端RPC協議接口的實現類的實例對象 ... // Invoke the protocol method try { ... // 獲取RPC請求中調用的方法對象Method Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); ... // 在Server端RPC協議接口的實現類的實例對象 protocolImpl 上調用具體的方法 Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters()); ... // 調用正常結束, 返回調用結果 return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { // 調用出現異常, 用IOException包裝異常, 最後拋出該異常 Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException)target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { ... } } }
在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會被強制轉換成 WritableRpcEngine.Invocation 類型的對象 call , 並經過 call 這個對象包含的方法名(getMethodName()方法)和參數列表的 Class對象數組(getParameterClasses())獲取 Method 對象, 最終經過 Method 對象的invoke() 方法, 調用實現類的實例對象 protocolImpl 上的方法, 完成 Hadoop 的遠程過程調用
好了, 如今 Server 端的具體方法已經被調用了, 調用結果分兩種狀況:
1) 調用正常結束, 則將方法的返回值和調用結果封裝成一個 ObjectWritable 類型的對象, 並返回
2) 調用出現異常, 拋出 IOException 類型的異常
3. Server.Responder
這個類的功能: 發送 Hadoop 遠程過程調用的應答給 Client 端, Server.Responder 類繼承自 Thread 類, 監聽了 OP_WRITE 事件, 即通道可寫. 具體細節寫不下去了
總結:
Server.Responder 和 Server.Listener, Server.Handler 一塊兒配合, 完成 Hadoop 中 RPC 的 Server 端處理:
Server.Listener 接收 Client 端的鏈接請求和請求數據; Server.Handler 完成實際的過程調用; Server.Responder 則進行應答發送