RPC原理及RPC實例分析

在學校期間你們都寫過很多程序,好比寫個hello world服務類,而後本地調用下,以下所示。這些程序的特色是服務消費方和服務提供方是本地調用關係。html

1
2
3
4
5
6
public class Test {
      public static void main(String[] args) {
          HelloWorldService helloWorldService = new HelloWorldServiceImpl();
          helloWorldService.sayHello( "test" );
      }
}

而一旦踏入公司尤爲是大型互聯網公司就會發現,公司的系統都由成千上萬大大小小的服務組成,各服務部署在不一樣的機器上,由不一樣的團隊負責。java

這時就會遇到兩個問題:node

  1. 要搭建一個新服務,免不了須要依賴他人的服務,而如今他人的服務都在遠端,怎麼調用?
  2. 其它團隊要使用咱們的新服務,咱們的服務該怎麼發佈以便他人調用?下文將對這兩個問題展開探討。

1.  如何調用他人的遠程服務?

因爲各服務部署在不一樣機器,服務間的調用免不了網絡通訊過程,服務消費方每調用一個服務都要寫一坨網絡通訊相關的代碼,不只複雜並且極易出錯。web

若是有一種方式能讓咱們像調用本地服務同樣調用遠程服務,而讓調用者對網絡通訊這些細節透明,那麼將大大提升生產力,好比服務消費方在執行helloWorldService.sayHello(「test」)時,實質上調用的是遠端的服務。這種方式其實就是RPC(Remote Procedure Call Protocol),在各大互聯網公司中被普遍使用,如阿里巴巴的hsf、dubbo(開源)、Facebook的thrift(開源)、Google grpc(開源)、Twitter的finagle(開源)等。編程

要讓網絡通訊細節對使用者透明,咱們須要對通訊細節進行封裝,咱們先看下一個RPC調用的流程涉及到哪些通訊細節:網絡

  1. 服務消費方(client)調用以本地調用方式調用服務;
  2. client stub接收到調用後負責將方法、參數等組裝成可以進行網絡傳輸的消息體;
  3. client stub找到服務地址,並將消息發送到服務端;
  4. server stub收到消息後進行解碼;
  5. server stub根據解碼結果調用本地的服務;
  6. 本地服務執行並將結果返回給server stub;
  7. server stub將返回結果打包成消息併發送至消費方;
  8. client stub接收到消息,並進行解碼;
  9. 服務消費方獲得最終結果。

RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。數據結構

1.1 怎麼作到透明化遠程服務調用?

怎麼封裝通訊細節才能讓用戶像以本地調用方式調用遠程服務呢?對java來講就是使用代理!java代理有兩種方式:架構

  1. jdk 動態代理
  2. 字節碼生成

儘管字節碼生成方式實現的代理更爲強大和高效,但代碼維護不易,大部分公司實現RPC框架時仍是選擇動態代理方式。併發

下面簡單介紹下動態代理怎麼實現咱們的需求。咱們須要實現RPCProxyClient代理類,代理類的invoke方法中封裝了與遠端服務通訊的細節,消費方首先從RPCProxyClient得到服務提供方的接口,當執行helloWorldService.sayHello(「test」)方法時就會調用invoke方法。負載均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RPCProxyClient implements java.lang.reflect.InvocationHandler{
     private Object obj;
 
     public RPCProxyClient(Object obj){
         this .obj=obj;
     }
 
     /**
      * 獲得被代理對象;
      */
     public static Object getProxy(Object obj){
         return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),
                 obj.getClass().getInterfaces(), new RPCProxyClient(obj));
     }
 
     /**
      * 調用此方法執行
      */
     public Object invoke(Object proxy, Method method, Object[] args)
             throws Throwable {
         //結果參數;
         Object result = new Object();
         // ...執行通訊相關邏輯
         // ...
         return result;
     }
}
1
2
3
4
5
6
public class Test {
      public static void main(String[] args) {
          HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService. class );
          helloWorldService.sayHello( "test" );
      }
  }

1.2  怎麼對消息進行編碼和解碼?

1.2.1 肯定消息數據結構

上節講了invoke裏須要封裝通訊細節(通訊細節再後面幾章詳細探討),而通訊的第一步就是要肯定客戶端和服務端相互通訊的消息結構。客戶端的請求消息結構通常須要包括如下內容:

