GRPC協議的相關原理

      GRPC的Client與Server,均經過Netty Channel做爲數據通訊,序列化、反序列化則使用Protobuf,每一個請求都將被封裝成HTTP2的Stream,在整個生命週期中,客戶端Channel應該保持長鏈接,而不是每次調用從新建立Channel、響應結束後關閉Channel(即短鏈接、交互式的RPC),目的就是達到連接的複用,進而提升交互效率。java

    一、Server端算法

    咱們一般使用NettyServerBuilder,即IO處理模型基於Netty,未來可能會支持其餘的IO模型。Netty Server的IO模型簡析:設計模式

    1)建立ServerBootstrap,設定BossGroup與workerGroup線程池網絡

    2)註冊childHandler,用來處理客戶端連接中的請求成幀架構

    3)bind到指定的port,即內部初始化ServerSocketChannel等,開始偵聽和接受客戶端連接。併發

    4)BossGroup中的線程用於accept客戶端連接,並轉發(輪訓)給workerGroup中的線程。app

    5)workerGroup中的特定線程用於初始化客戶端連接,初始化pipeline和handler,並將其註冊到worker線程的selector上(每一個worker線程持有一個selector,不共享)async

    6)selector上發生讀寫事件後,獲取事件所屬的連接句柄,而後執行handler(inbound),同時進行拆封package,handler執行完畢後,數據寫入經過,由outbound handler處理(封包)經過連接發出。    注意每一個worker線程上的數據請求是隊列化的。tcp

    GRPC而言,只是對Netty Server的簡單封裝,底層使用了PlaintextHandler、Http2ConnectionHandler的相關封裝等。具體Framer、Stream方式請參考Http2相關文檔。ide

    1)bossEventLoopGroup:若是沒指定,默認爲一個static共享的對象,即JVM內全部的NettyServer都使用同一個Group,默認線程池大小爲1。

    2)workerEventLoopGroup:若是沒指定,默認爲一個static共享的對象,線程池大小爲coreSize * 2。這兩個對象採用默認值並不會帶來問題;一般狀況下,即便你的application中有多個GRPC Server,默認值也同樣可以帶來收益。不合適的線程池大小,有可能會是性能受限。

    3)channelType:默認爲NioServerSocketChannel,一般咱們採用默認值;固然你也能夠開發本身的類。若是此值爲NioServerSocketChannel,則開啓keepalive,同時設定SO_BACKLOG爲128;BACKLOG就是系統底層已經創建引入連接可是還沒有被accept的Socket隊列的大小,在連接密集型(特別是短鏈接)時,若是隊列超過此值,新的建立連接請求將會被拒絕(有可能你在壓力測試時,會遇到這樣的問題),keepalive和BACKLOG特性目前沒法直接修改。

