源碼級強力分析hadoop的RPC機制

必備技術點: 
1. 動態代理(參考
 :http://weixiaolu.iteye.com/blog/1477774 
2. Java NIO(參考
 :http://weixiaolu.iteye.com/blog/1479656 
3. Java網絡編程

目錄: 
一.RPC協議
二.ipc.RPC源碼分析
三.ipc.Client源碼分析
四.ipc.Server源碼分析
 

分析: 

一.RPC協議 

在分析協議以前,我以爲咱們頗有必要先搞清楚協議是什麼。下面我就談一點本身的認識吧。若是你學過java的網絡編程,你必定知道:當客戶端發送一個字節給服務端時,服務端必須也要有一個讀字節的方法在阻塞等待;反之亦然。 這種我把它稱爲底層的通訊協議。但是對於一個大型的網絡通訊系統來講,很顯然這種說法的協議粒度過小,不方便咱們理解整個網絡通訊的流程及架構,因此我造了個說法:架構層次的協議。通俗一點說,就是我把某些接口和接口中的方法稱爲協議,客戶端和服務端只要實現這些接口中的方法就能夠進行通訊了,從這個角度來講,架構層次協議的說法就能夠成立了(注:若是從架構層次的協議來分析系統,咱們就先不要太在乎方法的具體實現,呵呵,我相信你懂得~)。

Hadoop的RPC機制正是採用了這種「架構層次的協議」,有一整套做爲協議的接口。如圖:
java

 

 

下面就幾個重點的協議介紹一下吧:node


VersionedProtocol :它是全部RPC協議接口的父接口,其中只有一個方法:getProtocolVersion()

(1)HDFS相關 
ClientDatanodeProtocol :一個客戶端和datanode之間的協議接口,用於數據塊恢復
ClientProtocol :client與Namenode交互的接口,全部控制流的請求均在這裏,如:建立文件、刪除文件等;
DatanodeProtocol : Datanode與Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode與Namenode交互的接口。

(2)Mapreduce相關 
InterDatanodeProtocol :Datanode內部交互的接口,用來更新block的元數據;
InnerTrackerProtocol :TaskTracker與JobTracker交互的接口,功能與DatanodeProtocol類似;
JobSubmissionProtocol :JobClient與JobTracker交互的接口,用來提交Job、得到Job等與Job相關的操做;
TaskUmbilicalProtocol :Task中子進程與母進程交互的接口,子進程即map、reduce等操做,母進程即TaskTracker,該接口能夠回報子進程的運行狀態(詞彙掃盲: umbilical 臍帶的, 關係親密的) 。
編程

 

一會兒羅列了這麼多的協議,有些人可能要問了,hadoop是怎麼使用它們的呢?呵呵,不要着急哦,其實本篇博客所分析的是hadoop的RPC機制底層的具體實現,而這些協議倒是應用層上的東西,好比hadoop是怎麼樣保持「心跳」的啊。因此在個人下一篇博客:源碼級分析hadoop的心跳機制中會詳細說明以上協議是怎樣被使用的。盡請期待哦~。如今就開始咱們的RPC源碼之旅吧•••

二.ipc.RPC源碼分析 

ipc.RPC類中有一些內部類,爲了你們對RPC類有個初步的印象,就先羅列幾個咱們感興趣的分析一下吧:
緩存

 

Invocation :用於封裝方法名和參數,做爲數據傳輸層,至關於VO吧。
ClientCache :用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>。
Invoker :是動態代理中的調用實現類,繼承了InvocationHandler.
Server :是ipc.Server的實現類。
網絡

 

從以上的分析能夠知道,Invocation類僅做爲VO,ClientCache類只是做爲緩存,而Server類用於服務端的處理,他們都和客戶端的數據流和業務邏輯沒有關係。如今就只剩下Invoker類了。若是你對動態代理(參考:http://weixiaolu.iteye.com/blog/1477774 )比較瞭解的話,你一下就會想到,咱們接下來去研究的就是RPC.Invoker類中的invoke()方法了。代碼以下:

代碼一:
架構

Java代碼  收藏代碼socket

  1. public Object invoke(Object proxy, Method method, Object[] args)  tcp

  2.   throws Throwable {  函數

  3.   •••  oop

  4.   ObjectWritable value = (ObjectWritable)  

  5.     client.call(new Invocation(method, args), remoteId);  

  6.   •••  

  7.   return value.get();  

  8. }  

 

呵呵,若是你發現這個invoke()方法實現的有些奇怪的話,那你就對了。通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg);  這句代碼。而上面代碼中卻沒有,這是爲何呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:

代碼二:

Java代碼  收藏代碼

  1. ObjectWritable value = (ObjectWritable)  

  2. client.call(new Invocation(method, args), remoteId);  

 

Invocation類在這裏封裝了方法名和參數,充當VO。其實這裏網絡通訊只是調用了Client類的call()方法。那咱們接下來分析一下ipc.Client源碼吧。不過在分析ipc.Client源碼以前,爲了避免讓咱們像盲目的蒼蠅同樣亂撞,我想先肯定一下咱們分析的目的是什麼,我總結出了三點須要解決的問題:


1. 客戶端和服務端的鏈接是怎樣創建的?
2. 客戶端是怎樣給服務端發送數據的?
3. 客戶端是怎樣獲取服務端的返回數據的?


基於以上三個問題,咱們開始吧!!!

三.ipc.Client源碼分析 

一樣,爲了對Client類有個初步的瞭解,咱們也先羅列幾個咱們感興趣的內部類:

 

Call :用於封裝Invocation對象,做爲VO,寫到服務端,同時也用於存儲從服務端返回的數據
Connection :用以處理遠程鏈接對象。繼承了Thread
ConnectionId :惟一肯定一個鏈接

 

問題1:客戶端和服務端的鏈接是怎樣創建的? 

下面咱們來看看Client類中的cal()方法吧:

代碼三:

Java代碼  收藏代碼

  1. public Writable call(Writable param, ConnectionId remoteId)    

  2.                        throws InterruptedException, IOException {  

  3.     Call call = new Call(param);       //將傳入的數據封裝成call對象  

  4.     Connection connection = getConnection(remoteId, call);   //得到一個鏈接  

  5.     connection.sendParam(call);     // 向服務端發送call對象  

  6.     boolean interrupted = false;  

  7.     synchronized (call) {  

  8.       while (!call.done) {  

  9.         try {  

  10.           call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程  

  11.         } catch (InterruptedException ie) {  

  12.           // 因中斷異常而終止,設置標誌interrupted爲true  

  13.           interrupted = true;  

  14.         }  

  15.       }  

  16.       if (interrupted) {  

  17.         Thread.currentThread().interrupt();  

  18.       }  

  19.   

  20.       if (call.error != null) {  

  21.         if (call.error instanceof RemoteException) {  

  22.           call.error.fillInStackTrace();  

  23.           throw call.error;  

  24.         } else { // 本地異常  

  25.           throw wrapException(remoteId.getAddress(), call.error);  

  26.         }  

  27.       } else {  

  28.         return call.value; //返回結果數據  

  29.       }  

  30.     }  

  31.   }  

 

具體代碼的做用我已作了註釋,因此這裏再也不贅述。但到目前爲止,你依然不知道RPC機制底層的網絡鏈接是怎麼創建的。呵呵,那咱們只好再去深究了,分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:

代碼四:

Java代碼  收藏代碼

  1. Connection connection = getConnection(remoteId, call);   //得到一個鏈接  

  2. connection.sendParam(call);      // 向服務端發送call對象  

 

先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。

代碼五:

Java代碼  收藏代碼

  1. private Connection getConnection(ConnectionId remoteId,  

  2.                                    Call call)  

  3.                                    throws IOException, InterruptedException {  

  4.     if (!running.get()) {  

  5.       // 若是client關閉了  

  6.       throw new IOException("The client is stopped");  

  7.     }  

  8.     Connection connection;  

  9. //若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。  

  10. //但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。  

  11.     do {  

  12.       synchronized (connections) {  

  13.         connection = connections.get(remoteId);  

  14.         if (connection == null) {  

  15.           connection = new Connection(remoteId);  

  16.           connections.put(remoteId, connection);  

  17.         }  

  18.       }  

  19.     } while (!connection.addCall(call)); //將call對象放入對應鏈接中的calls池,就不貼出源碼了  

  20.    //這句代碼纔是真正的完成了和服務端創建鏈接哦~  

  21.     connection.setupIOstreams();  

  22.     return connection;  

  23.   }  

 

若是你還有興趣繼續分析下去,那咱們就一探創建鏈接的過程吧,下面貼出Client.Connection類中的setupIOstreams()方法:

代碼六:

Java代碼  收藏代碼

  1. private synchronized void setupIOstreams() throws InterruptedException {  

  2.  •••  

  3.     try {  

  4.      •••  

  5.       while (true) {  

  6.         setupConnection();  //創建鏈接  

  7.         InputStream inStream = NetUtils.getInputStream(socket);     //得到輸入流  

  8.         OutputStream outStream = NetUtils.getOutputStream(socket);  //得到輸出流  

  9.         writeRpcHeader(outStream);  

  10.         •••  

  11.         this.in = new DataInputStream(new BufferedInputStream  

  12.             (new PingInputStream(inStream)));   //將輸入流裝飾成DataInputStream  

  13.         this.out = new DataOutputStream  

  14.         (new BufferedOutputStream(outStream));   //將輸出流裝飾成DataOutputStream  

  15.         writeHeader();  

  16.         // 跟新活動時間  

  17.         touch();  

  18.         //當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread  

  19.         start();  

  20.         return;  

  21.       }  

  22.     } catch (IOException e) {  

  23.       markClosed(e);  

  24.       close();  

  25.     }  

  26.   }  

 

再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法:

代碼七:

Java代碼  收藏代碼

  1. private synchronized void setupConnection() throws IOException {  

  2.     short ioFailures = 0;  

  3.     short timeoutFailures = 0;  

  4.     while (true) {  

  5.       try {  

  6.         this.socket = socketFactory.createSocket(); //終於看到建立socket的方法了  

  7.         this.socket.setTcpNoDelay(tcpNoDelay);  

  8.        •••  

  9.         // 設置鏈接超時爲20s  

  10.         NetUtils.connect(this.socket, remoteId.getAddress(), 20000);  

  11.         this.socket.setSoTimeout(pingInterval);  

  12.         return;  

  13.       } catch (SocketTimeoutException toe) {  

  14.         /* 設置最多鏈接重試爲45次。 

  15.          * 總共有20s*45 = 15 分鐘的重試時間。 

  16.          */  

  17.         handleConnectionFailure(timeoutFailures++, 45, toe);  

  18.       } catch (IOException ie) {  

  19.         handleConnectionFailure(ioFailures++, maxRetries, ie);  

  20.       }  

  21.     }  

  22.   }  

 

終於,咱們知道了客戶端的鏈接是怎樣創建的了,其實就是建立一個普通的socket進行通訊。呵呵,那服務端是否是也是建立一個ServerSocket進行通訊的呢?呵呵,先不要急,到這裏咱們只解決了客戶端的第一個問題,下面還有兩個問題沒有解決呢,咱們一個一個地來解決吧。

問題2:客戶端是怎樣給服務端發送數據的? 

咱們回顧一下代碼四吧。第一句爲了完成鏈接的創建,咱們已經分析完畢;而第二句是爲了發送數據,呵呵,分析下去,看能不能解決咱們的問題呢。下面貼出Client.Connection類的sendParam()方法吧:

代碼八:

Java代碼  收藏代碼

  1. public void sendParam(Call call) {  

  2.       if (shouldCloseConnection.get()) {  

  3.         return;  

  4.       }  

  5.       DataOutputBuffer d=null;  

  6.       try {  

  7.         synchronized (this.out) {  

  8.           if (LOG.isDebugEnabled())  

  9.             LOG.debug(getName() + " sending #" + call.id);  

  10.           //建立一個緩衝區  

  11.           d = new DataOutputBuffer();  

  12.           d.writeInt(call.id);  

  13.           call.param.write(d);  

  14.           byte[] data = d.getData();  

  15.           int dataLength = d.getLength();  

  16.           out.writeInt(dataLength);        //首先寫出數據的長度  

  17.           out.write(data, 0, dataLength); //向服務端寫數據  

  18.           out.flush();  

  19.         }  

  20.       } catch(IOException e) {  

  21.         markClosed(e);  

  22.       } finally {  

  23.         IOUtils.closeStream(d);  

  24.       }  

  25.     }    

 

其實這就是java io的socket發送數據的通常過程哦,沒有什麼特別之處。到這裏問題二也解決了,來看看問題三吧。

問題3:客戶端是怎樣獲取服務端的返回數據的? 

咱們再回顧一下代碼六吧。代碼六中,當鏈接創建時會啓動一個線程用於處理服務端返回的數據,咱們看看這個處理線程是怎麼實現的吧,下面貼出Client.Connection類和Client.Call類中的相關方法吧:

代碼九:

Java代碼  收藏代碼

  1. 方法一:    

  2.   public void run() {  

  3.       •••  

  4.       while (waitForWork()) {  

  5.         receiveResponse();  //具體的處理方法  

  6.       }  

  7.       close();  

  8.      •••  

  9. }  

  10.   

  11. 方法二:  

  12. private void receiveResponse() {  

  13.       if (shouldCloseConnection.get()) {  

  14.         return;  

  15.       }  

  16.       touch();  

  17.       try {  

  18.         int id = in.readInt();                    // 阻塞讀取id  

  19.         if (LOG.isDebugEnabled())  

  20.           LOG.debug(getName() + " got value #" + id);  

  21.           Call call = calls.get(id);    //在calls池中找到發送時的那個對象  

  22.         int state = in.readInt();     // 阻塞讀取call對象的狀態  

  23.         if (state == Status.SUCCESS.state) {  

  24.           Writable value = ReflectionUtils.newInstance(valueClass, conf);  

  25.           value.readFields(in);           // 讀取數據  

  26.         //將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三  

  27.           call.setValue(value);                

  28.           calls.remove(id);               //刪除已處理的call      

  29.         } else if (state == Status.ERROR.state) {  

  30.         •••  

  31.         } else if (state == Status.FATAL.state) {  

  32.         •••  

  33.         }  

  34.       } catch (IOException e) {  

  35.         markClosed(e);  

  36.       }  

  37. }  

  38.   

  39. 方法三:  

  40. public synchronized void setValue(Writable value) {  

  41.       this.value = value;  

  42.       callComplete();   //具體實現  

  43. }  

  44. protected synchronized void callComplete() {  

  45.       this.done = true;  

  46.       notify();         // 喚醒client等待線程  

  47.     }  

 

代碼九完成的功能主要是:啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據了哦~。客戶端的源碼分析就到這裏了哦,下面咱們來分析Server端的源碼吧。

四.ipc.Server源碼分析 

一樣,爲了讓你們對ipc.Server有個初步的瞭解,咱們先分析一下它的幾個內部類吧:

 

Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。

 

若是你看過ipc.Server的源碼,你會發現其實ipc.Server是一個abstract修飾的抽象類。那隨之而來的問題就是:hadoop是怎樣初始化RPC的Server端的呢?這個問題着實也讓我想了好長時間。不事後來我想到Namenode初始化時必定初始化了RPC的Sever端,那咱們去看看Namenode的初始化源碼吧:

1. 初始化Server


代碼十:

Java代碼  收藏代碼

  1. private void initialize(Configuration conf) throws IOException {  

  2.    •••  

  3.     // 建立 rpc server  

  4.     InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);  

  5.     if (dnSocketAddr != null) {  

  6.       int serviceHandlerCount =  

  7.         conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,  

  8.                     DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);  

  9.       //得到serviceRpcServer  

  10.       this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),   

  11.           dnSocketAddr.getPort(), serviceHandlerCount,  

  12.           false, conf, namesystem.getDelegationTokenSecretManager());  

  13.       this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();  

  14.       setRpcServiceServerAddress(conf);  

  15. }  

  16. //得到server  

  17.     this.server = RPC.getServer(this, socAddr.getHostName(),  

  18.         socAddr.getPort(), handlerCount, false, conf, namesystem  

  19.         .getDelegationTokenSecretManager());  

  20.   

  21.    •••  

  22.     this.server.start();  //啓動 RPC server   Clients只容許鏈接該server  

  23.     if (serviceRpcServer != null) {  

  24.       serviceRpcServer.start();  //啓動 RPC serviceRpcServer 爲HDFS服務的server  

  25.     }  

  26.     startTrashEmptier(conf);  

  27.   }  

 