1)接口名稱

在咱們的例子裏接口名是「HelloWorldService」,若是不傳,服務端就不知道調用哪一個接口了;

2)方法名

一個接口內可能有不少方法,若是不傳方法名服務端也就不知道調用哪一個方法;

3)參數類型&參數值

參數類型有不少,好比有bool、int、long、double、string、map、list,甚至如struct(class);以及相應的參數值;

4)超時時間

5)requestID,標識惟一請求id,在下面一節會詳細描述requestID的用處。

同理服務端返回的消息結構通常包括如下內容。

1)返回值

2)狀態code

3)requestID

1.2.2 序列化

一旦肯定了消息的數據結構後,下一步就是要考慮序列化與反序列化了。

什麼是序列化?序列化就是將數據結構或對象轉換成二進制串的過程,也就是編碼的過程。

什麼是反序列化?將在序列化過程當中所生成的二進制串轉換成數據結構或者對象的過程。

爲何須要序列化?轉換爲二進制串後纔好進行網絡傳輸嘛!

爲何須要反序列化?將二進制轉換爲對象纔好進行後續處理!

現現在序列化的方案愈來愈多,每種序列化方案都有優勢和缺點,它們在設計之初有本身獨特的應用場景,那到底選擇哪一種呢?從RPC的角度上看,主要看三點:

  1. 通用性,好比是否能支持Map等複雜的數據結構;
  2. 性能,包括時間複雜度和空間複雜度,因爲RPC框架將會被公司幾乎全部服務使用,若是序列化上能節約一點時間,對整個公司的收益都將很是可觀,同理若是序列化上能節約一點內存,網絡帶寬也能省下很多;
  3. 可擴展性,對互聯網公司而言,業務變化飛快,若是序列化協議具備良好的可擴展性,支持自動增長新的業務字段,而不影響老的服務,這將大大提供系統的靈活度。

目前互聯網公司普遍使用Protobuf、Thrift、Avro等成熟的序列化解決方案來搭建RPC框架,這些都是久經考驗的解決方案。

1.3  通訊

消息數據結構被序列化爲二進制串後,下一步就要進行網絡通訊了。目前有兩種經常使用IO通訊模型:1)BIO;2)NIO。通常RPC框架須要支持這兩種IO模型。

如何實現RPC的IO通訊框架呢?

  1. 使用java nio方式自研,這種方式較爲複雜,並且頗有可能出現隱藏bug,但也見過一些互聯網公司使用這種方式;
  2. 基於mina,mina在早幾年比較火熱,不過這些年版本更新緩慢;
  3. 基於netty,如今不少RPC框架都直接基於netty這一IO通訊框架,省力又省心,好比阿里巴巴的HSF、dubbo,Twitter的finagle等。

1.4  消息裏爲何要有requestID?

若是使用netty的話,通常會用channel.writeAndFlush()方法來發送消息二進制串,這個方法調用後對於整個遠程調用(從發出請求到接收到結果)來講是一個異步的,即對於當前線程來講,將請求發送出來後,線程就能夠日後執行了,至於服務端的結果,是服務端處理完成後,再以消息的形式發送給客戶端的。因而這裏出現如下兩個問題:

  1. 怎麼讓當前線程「暫停」,等結果回來後,再向後執行?
  2. 若是有多個線程同時進行遠程方法調用,這時創建在client server之間的socket鏈接上會有不少雙方發送的消息傳遞,先後順序也多是隨機的,server處理完結果後,將結果消息發送給client,client收到不少消息,怎麼知道哪一個消息結果是原先哪一個線程調用的?

以下圖所示,線程A和線程B同時向client socket發送請求requestA和requestB,socket前後將requestB和requestA發送至server,而server可能將responseA先返回,儘管requestA請求到達時間更晚。咱們須要一種機制保證responseA丟給ThreadA,responseB丟給ThreadB。

