// Obtain the InterTrackerProtocol proxy while running as the login user:
// the RPC.waitForProxy call is executed inside that user's doAs().
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
PrivilegedExceptionAction<Object> openJobTrackerProxy =
    new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException {
        return RPC.waitForProxy(
            InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr,
            fConf);
      }
    };
Object jobTrackerProxy = loginUser.doAs(openJobTrackerProxy);
this.jobClient = (InterTrackerProtocol) jobTrackerProxy;
// Build a dynamic proxy that implements `protocol`; method calls made on the
// proxy are dispatched to the Invoker's invoke() method.
ClassLoader protocolLoader = protocol.getClassLoader();
Class<?>[] proxiedInterfaces = new Class[] { protocol };
Invoker handler = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout);
VersionedProtocol proxy =
    (VersionedProtocol) Proxy.newProxyInstance(protocolLoader, proxiedInterfaces, handler);
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; ..... public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } .... }
ObjectWritable value = (ObjectWritable) Invocation(method, args), remoteId);
其中call方法的第一個參數是封裝了調用方法和參數並實現了Writable接口的對象,以便於在分佈式環境中傳輸;第二個參數無需多言,它用於唯一標識RPC Server,也就是與指定的Server進行通訊。call方法的核心代碼如下:
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call);//請看下面的說明 connection.sendParam(call); // 將參數封裝成一個call對象發送給Server boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待Server發送的內容 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } ... return call.value; }
private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException { do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); ... connection.setupIOstreams(); return connection; }
public void run() { while (waitForWork()) {//等到能夠讀響應時返回true receiveResponse();
/**
 * Reads one response from the input stream and, when its status is SUCCESS,
 * stores the decoded value on the matching call and removes that call from
 * {@code calls}.
 */
private void receiveResponse() {
  try {
    int id = in.readInt(); // read the id used to look up the matching Call in calls
    Call call = calls.get(id);
    int state = in.readInt(); // read the status value that follows the id
    if (state == Status.SUCCESS.state) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in); // read value
      call.setValue(value);
      calls.remove(id);
    }
    // ... (handling of the other states omitted in this excerpt)
  }
  // ... (the rest of the try statement is omitted in this excerpt)
}
才疏學淺,錯誤之處在所難免,懇請各位予以指正。