查看Namenode初始化源碼得知:RPC的server對象是經過ipc.RPC類的getServer()方法得到的。下面我們去看看ipc.RPC類中的getServer()源碼吧:

代碼十一:

Java代碼  收藏代碼

  1. public static Server getServer(final Object instance, final String bindAddress, final int port,  

  2.                                  final int numHandlers,  

  3.                                  final boolean verbose, Configuration conf,  

  4.                                  SecretManager<? extends TokenIdentifier> secretManager)   

  5.     throws IOException {  

  6.     return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);  

  7.   }  

 

這時咱們發現getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。哈哈,如今你明白了我前面說的「RPC.Server是ipc.Server的實現類」了吧。不過RPC.Server的構造函數仍是調用了ipc.Server類的構造函數的,因篇幅所限,就不貼出相關源碼了。

2. 運行Server 
如代碼十所示,初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼吧:

代碼十二:

Java代碼  收藏代碼

  1. /** 啓動服務 */  

  2. public synchronized void start() {  

  3.   responder.start();  //啓動responder  

  4.   listener.start();   //啓動listener  

  5.   handlers = new Handler[handlerCount];  

  6.     

  7.   for (int i = 0; i < handlerCount; i++) {  

  8.     handlers[i] = new Handler(i);  

  9.     handlers[i].start();   //逐個啓動Handler  

  10.   }  

  11. }  

 