怎麼解決呢?

  1. client線程每次經過socket調用一次遠程接口前,生成一個惟一的ID,即requestID(requestID必需保證在一個Socket鏈接裏面是惟一的),通常經常使用AtomicLong從0開始累計數字生成惟一ID;
  2. 將處理結果的回調對象callback,存放到全局ConcurrentHashMap裏面put(requestID, callback);
  3. 當線程調用channel.writeAndFlush()發送消息後,緊接着執行callback的get()方法試圖獲取遠程返回的結果。在get()內部,則使用synchronized獲取回調對象callback的鎖,再先檢測是否已經獲取到結果,若是沒有,而後調用callback的wait()方法,釋放callback上的鎖,讓當前線程處於等待狀態。
  4. 服務端接收到請求並處理後,將response結果(此結果中包含了前面的requestID)發送給客戶端,客戶端socket鏈接上專門監聽消息的線程收到消息,分析結果,取到requestID,再從前面的ConcurrentHashMap裏面get(requestID),從而找到callback對象,再用synchronized獲取callback上的鎖,將方法調用結果設置到callback對象裏,再調用callback.notifyAll()喚醒前面處於等待狀態的線程。
1
2
3
4
5
6
7
public Object get() {
         synchronized ( this ) { // 旋鎖
             while (!isDone) { // 是否有結果了
                 wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態
             }
         }
}
1
2
3
4
5
6
7
private void setDone(Response res) {
         this .res = res;
         isDone = true ;
         synchronized ( this ) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了
             notifyAll(); // 喚醒處於等待的線程
         }
     }

2 如何發佈本身的服務?

如何讓別人使用咱們的服務呢?有同窗說很簡單嘛,告訴使用者服務的IP以及端口就能夠了啊。確實是這樣,這裏問題的關鍵在因而自動告知仍是人肉告知。

人肉告知的方式:若是你發現你的服務一臺機器不夠,要再添加一臺,這個時候就要告訴調用者我如今有兩個ip了,大家要輪詢調用來實現負載均衡;調用者咬咬牙改了,結果某天一臺機器掛了,調用者發現服務有一半不可用,他又只能手動修改代碼來刪除掛掉那臺機器的ip。現實生產環境固然不會使用人肉方式。

有沒有一種方法能實現自動告知,即機器的增添、剔除對調用方透明,調用者再也不須要寫死服務提供方地址?固然能夠,現現在zookeeper被普遍用於實現服務自動註冊與發現功能!

簡單來說,zookeeper能夠充當一個服務註冊表(Service Registry),讓多個服務提供者造成一個集羣,讓服務消費者經過服務註冊表獲取具體的服務訪問地址(ip+端口)去訪問具體的服務提供者。以下圖所示:

具體來講,zookeeper就是個分佈式文件系統,每當一個服務提供者部署後都要將本身的服務註冊到zookeeper的某一路徑上: /{service}/{version}/{ip:port}, 好比咱們的HelloWorldService部署到兩臺機器,那麼zookeeper上就會建立兩條目錄:分別爲/HelloWorldService/1.0.0/100.19.20.01:16888  /HelloWorldService/1.0.0/100.19.20.02:16888。

zookeeper提供了「心跳檢測」功能,它會定時向各個服務提供者發送一個請求(實際上創建的是一個 Socket 長鏈接),若是長期沒有響應,服務中心就認爲該服務提供者已經「掛了」,並將其剔除,好比100.19.20.02這臺機器若是宕機了,那麼zookeeper上的路徑就會只剩/HelloWorldService/1.0.0/100.19.20.01:16888。

服務消費者會去監聽相應路徑(/HelloWorldService/1.0.0),一旦路徑上的數據有任務變化(增長或減小),zookeeper都會通知服務消費方服務提供者地址列表已經發生改變,從而進行更新。

更爲重要的是zookeeper與生俱來的容錯容災能力(好比leader選舉),能夠確保服務註冊表的高可用性。

3.Hadoop中RPC實例分析

ipc.RPC類中有一些內部類,爲了你們對RPC類有個初步的印象,就先羅列幾個咱們感興趣的分析一下吧:

Invocation :用於封裝方法名和參數,做爲數據傳輸層。
ClientCache :用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>。
Invoker :是動態代理中的調用實現類,繼承了InvocationHandler.
Server :是ipc.Server的實現類。

1
2
3
4
5
6
7
8
public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
       •••
       ObjectWritable value = (ObjectWritable)
         client.call( new Invocation(method, args), remoteId);
       •••
       return value.get();
     }

