this.jobClient = (InterTrackerProtocol)
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          public Object run() throws IOException {
            return RPC.waitForProxy(InterTrackerProtocol.class,
                InterTrackerProtocol.versionID, jobTrackAddr, fConf);
          }
        });
Here the TaskTracker calls the static method RPC.waitForProxy() to obtain a proxy for InterTrackerProtocol. Through this proxy object the TaskTracker can communicate with the JobTracker.
VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
    protocol.getClassLoader(),
    new Class[] { protocol },
    new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
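Tracing through the Hadoop source code, we find that RPC.waitForProxy() ultimately calls Proxy.newProxyInstance() to create the proxy object. The first argument is the class loader (the proxy class is generated dynamically at run time), the second is the array of interfaces the proxy class must implement, and the third is an instance of a class that implements the InvocationHandler interface. Every call made on the proxy ends up in the invoke() method of that InvocationHandler implementation.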
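To see the three arguments of Proxy.newProxyInstance() at work without a Hadoop cluster, here is a minimal sketch that uses only the JDK. The names EchoProtocol, DescribingInvoker and ProxySketch are made up for this illustration and are not part of Hadoop; the handler simply describes the call it intercepted instead of sending it to a server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical stand-in for a protocol interface such as InterTrackerProtocol.
interface EchoProtocol {
    String echo(String message);
}

// Hypothetical handler: instead of sending the call over the network,
// it returns a description of the intercepted call.
class DescribingInvoker implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        return method.getName() + Arrays.toString(args);
    }
}

class ProxySketch {
    static EchoProtocol newProxy() {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),    // loader that defines the generated class
                new Class<?>[] { EchoProtocol.class },  // interfaces the proxy implements
                new DescribingInvoker());               // receives every call made on the proxy
    }

    public static void main(String[] args) {
        // The call never reaches a real implementation; invoke() answers it.
        System.out.println(newProxy().echo("hello")); // prints "echo[hello]"
    }
}
```

In Hadoop the handler is the Invoker class, and its invoke() forwards the method and arguments to the server rather than answering locally.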
private static class Invoker implements InvocationHandler {
  private Client.ConnectionId remoteId;
  private Client client;
  .....
  public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    final boolean logDebug = LOG.isDebugEnabled();
    long startTime = 0;
    if (logDebug) {
      startTime = System.currentTimeMillis();
    }
    ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
    if (logDebug) {
      long callTime = System.currentTimeMillis() - startTime;
      LOG.debug("Call: " + method.getName() + " " + callTime);
    }
    return value.get();
  }
  ....
}
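As we can see, Invoker, the InvocationHandler implementation, has two main fields: remoteId (which uniquely identifies the RPC server side) and client (a Client instance obtained through a factory pattern). The most important statement in invoke() is the following: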
ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
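The first argument of call() is an Invocation object. It wraps the method being invoked together with its arguments and implements the Writable interface, so it can be serialized and transmitted in a distributed environment. The second argument needs little explanation: it uniquely identifies the RPC server, that is, the server to communicate with. The core code of call() is as follows: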
public Writable call(Writable param, ConnectionId remoteId)
    throws InterruptedException, IOException {
  Call call = new Call(param);
  Connection connection = getConnection(remoteId, call); // see the explanation below
  connection.sendParam(call); // the parameter is wrapped in a Call object and sent to the server
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait(); // wait for the server's response
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    ...
    return call.value;
  }
}
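A Call object shows up here. The method returns one of the Call object's fields, which means that Call encapsulates both the client's request and the server's response. The synchronized block together with call.wait() makes the calling thread block until the response for this call has arrived, so the request and its response stay paired. The request itself is sent to the server through the Connection object's sendParam() method. So what is Connection?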
private Connection getConnection(ConnectionId remoteId, Call call)
    throws IOException, InterruptedException {
  do {
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = new Connection(remoteId);
        connections.put(remoteId, connection);
      }
    }
  } while (!connection.addCall(call));
  ...
  connection.setupIOstreams();
  return connection;
}
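Connection is in fact a thread: it extends Thread. All Connection objects are kept in a Hashtable, and the Connection for a given ConnectionId is reused, which reduces the cost of creating threads. connection.setupIOstreams() is what actually establishes the connection: it writes the RPC header to the output stream and starts the thread by calling start(). The core code of the thread's run() method is as follows: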
public void run() {
  while (waitForWork()) { // returns true once a response can be read
    receiveResponse();
  }
}
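The lookup-or-create step that lets calls to the same server share one Connection thread can be tried in isolation. The sketch below is a simplification under stated assumptions: SketchConnection and ConnectionCache are hypothetical names, a plain String stands in for ConnectionId, and no socket is opened.

```java
import java.util.Hashtable;
import java.util.Map;

// Hypothetical, simplified stand-in for Client.Connection: it extends Thread
// like the original, but performs no I/O in this sketch.
class SketchConnection extends Thread {
    final String remoteId; // assumption: a String replaces ConnectionId

    SketchConnection(String remoteId) {
        this.remoteId = remoteId;
    }

    @Override
    public void run() {
        // The real thread loops on waitForWork() and receiveResponse().
    }
}

class ConnectionCache {
    private final Map<String, SketchConnection> connections = new Hashtable<>();

    // Returns the cached connection for remoteId, creating it on first use.
    SketchConnection getConnection(String remoteId) {
        synchronized (connections) {
            SketchConnection connection = connections.get(remoteId);
            if (connection == null) {
                connection = new SketchConnection(remoteId);
                connections.put(remoteId, connection);
            }
            return connection;
        }
    }

    int size() {
        return connections.size();
    }

    public static void main(String[] args) {
        ConnectionCache cache = new ConnectionCache();
        SketchConnection first = cache.getConnection("jobtracker:9001");
        SketchConnection second = cache.getConnection("jobtracker:9001");
        System.out.println(first == second); // prints "true": the connection is reused
    }
}
```

The address "jobtracker:9001" is only an example value. Locking on the map while looking up and inserting keeps two threads from creating separate connections for the same key.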
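The receiveResponse() method mainly deserializes the value from the input stream and stores it in the Call object. This is how the client obtains the server's response. The core code is as follows: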
private void receiveResponse() {
  try {
    int id = in.readInt(); // read the call id, used to look up the matching Call in calls
    Call call = calls.get(id);
    int state = in.readInt(); // read the status of the call
    if (state == Status.SUCCESS.state) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in); // read value
      call.setValue(value);
      calls.remove(id);
    }
    ...
  }
}
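The hand-off between the thread that issued the call and the thread that reads the response can be sketched with plain JDK classes. Everything here is a simplification, not Hadoop's code: SketchCall and ResponseReceiver are hypothetical names, the value is a String rather than a Writable, the SUCCESS constant is an assumed status code, and an in-memory byte array plays the role of the socket stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Hashtable;
import java.util.Map;

// Hypothetical, simplified Call: holds the response and a completion flag.
class SketchCall {
    final int id;
    private String value;
    private boolean done;

    SketchCall(int id) { this.id = id; }

    // Receiver thread: store the value and wake up the waiting caller.
    synchronized void setValue(String v) {
        value = v;
        done = true;
        notify();
    }

    // Calling thread: block until the response has been stored.
    synchronized String await() throws InterruptedException {
        while (!done) {
            wait();
        }
        return value;
    }
}

class ResponseReceiver {
    static final int SUCCESS = 0; // assumed status code for this sketch

    final Map<Integer, SketchCall> calls = new Hashtable<>();

    // Reads one response: call id, status, then the value on success.
    void receiveResponse(DataInputStream in) throws IOException {
        int id = in.readInt();
        SketchCall call = calls.get(id);
        int state = in.readInt();
        if (state == SUCCESS && call != null) {
            call.setValue(in.readUTF());
            calls.remove(id);
        }
    }

    // Builds the bytes a server would send for a successful call (sketch only).
    static byte[] successFrame(int id, String value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(id);
            out.writeInt(SUCCESS);
            out.writeUTF(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registers a call, lets a second thread deliver the response, and waits for it.
    static String roundTrip(int id, String value) {
        ResponseReceiver receiver = new ResponseReceiver();
        SketchCall call = new SketchCall(id);
        receiver.calls.put(id, call);
        byte[] frame = successFrame(id, value);
        Thread reader = new Thread(() -> {
            try {
                receiver.receiveResponse(new DataInputStream(new ByteArrayInputStream(frame)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        reader.start();
        try {
            return call.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(7, "pong")); // prints "pong"
    }
}
```

Checking the done flag in a loop around wait() mirrors the while (!call.done) loop in call(): the caller goes back to waiting if it is woken before the response has been stored.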
My knowledge is limited, so mistakes are hard to avoid. Corrections are very welcome.