3. Server處理請求 

1)創建鏈接 
分析過ipc.Client源碼後,咱們知道Client端的底層通訊直接採用了阻塞式IO編程,當時咱們曾作出猜想:Server端是否是也採用了阻塞式IO。如今咱們仔細地分析一下吧,若是Server端也採用阻塞式IO,當鏈接進來的Client端不少時,勢必會影響Server端的性能。hadoop的實現者們考慮到了這點,因此他們採用了java  NIO來實現Server端,java NIO可參考博客:
http://weixiaolu.iteye.com/blog/1479656  。那Server端採用java NIO是怎麼創建鏈接的呢?分析源碼得知,Server端採用Listener監聽客戶端的鏈接,下面先分析一下Listener的構造函數吧:

代碼十三:

Java代碼  收藏代碼

  1. public Listener() throws IOException {  

  2.   address = new InetSocketAddress(bindAddress, port);  

  3.   // 建立ServerSocketChannel,並設置成非阻塞式  

  4.   acceptChannel = ServerSocketChannel.open();  

  5.   acceptChannel.configureBlocking(false);  

  6.   

  7.   // 將server socket綁定到本地端口  

  8.   bind(acceptChannel.socket(), address, backlogLength);  

  9.   port = acceptChannel.socket().getLocalPort();   

  10.   // 得到一個selector  

  11.   selector= Selector.open();  

  12.   readers = new Reader[readThreads];  

  13.   readPool = Executors.newFixedThreadPool(readThreads);  

  14.   //啓動多個reader線程,爲了防止請求多時服務端響應延時的問題  

  15.   for (int i = 0; i < readThreads; i++) {         

  16.     Selector readSelector = Selector.open();  

  17.     Reader reader = new Reader(readSelector);  

  18.     readers[i] = reader;  

  19.     readPool.execute(reader);  

  20.   }  

  21.   // 註冊鏈接事件  

  22.   acceptChannel.register(selector, SelectionKey.OP_ACCEPT);  

  23.   this.setName("IPC Server listener on " + port);  

  24.   this.setDaemon(true);  

  25. }  

 