若是你發現這個invoke()方法實現的有些奇怪的話,那你就對了。通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg);  這句代碼。而上面代碼中卻沒有,這是爲何呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:

1
2
ObjectWritable value = (ObjectWritable)
client.call( new Invocation(method, args), remoteId);

Invocation類在這裏封裝了方法名和參數。其實這裏網絡通訊只是調用了Client類的call()方法。那咱們接下來分析一下ipc.Client源碼吧。和第一章同樣,一樣是3個問題

  1. 客戶端和服務端的鏈接是怎樣創建的?
  2. 客戶端是怎樣給服務端發送數據的?
  3. 客戶端是怎樣獲取服務端的返回數據的?

3.1 客戶端和服務端的鏈接是怎樣創建的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Writable call(Writable param, ConnectionId remoteId) 
                        throws InterruptedException, IOException {
     Call call = new Call(param);       //將傳入的數據封裝成call對象
     Connection connection = getConnection(remoteId, call);   //得到一個鏈接
     connection.sendParam(call);     // 向服務端發送call對象
     boolean interrupted = false ;
     synchronized (call) {
       while (!call.done) {
         try {
           call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程
         } catch (InterruptedException ie) {
           // 因中斷異常而終止,設置標誌interrupted爲true
           interrupted = true ;
         }
       }
       if (interrupted) {
         Thread.currentThread().interrupt();
       }
 
       if (call.error != null ) {
         if (call.error instanceof RemoteException) {
           call.error.fillInStackTrace();
           throw call.error;
         } else { // 本地異常
           throw wrapException(remoteId.getAddress(), call.error);
         }
       } else {
         return call.value; //返回結果數據
       }
     }
   }

具體代碼的做用我已作了註釋,因此這裏再也不贅述。但到目前爲止,你依然不知道RPC機制底層的網絡鏈接是怎麼創建的。分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:

1
2
Connection connection = getConnection(remoteId, call);   //得到一個鏈接
  connection.sendParam(call);      // 向服務端發送call對象

先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Connection getConnection(ConnectionId remoteId,
                                    Call call)
                                    throws IOException, InterruptedException {
     if (!running.get()) {
       // 若是client關閉了
       throw new IOException( "The client is stopped" );
     }
     Connection connection;
//若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。
//但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
         if (connection == null ) {
           connection = new Connection(remoteId);
           connections.put(remoteId, connection);
         }
       }
     } while (!connection.addCall(call)); //將call對象放入對應鏈接中的calls池,就不貼出源碼了
    //這句代碼纔是真正的完成了和服務端創建鏈接哦~
     connection.setupIOstreams();
     return connection;
   }

下面貼出Client.Connection類中的setupIOstreams()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private synchronized void setupIOstreams() throws InterruptedException {
   •••
      try {
       •••
        while ( true ) {
          setupConnection();  //創建鏈接
          InputStream inStream = NetUtils.getInputStream(socket);     //得到輸入流
          OutputStream outStream = NetUtils.getOutputStream(socket);  //得到輸出流
          writeRpcHeader(outStream);
          •••
          this .in = new DataInputStream( new BufferedInputStream
              ( new PingInputStream(inStream)));   //將輸入流裝飾成DataInputStream
          this .out = new DataOutputStream
          ( new BufferedOutputStream(outStream));   //將輸出流裝飾成DataOutputStream
          writeHeader();
          // 跟新活動時間
          touch();
          //當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
          start();
          return ;
        }
      } catch (IOException e) {
        markClosed(e);
        close();
      }
    }