Java代碼  
  1. [root@sh149 ~]# sysctl -a|grep tcp_keepalive  
  2. net.ipv4.tcp_keepalive_time = 60  ##單位:秒  
  3. net.ipv4.tcp_keepalive_probes = 9  
  4. net.ipv4.tcp_keepalive_intvl = 75 ##單位:秒  
  5. ##能夠在/etc/sysctl.conf查看和修改相關值  
  6. ##tcp_keepalive_time:最後一個實際數據包發送完畢後,首個keepalive探測包發送的時間。  
  7. ##若是首個keepalive包探測成功,那麼連接會被標記爲keepalive(首先TCP開啓了keepalive)  
  8. ##此後此參數將再也不生效,而是使用下述的2個參數繼續探測  
  9. ##tcp_keepalive_intvl:此後,不管通道上是否發生數據交換,keepalive探測包發送的時間間隔  
  10. ##tcp_keepalive_probes:在判定連接失效以前,嘗試發送探測包的次數;  
  11. ##若是都失敗,則判定連接已關閉。  

    對於Server端,咱們須要關注上述keepalive的一些設置;若是Netty Client在空閒一段時間後,Server端會主動關閉連接,有可能Client仍然保持連接的句柄,將會致使RPC調用時發生異常。這也會致使GRPC客戶端調用時偶爾發生錯誤的緣由之一。

    4)followControlWindow:流量控制的窗口大小,單位:字節,默認值爲1M,HTTP2中的「Flow Control」特性;鏈接上,已經發送還沒有ACK的數據幀大小,好比window大小爲100K,且winow已滿,每次向Client發送消息時,若是客戶端反饋ACK(攜帶這次ACK數據的大小),window將會減掉此大小;每次向window中添加亟待發送的數據時,window增長;若是window中的數據已達到限定值,它將不能繼續添加數據,只能等待Client端ACK。

    5)maxConcurrentCallPerConnection:每一個connection容許的最大併發請求數,默認值爲Integer.MAX_VALUE;若是此鏈接上已經接受但還沒有響應的streams個數達到此值,新的請求將會被拒絕。爲了不TCP通道的過分擁堵,咱們能夠適度調整此值,以便Server端平穩處理,畢竟buffer太多的streams會對server的內存形成巨大壓力。

    6)maxMessageSize:每次調用容許發送的最大數據量,默認爲100M。

    7)maxHeaderListSize:每次調用容許發送的header的最大條數,GRPC中默認爲8192。

    對於其餘的好比SSL/TSL等,能夠參考其餘文檔。

    GRPC Server端,還有一個最終要的方法:addService。【以下文service代理模式】

    在此以前,咱們須要介紹一下bindService方法,每一個GRPC生成的service代碼中都有此方法,它以硬編碼的方式遍歷此service的方法列表,將每一個方法的調用過程都與「被代理實例」綁定,這個模式有點相似於靜態代理,好比調用sayHello方法時,其實內部直接調用「被代理實例」的sayHello方法(參見MethodHandler.invoke方法,每一個方法都有一個惟一的index,經過硬編碼方式執行);bindService方法的最終目的是建立一個ServerServiceDefinition對象,這個對象內部位置一個map,key爲此Service的方法的全名(fullname,{package}.{service}.{method}),value就是此方法的GRPC封裝類(ServerMethodDefinition)。