在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法:

代碼十四:

Java代碼  收藏代碼

  1. public void run() {  

  2.    •••  

  3.     while (running) {  

  4.       SelectionKey key = null;  

  5.       try {  

  6.         selector.select();  

  7.         Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  

  8.         while (iter.hasNext()) {  

  9.           key = iter.next();  

  10.           iter.remove();  

  11.           try {  

  12.             if (key.isValid()) {  

  13.               if (key.isAcceptable())  

  14.                 doAccept(key);     //具體的鏈接方法  

  15.             }  

  16.           } catch (IOException e) {  

  17.           }  

  18.           key = null;  

  19.         }  

  20.       } catch (OutOfMemoryError e) {  

  21.      •••           

  22.   }  

 

下面貼出Server.Listener類中doAccept ()方法中的關鍵源碼吧:

代碼十五:

Java代碼  收藏代碼

  1.     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {  

  2.       Connection c = null;  

  3.       ServerSocketChannel server = (ServerSocketChannel) key.channel();  

  4.       SocketChannel channel;  

  5.       while ((channel = server.accept()) != null) { //創建鏈接  

  6.         channel.configureBlocking(false);  

  7.         channel.socket().setTcpNoDelay(tcpNoDelay);  

  8.         Reader reader = getReader();  //從readers池中得到一個reader  

  9.         try {  

  10.           reader.startAdd(); // 激活readSelector,設置adding爲true  

  11.           SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件  

  12.           c = new Connection(readKey, channel, System.currentTimeMillis());//建立一個鏈接對象  

  13.           readKey.attach(c);   //將connection對象注入readKey  

  14.           synchronized (connectionList) {  

  15.             connectionList.add(numConnections, c);  

  16.             numConnections++;  

  17.           }  

  18.         •••   

  19.         } finally {  

  20. //設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使  

  21. //用了wait()方法等待。因篇幅有限,就不貼出源碼了。  

  22.           reader.finishAdd();  

  23.         }  

  24.       }  

  25.     }  

 

當reader被喚醒,reader接着執行doRead()方法。

2)接收請求 
下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:

代碼十六:

Java代碼  收藏代碼

  1. 方法一:     

  2.  void doRead(SelectionKey key) throws InterruptedException {  

  3.       int count = 0;  

  4.       Connection c = (Connection)key.attachment();  //得到connection對象  

  5.       if (c == null) {  

  6.         return;    

  7.       }  

  8.       c.setLastContact(System.currentTimeMillis());  

  9.       try {  

  10.         count = c.readAndProcess();    // 接受並處理請求    

  11.       } catch (InterruptedException ieo) {  

  12.        •••  

  13.       }  

  14.      •••      

  15. }  

  16.   

  17. 方法二:  

  18. public int readAndProcess() throws IOException, InterruptedException {  

  19.       while (true) {  

  20.         •••  

  21.         if (!rpcHeaderRead) {  

  22.           if (rpcHeaderBuffer == null) {  

  23.             rpcHeaderBuffer = ByteBuffer.allocate(2);  

  24.           }  

  25.          //讀取請求頭  

  26.           count = channelRead(channel, rpcHeaderBuffer);  

  27.           if (count < 0 || rpcHeaderBuffer.remaining() > 0) {  

  28.             return count;  

  29.           }  

  30.         // 讀取請求版本號    

  31.           int version = rpcHeaderBuffer.get(0);  

  32.           byte[] method = new byte[] {rpcHeaderBuffer.get(1)};  

  33.         •••    

  34.          

  35.           data = ByteBuffer.allocate(dataLength);  

  36.         }  

  37.         // 讀取請求    

  38.         count = channelRead(channel, data);  

  39.           

  40.         if (data.remaining() == 0) {  

  41.          •••  

  42.           if (useSasl) {  

  43.          •••  

  44.           } else {  

  45.             processOneRpc(data.array());//處理請求  

  46.           }  

  47.         •••  

  48.           }  

  49.         }   

  50.         return count;  

  51.       }  

  52.     }  

 