再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private synchronized void setupConnection() throws IOException {
      short ioFailures = 0 ;
      short timeoutFailures = 0 ;
      while ( true ) {
        try {
          this .socket = socketFactory.createSocket(); //終於看到建立socket的方法了
          this .socket.setTcpNoDelay(tcpNoDelay);
         •••
          // 設置鏈接超時爲20s
          NetUtils.connect( this .socket, remoteId.getAddress(), 20000 );
          this .socket.setSoTimeout(pingInterval);
          return ;
        } catch (SocketTimeoutException toe) {
          /* 設置最多鏈接重試爲45次。
           * 總共有20s*45 = 15 分鐘的重試時間。
           */
          handleConnectionFailure(timeoutFailures++, 45 , toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

終於,咱們知道了客戶端的鏈接是怎樣創建的了,其實就是建立一個普通的socket進行通訊。

3.2 客戶端是怎樣給服務端發送數據的?

下面貼出Client.Connection類的sendParam()方法吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void sendParam(Call call) {
       if (shouldCloseConnection.get()) {
         return ;
       }
       DataOutputBuffer d= null ;
       try {
         synchronized ( this .out) {
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
           //建立一個緩衝區
           d = new DataOutputBuffer();
           d.writeInt(call.id);
           call.param.write(d);
           byte [] data = d.getData();
           int dataLength = d.getLength();
           out.writeInt(dataLength);        //首先寫出數據的長度
           out.write(data, 0 , dataLength); //向服務端寫數據
           out.flush();
         }
       } catch (IOException e) {
         markClosed(e);
       } finally {
         IOUtils.closeStream(d);
       }
     }

3.3 客戶端是怎樣獲取服務端的返回數據的?

下面貼出Client.Connection類和Client.Call類中的相關方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
方法一: 
   public void run() {
       •••
       while (waitForWork()) {
         receiveResponse();  //具體的處理方法
       }
       close();
      •••
}
 
方法二:
private void receiveResponse() {
       if (shouldCloseConnection.get()) {
         return ;
       }
       touch();
       try {
         int id = in.readInt();                    // 阻塞讀取id
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);
           Call call = calls.get(id);    //在calls池中找到發送時的那個對象
         int state = in.readInt();     // 阻塞讀取call對象的狀態
         if (state == Status.SUCCESS.state) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);           // 讀取數據
         //將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三
           call.setValue(value);             
           calls.remove(id);               //刪除已處理的call   
         } else if (state == Status.ERROR.state) {
         •••
         } else if (state == Status.FATAL.state) {
         •••
         }
       } catch (IOException e) {
         markClosed(e);
       }
}
 
方法三:
public synchronized void setValue(Writable value) {
       this .value = value;
       callComplete();   //具體實現
}
protected synchronized void callComplete() {
       this .done = true ;
       notify();         // 喚醒client等待線程
     }

完成的功能主要是:啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據了哦~。客戶端的源碼分析就到這裏了哦,下面咱們來分析Server端的源碼吧。

3.4 ipc.Server源碼分析

爲了讓你們對ipc.Server有個初步的瞭解,咱們先分析一下它的幾個內部類吧:

Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void initialize(Configuration conf) throws IOException {
    •••
     // 建立 rpc server
     InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
     if (dnSocketAddr != null ) {
       int serviceHandlerCount =
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                     DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
       //得到serviceRpcServer
       this .serviceRpcServer = RPC.getServer( this , dnSocketAddr.getHostName(),
           dnSocketAddr.getPort(), serviceHandlerCount,
           false , conf, namesystem.getDelegationTokenSecretManager());
       this .serviceRPCAddress = this .serviceRpcServer.getListenerAddress();
       setRpcServiceServerAddress(conf);
}
//得到server
     this .server = RPC.getServer( this , socAddr.getHostName(),
         socAddr.getPort(), handlerCount, false , conf, namesystem
         .getDelegationTokenSecretManager());
 
    •••
     this .server.start();  //啓動 RPC server   Clients只容許鏈接該server
     if (serviceRpcServer != null ) {
       serviceRpcServer.start();  //啓動 RPC serviceRpcServer 爲HDFS服務的server
     }
     startTrashEmptier(conf);
   }

查看Namenode初始化源碼得知:RPC的server對象是經過ipc.RPC類的getServer()方法得到的。下面我們去看看ipc.RPC類中的getServer()源碼吧:

1
2
3
4
5
6
7
public static Server getServer( final Object instance, final String bindAddress, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
   }

這時咱們發現getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。哈哈,如今你明白了我前面說的「RPC.Server是ipc.Server的實現類」了吧。不過RPC.Server的構造函數仍是調用了ipc.Server類的構造函數的,因篇幅所限,就不貼出相關源碼了。

初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼吧:

1
2
3
4
5
6
7
8
9
10
11
/** 啓動服務 */
  public synchronized void start() {
    responder.start();  //啓動responder
    listener.start();   //啓動listener
    handlers = new Handler[handlerCount];
 
    for ( int i = 0 ; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();   //逐個啓動Handler
    }
  }

分析過ipc.Client源碼後,咱們知道Client端的底層通訊直接採用了阻塞式IO編程,當時咱們曾作出猜想:Server端是否是也採用了阻塞式IO。如今咱們仔細地分析一下吧,若是Server端也採用阻塞式IO,當鏈接進來的Client端不少時,勢必會影響Server端的性能。hadoop的實現者們考慮到了這點,因此他們採用了java  NIO來實現Server端,那Server端採用java NIO是怎麼創建鏈接的呢?分析源碼得知,Server端採用Listener監聽客戶端的鏈接,下面先分析一下Listener的構造函數吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // 建立ServerSocketChannel,並設置成非阻塞式
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking( false );
 
      // 將server socket綁定到本地端口
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort();
      // 得到一個selector
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      //啓動多個reader線程,爲了防止請求多時服務端響應延時的問題
      for ( int i = 0 ; i < readThreads; i++) {      
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
      // 註冊鏈接事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this .setName( "IPC Server listener on " + port);
      this .setDaemon( true );
    }

在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void run() {
     •••
      while (running) {
        SelectionKey key = null ;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);     //具體的鏈接方法
              }
            } catch (IOException e) {
            }
            key = null ;
          }
        } catch (OutOfMemoryError e) {
       •••        
    }