Java代碼 
  1. private static final int METHODID_SAY_HELLO = 0;  
  2. private static class MethodHandlers<Req, Resp> implements  
  3.       ... {  
  4.     private final TestRpcService serviceImpl;//實際被代理實例  
  5.     private final int methodId;  
  6.   
  7.     public MethodHandlers(TestRpcService serviceImpl, int methodId) {  
  8.       this.serviceImpl = serviceImpl;  
  9.       this.methodId = methodId;  
  10.     }  
  11.   
  12.     @java.lang.SuppressWarnings("unchecked")  
  13.     public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {  
  14.       switch (methodId) {  
  15.         case METHODID_SAY_HELLO:        //經過方法的index來斷定具體須要代理那個方法  
  16.           serviceImpl.sayHello((com.test.grpc.service.model.TestModel.TestRequest) request,  
  17.               (io.grpc.stub.StreamObserver<com.test.grpc.service.model.TestModel.TestResponse>) responseObserver);  
  18.           break;  
  19.         default:  
  20.           throw new AssertionError();  
  21.       }  
  22.     }  
  23.     ....  
  24.   }  
  25.   
  26.   public static io.grpc.ServerServiceDefinition bindService(  
  27.       final TestRpcService serviceImpl) {  
  28.     return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)  
  29.         .addMethod(  
  30.           METHOD_SAY_HELLO,  
  31.           asyncUnaryCall(  
  32.             new MethodHandlers<  
  33.               com.test.grpc.service.model.TestModel.TestRequest,  
  34.               com.test.grpc.service.model.TestModel.TestResponse>(  
  35.                 serviceImpl, METHODID_SAY_HELLO)))  
  36.         .build();  
  37.   }  

    addService方法能夠添加多個Service,即一個Netty Server能夠爲多個service服務,這並不違背設計模式和架構模式。addService方法將會把service保存在內部的一個map中,key爲serviceName(即{package}.{service}),value就是上述bindService生成的對象。

    那麼究竟Server端是如何解析RPC過程的?Client在調用時會將調用的service名稱 + method信息保存在一個GRPC「保留」的header中,那麼Server端便可經過獲取這個特定的header信息,就能夠得知此stream須要請求的service、以及其method,那麼接下來只須要從上述提到的map中找到service,而後找到此method,直接代理調用便可。執行結果在Encoder以後發送給Client。(參見:NettyServerHandler)

    由於是map存儲,因此咱們須要在定義.proto文件時,儘量的指定package信息,以免由於service過多致使名稱可能重複的問題。

    二、Client端

    咱們使用ManagedChannelBuilder來建立客戶端channel,ManagedChannelBuilder使用了provider機制,具體是建立了哪一種channel有provider決定,能夠參看META-INF下同類名的文件中的註冊信息。當前Channel有2種:NettyChannelBuilder與OkHttpChannelBuilder。本人的當前版本中爲NettyChannelBuilder;咱們能夠直接使用NettyChannelBuilder來構建channel。以下描述則針對NettyChannelBuilder:

    配置參數與NettyServerBuilder基本相似,再次再也不贅言。默認狀況下,Client端默認的eventLoopGroup線程池也是static的,全局共享的,默認線程個數爲coreSize * 2。合理的線程池個數能夠提升客戶端的吞吐能力。

    ManagedChannel是客戶端最核心的類,它表示邏輯上的一個channel;底層持有一個物理的transport(TCP通道,參見NettyClientTransport),並負責維護此transport的活性;即在RPC調用的任什麼時候機,若是檢測到底層transport處於關閉狀態(terminated),將會嘗試重建transport。(參見TransportSet.obtainActiveTransport())

    一般狀況下,咱們不須要在RPC調用結束後就關閉Channel,Channel能夠被一直重用,直到Client再也不須要請求位置或者Channel沒法真的異常中斷而沒法繼續使用。固然,爲了提升Client端application的總體併發能力,咱們可使用鏈接池模式,即建立多個ManagedChannel,而後使用輪訓、隨機等算法,在每次RPC請求時選擇一個Channel便可。(備註,鏈接池特性,目前GRPC還沒有提供,須要額外的開發)

    每一個Service客戶端,都生成了2種stub:BlockingStub和FutureStub;這兩個Stub內部調用過程幾乎同樣,惟一不一樣的是BlockingStub的方法直接返回Response Model,而FutureStub返回一個Future對象。BlockingStub內部也是基於Future機制,只是封裝了阻塞等待的過程:

Java代碼 
  1. try {  
  2.         //也是基於Future  
  3.       ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);  
  4.       //阻塞過程  
  5.       while (!responseFuture.isDone()) {  
  6.         try {  
  7.           executor.waitAndDrain();  
  8.         } catch (InterruptedException e) {  
  9.           Thread.currentThread().interrupt();  
  10.           throw Status.CANCELLED.withCause(e).asRuntimeException();  
  11.         }  
  12.       }  
  13.       return getUnchecked(responseFuture);  
  14.     } catch (Throwable t) {  
  15.       call.cancel();  
  16.       throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);  
  17. }  

    建立一個Stub的成本是很是低的,咱們能夠在每次請求時都經過channel建立新的stub,這並不會帶來任何問題(只不過是建立了大量對象);其實更好的方式是,咱們應該使用一個Stub發送屢次請求,即Stub也是能夠重用的;直到Stub上的狀態異常而沒法使用。最多見的異常,就是「io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED」,即表示DEADLINE時間過時,咱們能夠爲每一個Stub配置deadline時間,那麼若是此stub被使用的時長超過此值(不是空閒的時間),將不能再發送請求,此時咱們應該建立新的Stub。不少人想盡辦法來使用「withDeadlineAfter」方法來實現一些奇怪的事情,此參數的主要目的就是代表:此stub只能被使用X時長,此後將不能再進行請求,應該被釋放。因此,它並不能實現相似於「keepAlive」的語義,即便咱們須要keepAlive,也應該在Channel級別,而不是在一個Stub上。

    若是你使用了鏈接池,那麼其實鏈接池不該該關注DEADLINE的錯誤,只要Channel自己沒有terminated便可;就把這個問題交給調用者處理。若是你也對Stub使用了對象池,那麼你就可能須要關注這個狀況了,你不該該向調用者返回一個「DEADLINE」的stub,或者若是調用者發現了DEADLINE,你的對象池應該可以移除它。

    1)實例化ManagedChannel,此channel能夠被任意多個Stub實例引用;如上文說述,咱們能夠經過建立Channel池,來提升application總體的吞吐能力。此Channel實例,不該該被shutdown,直到Client端中止服務;在任什麼時候候,特別是建立Stub時,咱們應該斷定Channel的狀態。

Java代碼   收藏代碼
  1. synchronized (this) {  
  2.     if (channel.isShutdown() || channel.isTerminated()) {  
  3.         channel = ManagedChannelBuilder.forAddress(poolConfig.host, poolConfig.port).usePlaintext(true).build();  
  4.     }  
  5.     //new Stub  
  6. }  
  7.   
  8. //或者  
  9. ManagedChannel channel = (ManagedChannel)client.getChannel();  
  10. if(channel.isShutdown() || channel.isTerminated()) {  
  11.     client = createBlockStub();  
  12. }  
  13. client.sayHello(...)  

    由於Channel是能夠多路複用,因此咱們用Pool機制(好比commons-pool)也能夠實現鏈接池,只是這種池並不是徹底符合GRPC/HTTP2的設計語義,由於GRPC容許一個Channel上連續發送對個Requests(而後一次性接收多個Responses),而不是「交互式」的Request-Response模式,固然這麼使用並不會有任何問題。

    2)對於批量調用的場景,咱們可使用FutureStub,對於普通的業務類型RPC,咱們應該使用BlockingStub。

    3)每一個RPC方法的調用,好比sayHello,調用開始後,將會爲每一個調用請求建立一個ClientCall實例,其內部封裝了調用的方法、配置選項(headers)等。此後將會建立Stream對象,每一個Stream都持有惟一的streamId,它是Transport用於分揀Response的憑證。最終調用的全部參數都會被封裝在Stream中。

    4)檢測DEADLINE,是否已通過期,若是過時,將使用FailingClientStream對象來模擬整個RPC過程,固然請求不會經過通道發出,直接通過異常流處理過程。

    5)而後獲取transport,若是此時檢測到transport已經中斷,則重建transport。(自動重練機制,ClientCallImpl.start()方法)

    6)發送請求參數,即咱們Request實例。一次RPC調用,數據是分屢次發送,可是ClientCall在建立時已經綁定到了指定的線程上,因此數據發送老是經過一個線程進行(不會亂序)。

    7)將ClientCall實例置爲halfClose,即半關閉,並非將底層Channel或者Transport半關閉,只是邏輯上限定此ClientCall實例上將不能繼續發送任何stream信息,而是等待Response。

    8)Netty底層IO將會對reponse數據流進行解包(Http2ConnectionDecoder),並根據streamId分揀Response,同時喚醒響應的ClientCalls阻塞。(參見ClientCalls,GrpcFuture)

    9)若是是BlockingStub,則請求返回,若是響應中包含應用異常,則封裝後拋出;若是是網絡異常,則可能觸發Channel重建、Stream重置等。

 

轉載自:http://shift-alt-ctrl.iteye.com/blog/2292862

相關文章
相關標籤/搜索