3)得到call對象 
下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼。

代碼十七:

Java代碼  收藏代碼

  1. 方法一:     

  2.  private void processOneRpc(byte[] buf) throws IOException,  

  3.         InterruptedException {  

  4.       if (headerRead) {  

  5.         processData(buf);  

  6.       } else {  

  7.         processHeader(buf);  

  8.         headerRead = true;  

  9.         if (!authorizeConnection()) {  

  10.           throw new AccessControlException("Connection from " + this  

  11.               + " for protocol " + header.getProtocol()  

  12.               + " is unauthorized for user " + user);  

  13.         }  

  14.       }  

  15. }  

  16. 方法二:  

  17.     private void processData(byte[] buf) throws  IOException, InterruptedException {  

  18.       DataInputStream dis =  

  19.         new DataInputStream(new ByteArrayInputStream(buf));  

  20.       int id = dis.readInt();      // 嘗試讀取id  

  21.       Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數  

  22.       param.readFields(dis);          

  23.           

  24.       Call call = new Call(id, param, this);  //封裝成call  

  25.       callQueue.put(call);   // 將call存入callQueue  

  26.       incRpcCount();  // 增長rpc請求的計數  

  27.     }  

 

4)處理call對象 
你還記得Server類中還有個Handler內部類嗎?呵呵,對call對象的處理就是它乾的。下面貼出Server.Handler類中run()方法中的關鍵代碼:

代碼十八:

Java代碼  收藏代碼

  1. while (running) {  

  2.       try {  

  3.         final Call call = callQueue.take(); //彈出call,可能會阻塞  

  4.         •••  

  5.         //調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實如今RPC.Server類中  

  6.         value = call(call.connection.protocol, call.param, call.timestamp);  

  7.         synchronized (call.connection.responseQueue) {  

  8.           setupResponse(buf, call,   

  9.                       (error == null) ? Status.SUCCESS : Status.ERROR,   

  10.                       value, errorClass, error);  

  11.            •••  

  12.           //給客戶端響應請求  

  13.           responder.doRespond(call);  

  14.         }  

  15. }  

 


5)返回請求 
下面貼出Server.Responder類中的doRespond()方法源碼:
 

代碼十九:

Java代碼  收藏代碼

  1. 方法一:     

  2.  void doRespond(Call call) throws IOException {  

  3.       synchronized (call.connection.responseQueue) {  

  4.         call.connection.responseQueue.addLast(call);  

  5.         if (call.connection.responseQueue.size() == 1) {  

  6.            // 返回響應結果,並激活writeSelector  

  7.           processResponse(call.connection.responseQueue, true);  

  8.         }  

  9.       }  

  10. }  

 

小結:


到這裏,hadoop RPC機制的源碼分析就結束了,請繼續關注個人後續博客:hadoop心跳機制的源碼分析。在這裏須要感謝我所參考的iteye上相關博主的文章,因太多了,就不一一列舉了,不過最感謝的是wikieno的博客,博客地址爲:
http://www.wikieno.com/2012/02/hadoop-ipc-server/ 。

相關文章
相關標籤/搜索