下面貼出Server.Listener類中doAccept()方法中的關鍵源碼吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
       Connection c = null ;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
       SocketChannel channel;
       while ((channel = server.accept()) != null ) { //創建鏈接
         channel.configureBlocking( false );
         channel.socket().setTcpNoDelay(tcpNoDelay);
         Reader reader = getReader();  //從readers池中得到一個reader
         try {
           reader.startAdd(); // 激活readSelector,設置adding爲true
           SelectionKey readKey = reader.registerChannel(channel); //將讀事件設置成興趣事件
           c = new Connection(readKey, channel, System.currentTimeMillis()); //建立一個鏈接對象
           readKey.attach(c);   //將connection對象注入readKey
           synchronized (connectionList) {
             connectionList.add(numConnections, c);
             numConnections++;
           }
         •••
         } finally {
//設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使
//用了wait()方法等待。因篇幅有限,就不貼出源碼了。
           reader.finishAdd();
         }
       }
     }

當reader被喚醒,reader接着執行doRead()方法。

下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
方法一:  
  void doRead(SelectionKey key) throws InterruptedException {
       int count = 0 ;
       Connection c = (Connection)key.attachment();  //得到connection對象
       if (c == null ) {
         return
       }
       c.setLastContact(System.currentTimeMillis());
       try {
         count = c.readAndProcess();    // 接受並處理請求 
       } catch (InterruptedException ieo) {
        •••
       }
      •••   
}
 
方法二:
public int readAndProcess() throws IOException, InterruptedException {
       while ( true ) {
         •••
         if (!rpcHeaderRead) {
           if (rpcHeaderBuffer == null ) {
             rpcHeaderBuffer = ByteBuffer.allocate( 2 );
           }
          //讀取請求頭
           count = channelRead(channel, rpcHeaderBuffer);
           if (count < 0 || rpcHeaderBuffer.remaining() > 0 ) {
             return count;
           }
         // 讀取請求版本號 
           int version = rpcHeaderBuffer.get( 0 );
           byte [] method = new byte [] {rpcHeaderBuffer.get( 1 )};
         ••• 
 
           data = ByteBuffer.allocate(dataLength);
         }
         // 讀取請求 
         count = channelRead(channel, data);
 
         if (data.remaining() == 0 ) {
          •••
           if (useSasl) {
          •••
           } else {
             processOneRpc(data.array()); //處理請求
           }
         •••
           }
         }
         return count;
       }
     }

下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
方法一:  
  private void processOneRpc( byte [] buf) throws IOException,
         InterruptedException {
       if (headerRead) {
         processData(buf);
       } else {
         processHeader(buf);
         headerRead = true ;
         if (!authorizeConnection()) {
           throw new AccessControlException( "Connection from " + this
               + " for protocol " + header.getProtocol()
               + " is unauthorized for user " + user);
         }
       }
}
方法二:
     private void processData( byte [] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream( new ByteArrayInputStream(buf));
       int id = dis.readInt();      // 嘗試讀取id
       Writable param = ReflectionUtils.newInstance(paramClass, conf); //讀取參數
       param.readFields(dis);       
 
       Call call = new Call(id, param, this );  //封裝成call
       callQueue.put(call);   // 將call存入callQueue
       incRpcCount();  // 增長rpc請求的計數
     }

