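The Hadoop RPC implementation centers on three classes: Client, Server, and RPC. It is built mainly on Java NIO, Java dynamic proxies, and Java reflection.
This article covers only the Client and RPC parts for now; the Server side will be added later.
Inside RPC, RpcInvoker is the client-related piece and Server is the server-related piece (this Server is an inner class of RPC, not the Server class mentioned above). RPC also contains an enum tied to the RPC engine, RpcKind, shown below: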
public enum RpcKind {
  RPC_BUILTIN ((short) 1),         // used for testing
  RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine
  RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
  final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
  public final short value; //TODO make it private

  RpcKind(short val) {
    this.value = val;
  }
}
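As the enum shows, since the introduction of YARN, Writable is no longer Hadoop's only serialization engine. Google's Protocol Buffers was added, and with it the RpcEngine interface and its implementations ProtobufRpcEngine and WritableRpcEngine. RpcEngine is the single place where both client and server obtain their IPC endpoints (the RPC class contains related code as well, and the appropriate engine implementation is ultimately selected through RpcKind). The client obtains its side of the connection through getProxy, and the server obtains its side through getServer.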
getProxy in RpcEngine:
<T> ProtocolProxy<T> getProxy(Class<T> protocol,
    long clientVersion, InetSocketAddress addr,
    UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout,
    RetryPolicy connectionRetryPolicy,
    AtomicBoolean fallbackToSimpleAuth) throws IOException
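Class<T> protocol: the protocol between two Hadoop roles, such as the protocol between the client and the NameNode or between the NameNode and the DataNode (from 2.0 onward the Hadoop protocol interfaces are all Protocol Buffers based and no longer use Writable). Each protocol is defined as an interface that declares the available methods, and an IPC remote call is in effect a call to a method on the implementation of one of these interfaces. Below is the protocol interface between the client and the DataNode (it is shown only to illustrate how protocol interfaces are used; readers who already know this can skip it):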
public interface ClientDatanodeProtocol {
  public static final long versionID = 9L;

  /** Return the visible length of a replica. */
  long getReplicaVisibleLength(ExtendedBlock b) throws IOException;

  /**
   * Refresh the list of federated NameNodes from the updated configuration:
   * add newly configured NameNodes and stop the ones that were removed.
   * (2.x introduced NameNode federation: the NameNode is no longer a single
   * node but is spread across several nodes, each managing a different
   * directory, e.g. namenode1 manages /application1 and namenode2 manages
   * /application2. The directories do not interfere with each other: if one
   * NameNode goes down, only the applications under its directory become
   * unavailable and the other nodes are unaffected. The DataNodes are
   * unchanged; any NameNode can control all DataNodes.)
   *
   * @throws IOException on error
   */
  void refreshNamenodes() throws IOException;

  /**
   * Delete the block pool directory. If force is false it is deleted only
   * if it is empty, otherwise it is deleted along with its contents.
   * (This method relates to the new HDFS DataNode data management, which
   * the next chapter covers.)
   *
   * @param bpid Blockpool id to be deleted.
   * @param force If false blockpool directory is deleted only if it is empty
   *          i.e. if it doesn't contain any block files, otherwise it is
   *          deleted along with its contents.
   * @throws IOException
   */
  void deleteBlockPool(String bpid, boolean force) throws IOException;

  /**
   * Retrieves the path names of the block file and metadata file stored on
   * the local file system.
   * For this method to work, one of the following should be satisfied:
   * the client user must be configured at the DataNode to be able to use
   * this method, or, when security is enabled, Kerberos authentication
   * must be used to connect to the DataNode.
   *
   * @param block
   *          the specified block on the local datanode
   * @param token
   *          the block access token.
   * @return the BlockLocalPathInfo of a block
   * @throws IOException
   *           on error
   */
  BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
      Token<BlockTokenIdentifier> token) throws IOException;

  /**
   * Retrieves volume location information about a list of blocks on a
   * DataNode. This is in the form of an opaque
   * {@link org.apache.hadoop.fs.VolumeId} for each configured data
   * directory, which is not guaranteed to be the same across DN restarts.
   *
   * @param blockPoolId the pool to query
   * @param blockIds
   *          list of blocks on the local datanode
   * @param tokens
   *          block access tokens corresponding to the requested blocks
   * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
   *         data directories
   * @throws IOException
   *           if datanode is unreachable, or replica is not found on datanode
   */
  HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
      long []blockIds, List<Token<BlockTokenIdentifier>> tokens)
      throws IOException;

  /**
   * Shut down a DataNode.
   *
   * @param forUpgrade If true, data node does extra prep work before shutting
   *          down. The work includes advising clients to wait and saving
   *          certain states for quick restart. This should only be used when
   *          the stored data will remain the same during upgrade/restart.
   * @throws IOException
   */
  void shutdownDatanode(boolean forUpgrade) throws IOException;

  /**
   * Obtain the DataNode's metadata.
   *
   * @return software/config version and uptime of the datanode
   */
  DatanodeLocalInfo getDatanodeInfo() throws IOException;

  /**
   * Asynchronously reload configuration on disk and apply changes.
   */
  void startReconfiguration() throws IOException;

  /**
   * Get the status of the previously issued reconfiguration task.
   * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
   */
  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;

  /**
   * Trigger a new block report.
   */
  void triggerBlockReport(BlockReportOptions options) throws IOException;
}
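long clientVersion: the protocol version the client uses
InetSocketAddress addr: the address of the server to contact
UserGroupInformation ticket: user and group information
Configuration conf: the Configuration object
SocketFactory factory: the socket factory used to create socket connections (IPC communicates over TCP sockets)
int rpcTimeout: the timeout
RetryPolicy connectionRetryPolicy: the connection retry policy (fail immediately, retry, or fail over to another machine and retry; see the RetryPolicy class for details)
AtomicBoolean fallbackToSimpleAuth: flag recording whether the connection fell back to SIMPLE (non-secure) authentication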
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  // Invoker implements InvocationHandler
  final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
      rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
  // create the proxy object (see Java dynamic proxies if this part is unfamiliar)
  return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
      protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
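isClosed: whether the connection has been closed
remoteId: the ID of the connection from Client to Server; the Client section analyzes it further
client: the Client object
clientProtocolVersion: protocol versions differ between Hadoop versions, so, for example, version 2.1 cannot communicate with version 2.5
protocolName: the protocol name
returnTypes: caches the return type of each method in the protocol interface, held as a Message (Message is the Protocol Buffers message serialization class)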
private Invoker(Class<?> protocol, Client.ConnectionId connId,
    Configuration conf, SocketFactory factory) {
  this.remoteId = connId;
  // CLIENTS is a ClientCache that caches every Client object used so far;
  // for a new client it constructs a new Client object and caches it.
  this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
  this.protocolName = RPC.getProtocolName(protocol);
  this.clientProtocolVersion = RPC
      .getProtocolVersion(protocol);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;
  if (LOG.isDebugEnabled()) {
    startTime = Time.now(); // current time in milliseconds
  }

  if (args.length != 2) { // exactly 2 arguments are required: RpcController + Message
    throw new ServiceException("Too many parameters for request. Method: ["
        + method.getName() + "]" + ", Expected: 2, Actual: "
        + args.length);
  }
  if (args[1] == null) {
    throw new ServiceException("null param while calling Method: ["
        + method.getName() + "]");
  }

  // tracing
  TraceScope traceScope = null;
  // if Tracing is on then start a new span for this rpc.
  // guard it in the if statement to make sure there isn't
  // any extra string manipulation.
  if (Trace.isTracing()) {
    traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));
  }

  // RPC request header, similar to an HTTP request header: both client and
  // server send the header first and the body afterwards. Note that the
  // method is put into the header when it is built, so the server knows
  // which method to invoke when it receives the request.
  RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);

  if (LOG.isTraceEnabled()) {
    LOG.trace(Thread.currentThread().getId() + ": Call -> " +
        remoteId + ": " + method.getName() +
        " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
  }

  // the method's argument, i.e. the request message
  Message theRequest = (Message) args[1];
  // the result returned by the server
  final RpcResponseWrapper val;
  try {
    // call the call method of client (the Client object was created in the
    // constructor; the Client section analyzes this method) and get the
    // server's result
    val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
        fallbackToSimpleAuth);
  } catch (Throwable e) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
          remoteId + ": " + method.getName() +
          " {" + e + "}");
    }
    if (Trace.isTracing()) {
      traceScope.getSpan().addTimelineAnnotation(
          "Call got exception: " + e.getMessage());
    }
    throw new ServiceException(e);
  } finally {
    if (traceScope != null) traceScope.close();
  }

  if (LOG.isDebugEnabled()) {
    long callTime = Time.now() - startTime;
    LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
  }

  Message prototype = null;
  try {
    // get the prototype of the method's return type
    prototype = getReturnProtoType(method);
  } catch (Exception e) {
    throw new ServiceException(e);
  }
  Message returnMessage;
  try {
    // deserialize the response bytes into a Message of the return type
    returnMessage = prototype.newBuilderForType()
        .mergeFrom(val.theResponseRead).build();

    if (LOG.isTraceEnabled()) {
      LOG.trace(Thread.currentThread().getId() + ": Response <- " +
          remoteId + ": " + method.getName() +
          " {" + TextFormat.shortDebugString(returnMessage) + "}");
    }
  } catch (Throwable e) {
    throw new ServiceException(e);
  }
  return returnMessage;
}

Getting the method's return type (as a Message prototype):

private Message getReturnProtoType(Method method) throws Exception {
  if (returnTypes.containsKey(method.getName())) {
    return returnTypes.get(method.getName());
  }

  Class<?> returnType = method.getReturnType();
  Method newInstMethod = returnType.getMethod("getDefaultInstance");
  newInstMethod.setAccessible(true);
  Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
  returnTypes.put(method.getName(), prototype);
  return prototype;
}

Closing the client's IPC connection:

public void close() throws IOException {
  if (!isClosed) {
    isClosed = true;
    CLIENTS.stopClient(client);
  }
}
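In short, the Invoker intercepts calls to protocol interface methods and hands them to Client.call. The call method takes care of sending the call information to the server and obtaining the result, and the Invoker then wraps that result in a Message and returns it as the final result of the call.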
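To make the interception step concrete, here is a minimal sketch of a Java dynamic proxy routing an interface call to an InvocationHandler. It does not use Hadoop at all: EchoProtocol, RecordingInvoker and ProxyDemo are hypothetical names invented for this illustration, and the handler only records the call instead of sending it over a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxyDemo {
    // Builds a dynamic proxy whose every method call is routed to the handler.
    static EchoProtocol newProxy(InvocationHandler handler) {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] {EchoProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingInvoker invoker = new RecordingInvoker();
        EchoProtocol proxy = newProxy(invoker);
        System.out.println(proxy.echo("hi"));
        System.out.println(invoker.sentCalls);
    }
}

// Hypothetical protocol interface standing in for a real Hadoop protocol.
interface EchoProtocol {
    String echo(String message);
}

// Hypothetical handler: records the intercepted method instead of calling a server.
class RecordingInvoker implements InvocationHandler {
    final List<String> sentCalls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // A real invoker would serialize the method name and arguments here
        // and pass them on to something like Client.call.
        sentCalls.add(method.getName());
        return "remote:" + args[0];
    }
}
```

The caller only ever sees the EchoProtocol interface; everything about transport lives in the handler, which is the same division of labor as between a protocol interface and Invoker.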
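Next, taking ProtoBufRpcInvoker in ProtobufRpcEngine as an example, we walk through the steps of its call method, which handles the request on the server side.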
public Writable call(RPC.Server server, String protocol,
    Writable writableRequest, long receiveTime) throws Exception {
  RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
  RequestHeaderProto rpcRequest = request.requestHeader;
  // name of the method being called
  String methodName = rpcRequest.getMethodName();
  // name of the protocol interface
  String protoName = rpcRequest.getDeclaringClassProtocolName();
  // client protocol version
  long clientVersion = rpcRequest.getClientProtocolVersion();
  if (server.verbose)
    LOG.info("Call: protocol=" + protocol + ", method=" + methodName);

  // look up the implementation of the protocol interface
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
      clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  // look up the method descriptor by method name
  MethodDescriptor methodDescriptor = service.getDescriptorForType()
      .findMethodByName(methodName);
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol
        + " protocol.";
    LOG.warn(msg);
    throw new RpcNoSuchMethodException(msg);
  }
  // get the request prototype for this method (the protobuf engine
  // serializes data as Message objects)
  Message prototype = service.getRequestPrototype(methodDescriptor);
  // parse the method argument from the bytes the client sent
  Message param = prototype.newBuilderForType()
      .mergeFrom(request.theRequestRead).build();

  Message result;
  long startTime = Time.now();
  int qTime = (int) (startTime - receiveTime);
  Exception exception = null;
  try {
    server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    // invoke the method and get the result; internally protobuf dispatches
    // to the corresponding method of the protocol interface
    result = service.callBlockingMethod(methodDescriptor, null, param);
  } catch (ServiceException e) {
    exception = (Exception) e.getCause();
    throw (Exception) e.getCause();
  } catch (Exception e) {
    exception = e;
    throw e;
  } finally {
    int processingTime = (int) (Time.now() - startTime);
    if (LOG.isDebugEnabled()) {
      String msg = "Served: " + methodName + " queueTime= " + qTime +
          " procesingTime= " + processingTime;
      if (exception != null) {
        msg += " exception= " + exception.getClass().getSimpleName();
      }
      LOG.debug(msg);
    }
    String detailedMetricsName = (exception == null) ?
        methodName :
        exception.getClass().getSimpleName();
    server.rpcMetrics.addRpcQueueTime(qTime);
    server.rpcMetrics.addRpcProcessingTime(processingTime);
    server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
        processingTime);
  }
  // return the final result
  return new RpcResponseWrapper(result);
}
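Client contains many inner classes, which fall roughly into two groups: those related to the IPC connection, such as Connection and ConnectionId, and those related to remote interface calls, such as Call and ParallelCall.
callIDCounter: a generator for the unique IDs of Client.Call objects
callId: the ID of the Call object belonging to the current thread
retryCount: the number of retries after a connection failure, an erroneous result, or a timeout
connections: all connections this client is currently handling
running: whether the client is running
conf: the Configuration object
socketFactory: the factory that creates sockets
clientId: the unique ID of this client
CONNECTION_CONTEXT_CALL_ID: a special callId used for sending the connection context
valueClass: Class<? extends Writable>, the type of the result the server returns for a Call
sendParamsExecutor: the thread pool used to send requests over connections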
public Client(Class<? extends Writable> valueClass, Configuration conf,
    SocketFactory factory) {
  this.valueClass = valueClass;
  this.conf = conf;
  this.socketFactory = factory;
  // read the connection timeout
  this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
      CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
  this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
  // generate the clientId from a UUID
  this.clientId = ClientId.getClientId();
  // obtain a cached-thread-pool ExecutorService (analyzed later)
  this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  // Create a Call object. As mentioned above, Client has many inner classes;
  // Call is one of them and represents a remote interface call. It is
  // described in more detail below.
  final Call call = createCall(rpcKind, rpcRequest);
  // Obtain a Connection. Hadoop optimizes here: if a connection to this
  // remote was used before and is still alive, it is reused, which saves
  // memory and the cost of setting up another socket. Connection is
  // described in more detail below.
  Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
  try {
    // the Call already wraps the call information; the Connection sends it
    // to the server
    connection.sendRpcRequest(call);                 // send the rpc request
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send rpc request to server", e);
    throw new IOException(e);
  }

  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        // block the current thread until the call has a result;
        // notify wakes it up
        call.wait();                           // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    // restore the interrupt status
    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    // the call returned an error
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
            address.getPort(),
            NetUtils.getHostname(),
            0,
            call.error);
      }
    } else {
      // return the successful result to the Invoker
      return call.getRpcResponse();
    }
  }
}
private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  // make sure the client is still running
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  do {
    // several threads may ask for a connection at the same time; the lock
    // keeps the same connection from being created more than once
    synchronized (connections) {
      connection = connections.get(remoteId);
      // if the pool has no connection for this remote, create one
      if (connection == null) {
        connection = new Connection(remoteId, serviceClass);
        connections.put(remoteId, connection);
      }
    }
  } while (!connection.addCall(call)); // add the new call to this connection; one connection can serve many calls

  // The connection sets up its I/O streams here, which includes building
  // and sending the header messages.
  // This is deliberately not done inside the synchronized block above: if
  // the server is slow, establishing a connection takes a long time, and
  // because the block admits only one thread at a time, every other
  // connection request would have to wait and the whole system would stall.
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}
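The lookup-or-create loop in getConnection can be tried out in isolation with the sketch below. ConnectionPool and PooledConnection are hypothetical stand-ins written for this article, not Hadoop classes. One deliberate difference is labeled in the code: the sketch replaces a closed connection during the lookup, whereas the original relies on closed connections being removed from the map elsewhere.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

class ConnectionPool {
    private final Map<String, PooledConnection> connections = new HashMap<>();

    PooledConnection getConnection(String remoteId) {
        PooledConnection connection;
        do {
            // Only the map lookup and insert are locked, never slow I/O.
            synchronized (connections) {
                connection = connections.get(remoteId);
                // Assumption of this sketch: a closed entry is replaced here.
                if (connection == null || connection.closed.get()) {
                    connection = new PooledConnection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
            // addCall fails if the connection was closed in the meantime,
            // in which case the loop looks up or creates another one.
        } while (!connection.addCall());
        return connection;
    }

    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool();
        PooledConnection first = pool.getConnection("nn1:8020");
        System.out.println(pool.getConnection("nn1:8020") == first);
        first.closed.set(true);
        System.out.println(pool.getConnection("nn1:8020") == first);
    }
}

// Hypothetical connection that only counts the calls attached to it.
class PooledConnection {
    final String remoteId;
    final AtomicBoolean closed = new AtomicBoolean(false);
    private int pendingCalls;

    PooledConnection(String remoteId) {
        this.remoteId = remoteId;
    }

    synchronized boolean addCall() {
        if (closed.get()) {
            return false;
        }
        pendingCalls++;
        return true;
    }

    synchronized int pendingCalls() {
        return pendingCalls;
    }
}
```

Keeping the synchronized block this small is the point of the design: threads contend only for the map, and any slow connection setup happens after the lock is released.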
Creating a Call is simple: createCall just invokes the Call constructor.
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
  return new Call(rpcKind, rpcRequest);
}
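Before discussing Connection, a word on how Hadoop IPC messages are transmitted. Messages are variable-length, so every message is preceded by its total length, header included. This length is usually called dataLength, and Hadoop stores it in 4 bytes.
Like Java class files, Hadoop has a magic number, 'hrpc'. It is stored in the connection header that a Connection sends, where it occupies exactly the 4 bytes that dataLength would otherwise occupy, a deliberate design choice. If the dataLength field reads hrpc, the message comes from a client in the cluster that is opening a connection. The connection header carries no data payload, only the header itself, so its length never has to be transmitted: it is simply the fixed size of the header.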
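The framing idea can be illustrated with plain java.io streams. The sketch below is a simplification written for this article: FramingDemo is a hypothetical class, and it writes only the 4-byte magic followed by one length-prefixed message, leaving out the other header fields that Hadoop's real connection header carries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class FramingDemo {
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);

    // Writes the 4-byte magic, then a 4-byte length, then the payload.
    static byte[] encode(byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(MAGIC);
            out.writeInt(payload.length); // dataLength, big-endian
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Checks the magic, reads the length, then reads exactly that many bytes.
    static byte[] decode(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            byte[] magic = new byte[MAGIC.length];
            in.readFully(magic);
            if (!Arrays.equals(magic, MAGIC)) {
                throw new IOException("stream does not start with hrpc");
            }
            int dataLength = in.readInt();
            byte[] payload = new byte[dataLength];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode("ping".getBytes(StandardCharsets.US_ASCII));
        System.out.println(wire.length);
        System.out.println(new String(decode(wire), StandardCharsets.US_ASCII));
    }
}
```

Because the reader knows the length before it reads the body, it can pull exactly one message off a stream that carries many messages back to back.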
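server: the address and port of the server
remoteId: the ConnectionId, the unique ID of this connection
socket: the socket connection to the server
in: the input stream, used to read the server's results from the connection
out: the output stream, used to send data to the server
lastActivity: the time of the most recent I/O, used to detect timeouts
rpcTimeout: the RPC timeout
calls: all calls this connection is currently handling
maxIdleTime: the maximum idle time; a connection idle for longer than this is removed from the Client's connections map, freeing resources for busier connections
connectionRetryPolicy: the retry policy for failed connections
maxRetriesOnSocketTimeouts: the maximum number of retries on socket timeouts
shouldCloseConnection: whether this connection should be closed; true means close it
sendRpcRequestLock: the object used as a lock when sending requests
tcpNoDelay: whether to disable Nagle's algorithm (a TCP packet setting)
closeException: a connection may be closed because of an error; this records that error
doPing: whether to send a ping at regular intervals, so the server does not mistake the client for dead
pingInterval: the interval between pings
pingRequest: the content sent as a ping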
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
  this.remoteId = remoteId;
  this.server = remoteId.getAddress();
  if (server.isUnresolved()) {
    throw NetUtils.wrapException(server.getHostName(),
        server.getPort(),
        null,
        0,
        new UnknownHostException());
  }
  this.rpcTimeout = remoteId.getRpcTimeout();
  this.maxIdleTime = remoteId.getMaxIdleTime();
  this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
  this.maxRetriesOnSasl = remoteId.getMaxRetriesOnSasl();
  this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
  this.tcpNoDelay = remoteId.getTcpNoDelay();
  this.doPing = remoteId.getDoPing();
  if (doPing) {
    // construct a RPC header with the callId as the ping callId
    pingRequest = new ByteArrayOutputStream();
    RpcRequestHeaderProto pingHeader = ProtoUtil
        .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
            OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
            RpcConstants.INVALID_RETRY_COUNT, clientId);
    pingHeader.writeDelimitedTo(pingRequest);
  }
  this.pingInterval = remoteId.getPingInterval();
  this.serviceClass = serviceClass;
  if (LOG.isDebugEnabled()) {
    LOG.debug("The ping interval is " + this.pingInterval + " ms.");
  }

  UserGroupInformation ticket = remoteId.getTicket();
  // try SASL if security is enabled or if the ugi contains tokens.
  // this causes a SIMPLE client with tokens to attempt SASL
  boolean trySasl = UserGroupInformation.isSecurityEnabled() ||
      (ticket != null && !ticket.getTokens().isEmpty());
  this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE;

  this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
      server.toString() +
      " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
  this.setDaemon(true);
}
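Next, let's look at setupIOstreams, which getConnection calls. This is the method in which a Connection sets up its I/O streams and sends the header information. Note that the synchronized here means something different from the synchronized block in getConnection above: that block locks the whole connections map, whereas the lock here guards a single Connection and only matters when that Connection is reused.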
private synchronized void setupIOstreams(
    AtomicBoolean fallbackToSimpleAuth) {
  // If the connection already exists, or it should be closed, return
  // immediately; in both cases there is nothing left to initialize.
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to "+server);
    }
    if (Trace.isTracing()) {
      Trace.addTimelineAnnotation("IPC client connecting to " + server);
    }
    short numRetries = 0;
    Random rand = null;
    while (true) {
      // set up the socket connection
      setupConnection();
      // get the socket's I/O streams
      InputStream inStream = NetUtils.getInputStream(socket);
      OutputStream outStream = NetUtils.getOutputStream(socket);
      // send the connection header
      writeConnectionHeader(outStream);
      // ---------- security / authorization related (begin) ----------
      if (authProtocol == AuthProtocol.SASL) {
        final InputStream in2 = inStream;
        final OutputStream out2 = outStream;
        UserGroupInformation ticket = remoteId.getTicket();
        if (ticket.getRealUser() != null) {
          ticket = ticket.getRealUser();
        }
        try {
          authMethod = ticket
              .doAs(new PrivilegedExceptionAction<AuthMethod>() {
                @Override
                public AuthMethod run()
                    throws IOException, InterruptedException {
                  return setupSaslConnection(in2, out2);
                }
              });
        } catch (Exception ex) {
          authMethod = saslRpcClient.getAuthMethod();
          if (rand == null) {
            rand = new Random();
          }
          handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,
              rand, ticket);
          continue;
        }
        if (authMethod != AuthMethod.SIMPLE) {
          // Sasl connect is successful. Let's set up Sasl i/o streams.
          inStream = saslRpcClient.getInputStream(inStream);
          outStream = saslRpcClient.getOutputStream(outStream);
          // for testing
          remoteId.saslQop =
              (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
          LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
          if (fallbackToSimpleAuth != null) {
            fallbackToSimpleAuth.set(false);
          }
        } else if (UserGroupInformation.isSecurityEnabled()) {
          if (!fallbackAllowed) {
            throw new IOException("Server asks us to fall back to SIMPLE " +
                "auth, but this client is configured to only allow secure " +
                "connections.");
          }
          if (fallbackToSimpleAuth != null) {
            fallbackToSimpleAuth.set(true);
          }
        }
      }
      // ---------- security / authorization related (end) ----------

      // if ping is enabled, wrap the input stream in a PingInputStream
      if (doPing) {
        inStream = new PingInputStream(inStream);
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));

      // SASL may have already buffered the stream
      if (!(outStream instanceof BufferedOutputStream)) {
        outStream = new BufferedOutputStream(outStream);
      }
      this.out = new DataOutputStream(outStream);
      // send the connection context
      writeConnectionContext(remoteId, authMethod);

      // update the last-activity time
      touch();

      if (Trace.isTracing()) {
        Trace.addTimelineAnnotation("IPC client connected to " + server);
      }

      // start the thread; its run method receives the server's responses
      start();
      return;
    }
  } catch (Throwable t) {
    // close the connection on any error
    if (t instanceof IOException) {
      // markClosed sets shouldCloseConnection to true
      markClosed((IOException)t);
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
  }
}
private synchronized void markClosed(IOException e) {
  // set the flag to true with a compare-and-set
  if (shouldCloseConnection.compareAndSet(false, true)) {
    closeException = e;
    // wake up every thread blocked on this connection
    notifyAll();
  }
}
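The effect of compareAndSet in markClosed is that only the first caller records its exception; later callers find the flag already set and do nothing. The sketch below reproduces that behavior in a hypothetical CloseFlagDemo class. Unlike the original, its markClosed returns a boolean, purely so the outcome is easy to observe.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseFlagDemo {
    private final AtomicBoolean shouldClose = new AtomicBoolean(false);
    private Exception closeException;

    // Returns true only for the caller that actually flipped the flag.
    synchronized boolean markClosed(Exception cause) {
        if (shouldClose.compareAndSet(false, true)) {
            closeException = cause;
            notifyAll(); // wake any thread waiting on this object
            return true;
        }
        return false;
    }

    synchronized Exception closeException() {
        return closeException;
    }

    public static void main(String[] args) {
        CloseFlagDemo flag = new CloseFlagDemo();
        System.out.println(flag.markClosed(new IOException("first failure")));
        System.out.println(flag.markClosed(new IOException("second failure")));
        System.out.println(flag.closeException().getMessage());
    }
}
```

The first recorded cause is usually the one worth keeping, because later errors on a broken connection tend to be consequences of it.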
private synchronized void setupConnection() throws IOException {
  // number of I/O failures
  short ioFailures = 0;
  // number of timeouts
  short timeoutFailures = 0;
  // loop until the socket connection is established
  while (true) {
    try {
      // create the socket
      this.socket = socketFactory.createSocket();
      this.socket.setTcpNoDelay(tcpNoDelay);
      this.socket.setKeepAlive(true);
      // ---------- security / authorization related (begin) ----------
      /*
       * Bind the socket to the host specified in the principal name of the
       * client, to ensure Server matching address of the client connection
       * to host name in principal passed.
       */
      UserGroupInformation ticket = remoteId.getTicket();
      if (ticket != null && ticket.hasKerberosCredentials()) {
        KerberosInfo krbInfo =
            remoteId.getProtocol().getAnnotation(KerberosInfo.class);
        if (krbInfo != null && krbInfo.clientPrincipal() != null) {
          String host =
              SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());

          // If host name is a valid local address then bind socket to it
          InetAddress localAddr = NetUtils.getLocalInetAddress(host);
          if (localAddr != null) {
            this.socket.bind(new InetSocketAddress(localAddr, 0));
          }
        }
      }
      // ---------- security / authorization related (end) ----------
      // connect the socket to the server
      NetUtils.connect(this.socket, server, connectionTimeout);
      // if an RPC timeout is set, it replaces the ping interval
      if (rpcTimeout > 0) {
        pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
      }
      // set the socket read timeout
      this.socket.setSoTimeout(pingInterval);
      return;
    } catch (ConnectTimeoutException toe) {
      /* A connect timeout may mean the server's address has changed.
       * updateAddress returns true if it really has, in which case the
       * connection is attempted again from scratch.
       */
      if (updateAddress()) {
        // reset the timeout and I/O failure counters to 0
        timeoutFailures = ioFailures = 0;
      }
      // this method closes the socket connection
      handleConnectionTimeout(timeoutFailures++,
          maxRetriesOnSocketTimeouts, toe);
    } catch (IOException ie) {
      if (updateAddress()) {
        timeoutFailures = ioFailures = 0;
      }
      handleConnectionFailure(ioFailures++, ie);
    }
  }
}
private synchronized boolean updateAddress() throws IOException {
  // Do a fresh lookup with the old host name.
  InetSocketAddress currentAddr = NetUtils.createSocketAddrForHost(
      server.getHostName(), server.getPort());
  // update if the address differs from the previous one
  if (!server.equals(currentAddr)) {
    LOG.warn("Address change detected. Old: " + server.toString() +
        " New: " + currentAddr.toString());
    // switch to the new address
    server = currentAddr;
    return true;
  }
  return false;
}
/**
 * Write the connection header - this is sent when connection is established
 * +----------------------------------+
 * |  "hrpc" 4 bytes                  |
 * +----------------------------------+
 * |  Version (1 byte)                |
 * +----------------------------------+
 * |  Service Class (1 byte)          |
 * +----------------------------------+
 * |  AuthProtocol (1 byte)           |
 * +----------------------------------+
 */
private void writeConnectionHeader(OutputStream outStream)
    throws IOException {
  DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
  // Write out the header, version and authentication method
  out.write(RpcConstants.HEADER.array());
  out.write(RpcConstants.CURRENT_VERSION);
  out.write(serviceClass);
  out.write(authProtocol.callId);
  out.flush();
}
Neither the method below nor the one above is synchronized, because each is called only once, during initialization.
private void writeConnectionContext(ConnectionId remoteId,
    AuthMethod authMethod)
    throws IOException {
  // Write out the ConnectionHeader
  IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
      RPC.getProtocolName(remoteId.getProtocol()),
      remoteId.getTicket(),
      authMethod);
  // Build the header for the connection context; the packet carries only
  // the context, no call payload.
  // Arguments: RPC engine kind, operation type, the context callId (-3 by
  // default), retry count (INVALID_RETRY_COUNT, -1), client ID.
  RpcRequestHeaderProto connectionContextHeader = ProtoUtil
      .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
          OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
          RpcConstants.INVALID_RETRY_COUNT, clientId);
  RpcRequestMessageWrapper request =
      new RpcRequestMessageWrapper(connectionContextHeader, message);

  // Write out the packet length
  out.writeInt(request.getLength());
  request.write(out);
}
Below is Connection's sendRpcRequest, which the Client call method uses to send the remote call.
/** Initiates a rpc call by sending the rpc request to the remote server. */
public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  // if the connection should be closed, return
  if (shouldCloseConnection.get()) {
    return;
  }

  // The call is serialized here, in the caller's thread, and not in the
  // sendParamsExecutor thread. That way a serialization problem is reported
  // accurately to the caller, and serialization also happens concurrently
  // across callers.
  //
  // Format of a call on the wire:
  // 0) Length of rest below (1 + 2)
  // 1) RpcRequestHeader  - is serialized Delimited hence contains length
  // 2) RpcRequest
  //
  // Items '1' and '2' are prepared here.
  final DataOutputBuffer d = new DataOutputBuffer();
  // build the request header, much like when the connection was set up
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  // write the header and the request into an in-memory output buffer
  header.writeDelimitedTo(d);
  call.rpcRequest.write(d);

  synchronized (sendRpcRequestLock) {
    // send the request on a thread from the executor
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      @Override
      public void run() {
        try {
          // lock out so that concurrent messages cannot interleave on the stream
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }

            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);
            // send the request to the server through the Connection's out stream
            byte[] data = d.getData();
            // total length of the message
            int totalLength = d.getLength();
            // write the length
            out.writeInt(totalLength); // Total Length
            // write the content
            out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      // block until the send has finished; the actual response arrives
      // through the Call object
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();

      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("unexpected checked exception", cause);
      }
    }
  }
}
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());

  try {
    // wait for a call to handle; the loop ends once the Connection can be closed
    while (waitForWork()) {//wait here for work - read or close connection
      // receive the response
      receiveRpcResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }
  // the loop has ended, so shouldCloseConnection is true: close the Connection
  close();

  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}
private synchronized boolean waitForWork() {
  // While the connection is usable but has no call to handle, suspend the
  // current thread until the maximum idle time is reached.
  if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    long timeout = maxIdleTime-
        (Time.now()-lastActivity.get());
    if (timeout>0) {
      try {
        wait(timeout);
      } catch (InterruptedException e) {}
    }
  }
  // there are calls to handle, the connection is usable and the client is
  // still running: return true
  if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    return true;
  // in every other case return false, marking shouldCloseConnection as
  // true where it is not already
  } else if (shouldCloseConnection.get()) {
    return false;
  } else if (calls.isEmpty()) { // idle connection closed or stopped
    markClosed(null);
    return false;
  } else { // get stopped but there are still pending requests
    markClosed((IOException)new IOException().initCause(
        new InterruptedException()));
    return false;
  }
}
private synchronized boolean addCall(Call call) {
  // if this connection is no longer usable, return false
  if (shouldCloseConnection.get())
    return false;
  // put the call into the table of calls this Connection is handling
  calls.put(call.id, call);
  // wake the connection thread waiting in waitForWork; a no-op if none is waiting
  notify();
  return true;
}
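addCall is invoked from getConnection, covered in the Client analysis above. Because connections are reused, the method checks whether the connection is still usable.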
private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    // read the message length
    int totalLen = in.readInt();
    // read the response header
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    // validate the response
    checkResponse(header);

    int headerLen = header.getSerializedSize();
    headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
    // ID of the call this response belongs to
    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);
    // find the matching call; the result will be stored in its rpcResponse
    Call call = calls.get(callId);
    // check whether the status is SUCCESS
    RpcStatusProto status = header.getStatus();
    if (status == RpcStatusProto.SUCCESS) {
      // on success, read the return value and store it in the call
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);                 // read value
      // this request is finished: remove the call from calls
      calls.remove(callId);
      call.setRpcResponse(value);

      // verify that length was correct
      // only for ProtobufEngine where len can be verified easily
      // (a response from the protobuf engine is wrapped in an RpcWrapper)
      if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
        ProtobufRpcEngine.RpcWrapper resWrapper =
            (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
        if (totalLen != headerLen + resWrapper.getLength()) {
          throw new RpcClientException(
              "RPC response length mismatch on rpc success");
        }
      }
    } else { // the RPC returned an error
      // Verify that length was correct
      if (totalLen != headerLen) {
        throw new RpcClientException(
            "RPC response length mismatch on rpc error");
      }
      // extract the error information
      final String exceptionClassName = header.hasExceptionClassName() ?
          header.getExceptionClassName() :
          "ServerDidNotSetExceptionClassName";
      final String errorMsg = header.hasErrorMsg() ?
          header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
      final RpcErrorCodeProto erCode =
          (header.hasErrorDetail() ? header.getErrorDetail() : null);
      if (erCode == null) {
        LOG.warn("Detailed error code not set by server on rpc error");
      }
      RemoteException re =
          ( (erCode == null) ?
              new RemoteException(exceptionClassName, errorMsg) :
              new RemoteException(exceptionClassName, errorMsg, erCode));
      if (status == RpcStatusProto.ERROR) {
        // on ERROR, store the exception in the call and remove the call from calls
        calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // on a fatal error close the connection; the error may have been
        // caused by a broken connection
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    // on an I/O error, close the connection
    markClosed(e);
  }
}
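Since many calls share one connection, each response has to be matched back to its caller by call ID. The sketch below shows that bookkeeping on its own. It is not Hadoop's code: CallTable is a hypothetical class, and it swaps in CompletableFuture for the Call object with wait and notify that the original uses. The ID generator masks off the sign bit so IDs stay non-negative, which is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CallTable {
    private final AtomicInteger callIdCounter = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> calls = new ConcurrentHashMap<>();

    // Registers a pending call and returns the ID to put in its request header.
    int addCall(CompletableFuture<String> pending) {
        int id = callIdCounter.getAndIncrement() & 0x7FFFFFFF;
        calls.put(id, pending);
        return id;
    }

    // Called by the receiver for each response; returns false for an unknown ID.
    boolean onResponse(int callId, boolean success, String body) {
        CompletableFuture<String> pending = calls.remove(callId);
        if (pending == null) {
            return false;
        }
        if (success) {
            pending.complete(body);
        } else {
            pending.completeExceptionally(new RuntimeException(body));
        }
        return true;
    }

    int pendingCount() {
        return calls.size();
    }

    public static void main(String[] args) {
        CallTable table = new CallTable();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        int firstId = table.addCall(first);
        int secondId = table.addCall(second);
        // Responses may arrive in any order; the ID finds the right caller.
        table.onResponse(secondId, true, "second result");
        table.onResponse(firstId, false, "first failed");
        System.out.println(second.getNow(null));
        System.out.println(first.isCompletedExceptionally());
    }
}
```

Removing the entry as soon as its response arrives keeps the table holding only the calls still in flight, which is also what lets an idle connection be detected.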
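id: the unique ID of the call, taken from the Client's callId
retry: the retry count, taken from the Client's retryCount
rpcRequest: the serialized request
rpcResponse: the serialized result
error: the error information
rpcKind: the RPC engine
done: whether this request has completed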
// The method is simple: it stores the result deserialized in
// receiveRpcResponse in the call's rpcResponse and calls callComplete.
public synchronized void setRpcResponse(Writable rpcResponse) {
  this.rpcResponse = rpcResponse;
  callComplete();
}
protected synchronized void callComplete() {
  // mark this request as done
  this.done = true;
  notify();                                 // notify caller
}
while (!call.done) {
  try {
    call.wait();                           // wait for the result
  } catch (InterruptedException ie) {
    // save the fact that we were interrupted
    interrupted = true;
  }
}
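The handoff between the receiving thread, which calls setRpcResponse, and the calling thread, which loops on call.wait(), can be reproduced without any networking. The sketch below uses hypothetical PendingCall and WaitNotifyDemo classes; it follows the same pattern of a done flag checked in a loop, with the interrupt status restored after the wait.

```java
class WaitNotifyDemo {
    // Completes a call from another thread and waits for it on this one.
    static Object roundTrip(Object value) {
        PendingCall call = new PendingCall();
        Thread receiver = new Thread(() -> call.complete(value));
        receiver.start();
        return call.await();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("response"));
    }
}

// Hypothetical stand-in for Call: one result, one done flag.
class PendingCall {
    private Object response;
    private boolean done;

    // Receiver side: store the result, mark the call done, wake the caller.
    synchronized void complete(Object value) {
        response = value;
        done = true;
        notify();
    }

    // Caller side: loop on the flag, since wait() may return spuriously.
    synchronized Object await() {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException e) {
                // remember the interrupt and keep waiting for the result
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return response;
    }
}
```

Checking the flag before waiting matters as much as the loop: if the result arrives before the caller reaches await, the caller returns immediately instead of waiting for a notify that has already happened.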