4. RPC與web service

RPC:

Web service

web service接口就是RPC中的stub組件,規定了server可以提供的服務(web service),這在server和client上是一致的,可是也是跨語言跨平臺的。同時,因爲web service規範中的WSDL文件的存在,如今各平臺的web service框架,均可以基於WSDL文件,自動生成web service接口 。

其實二者差很少,只是傳輸的協議不一樣。

Reference:

1. http://www.cnblogs.com/LBSer/p/4853234.html

2. http://weixiaolu.iteye.com/blog/1504898

3. http://kyfxbl.iteye.com/blog/1745550

 

 

在應用的迭代演進過程當中,隨着系統訪問量提升,業務複雜度提升,代碼複雜度提升,應用逐漸從單體式架構向面向服務的分佈式架構轉變。RPC(Remote Procedure Call Protocol遠程過程調用)是分佈式架構的核心,按響應方式分以下兩種:

同步調用:客戶端調用服務方方法,等待直到服務方返回結果或者超時,再繼續本身的操做

異步調用:客戶端把消息發送給中間件,再也不等待服務端返回,直接繼續本身的操做。

同步調用的實現方式有WebService和RMI。Web Service提供的服務是基於web容器的,底層使用http協議,於是適合不一樣語言異構系統間的調用。RMI其實是Java語言的RPC實現,容許方法返回 Java 對象以及基本數據類型,適合用於JAVA語言構建的不一樣系統間的調用。

異步調用的JAVA實現版就是JMS(Java Message Service),目前開源的的JMS中間件有Apache社區的ActiveMQ和Kafka,另外有阿里的RocketMQ,昨天(2016年11月28日)看到的新聞阿里已經將此組件捐獻給Apache社區基金組織。

 

下面重點對RPC同步調用的原理進行探討。簡單來講一個RPC架構裏包含以下4個組件:

一、 客戶端(Client):服務調用方

二、 客戶端存根(Client Stub):存放服務端地址信息,將客戶端的請求參數打包成網絡消息,再經過網絡發送給服務方

三、 服務端存根(Server Stub):接受客戶端發送過來的消息並解包,再調用本地服務

四、 服務端(Server):真正的服務提供者。

這4個組件調用時序圖以下:

一、 服務調用方(client)調用以本地調用方式調用服務;

二、 client stub接收到調用後負責將方法、參數等組裝成可以進行網絡傳輸的消息體;在Java裏就是序列化的過程

三、 client stub找到服務地址,並將消息經過網絡發送到服務端;

四、 server stub收到消息後進行解碼,在Java裏就是反序列化的過程;

五、 server stub根據解碼結果調用本地的服務;

六、 本地服務執行處理邏輯;

七、 本地服務將結果返回給server stub;

八、 server stub將返回結果打包成消息,Java裏的序列化;

九、 server stub將打包後的消息經過網絡併發送至消費方

十、 client stub接收到消息,並進行解碼, Java裏的反序列化;

十一、 服務調用方(client)獲得最終結果。

RPC框架的目標就是把2-10步封裝起來,把調用、編碼/解碼的過程封裝起來,讓用戶像調用本地服務同樣的調用遠程服務。要作到對客戶端(調用方)透明化服務, RPC框架須要考慮解決以下問題:

一、 服務端提供的服務如何發佈,客戶端如何發現服務;

二、 如何對請求對象和返回結果進行序列化和反序列化;

三、 如何更高效進行網絡通訊。

 

以上問題在一些開源的RPC框架裏都有比較好的解決,如阿里的Dubbo,Facebook的Thrift。有興趣的同窗能夠對這兩個框架進行深刻學習研究。

RPC是每一個分佈式應用的必用之術,本文只是進行了一個粗略的描述,但願能對你們全部幫助,拋磚引玉,引發更多人對底層技術實現的興趣。

相關文章
相關標籤/搜索