基於Protobuf的分佈式高性能RPC框架——Navi-Pbrpc

基於Protobuf的分佈式高性能RPC框架——Navi-Pbrpc

1 簡介

Navi-pbrpc框架是一個高性能的遠程調用RPC框架,使用netty4技術提供非阻塞、異步、全雙工的信道,使用protobuf做爲序列化協議,同時提供長、短鏈接模式,支持non-blocking和傳統的blocking io,以及負載均衡,容錯處理策略等,對於基於socket的分佈式調用提供通訊基礎。java

若是你的項目中須要高性能的RPC解決方案,那麼navi-pbrpc能夠幫助到你構建一個強大的遠程調用系統。git

Navi-pbrpc使用netty nio開發,全雙工、異步、非阻塞的通訊模型,保證了高性能和理想的QPS,瞭解詳細性能測試報告見附錄性能測試。github

單測覆蓋率見附錄。spring

設計關於UML類圖見附錄。 shell

github已開源,連接請點此https://github.com/neoremind/navi-pbrpcapi

 

2 協議介紹

Navi-pbrpc通訊模型以下,服務端與客戶端通訊採用4層TCP Socket通訊,支持長、短鏈接鏈路,應用層採用header+body方式做爲一個package或者叫作frame,header內含的body length屬性來代表二進制數據長度,body採用通過protobuf壓縮後的二進制數據。 
 
 -------------                                              ------------- 
|             |                                            |             |       
|    客戶端    |                                            |    服務端    |     
|             |                                            |             |     
|             |                                            |             |    
|             |                                            |             |
|    應用層    | ----NsHead + protobuf序列化body(byte[])-----|    應用層    |   
|-------------|                                            |-------------|   
|             | -----------  全雙工短鏈接tcp socket  --------|             |   
|             | ------------[全雙工長鏈接tcp socket]---------|             |
|             |                   .                        |             |  
|             |                   .                        |             |  
|    傳輸層    |                 (1-n條channel)             |    傳輸層    |  
|             |                   .                        |             |  
|             |                   .                        |             |  
|             | ------------[全雙工長鏈接tcp socket]---------|             |     
|-------------|                                            |-------------|         
|    網絡層    |                                            |    網絡層    |   
|-------------|                                            |-------------|    
|    鏈路層    |                                            |    鏈路層    |    
|-------------|                                            |-------------|    
|    物理層    | ================== <<->> ================= |    物理層    |    
 -------------                                              -------------

Header在框架內部叫作NsHead,NsHead + protobuf序列化body包結構示意以下,關於NsHead頭結構更多信息見附錄。安全

     Byte/      0       |       1       |       2       |       3       |
         /              |               |               |               |
        |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
        +---------------+---------------+---------------+---------------+
       0/ NsHead                                                        /
        /                                                               /
        /                                                               /
        /                                                               /
        +---------------+---------------+---------------+---------------+
      36/ protobuf序列化後的數據                                          /
       +/  (body長度在NsHead中定義)                                       /
        +---------------+---------------+---------------+---------------+

 

3 使用方法

3.1 準備工做

使用Maven管理的工程POM依賴請添加:服務器

<dependency>
    <groupId>com.baidu.beidou</groupId>
    <artifactId>navi-pbrpc</artifactId>
    <version>1.1.1</version>
</dependency>

最新依賴請查找:Sonatype(https://oss.sonatype.org/#nexus-search;quick%7Enavi-pbrpc) 網絡

Maven依賴樹以下:併發

+- commons-pool:commons-pool:jar:1.5.7:compile
+- com.google.protobuf:protobuf-java:jar:2.5.0:compile
+- io.netty:netty-all:jar:4.0.28.Final:compile
+- org.javassist:javassist:jar:3.18.1-GA:compile
+- org.slf4j:slf4j-api:jar:1.7.7:compile
+- org.slf4j:slf4j-log4j12:jar:1.7.7:compile
|  \- log4j:log4j:jar:1.2.17:compile

3.2 服務端開發

3.2.1 protoc生成代碼

首先定義服務的proto,例如新建一個demo.proto文件,內容以下:

package com.baidu.beidou.navi.pbrpc.demo.proto;
 
option cc_generic_services = true;
 
message DemoRequest {
    optional int32 user_id = 1;
}
 
message DemoResponse {
    optional int32 user_id = 1;
    optional string user_name = 2;
    enum GenderType {
        MALE = 1;
        FEMALE = 2;
    }  
    optional GenderType gender_type = 3;
}

使用protoc命令編譯,生成Demo.java,方法見附錄。

3.2.2 開發服務實現

開發一個服務端的實現,例如DemoServiceImpl,代碼以下:

public class DemoServiceImpl implements DemoService {
 
    @Override
    public DemoResponse doSmth(DemoRequest req) {
        DemoResponse.Builder builder = DemoResponse.newBuilder();
        builder.setUserId(1);
        builder.setUserName("name-1");
        builder.setGenderType(DemoResponse.GenderType.MALE);
        return builder.build();
    }
}

特別注意,一個方法若想暴露爲服務必須知足以下限制:

  • 參數必須只有1個。
  • 參數和返回值類型必須爲繼承自com.google.protobuf.GeneratedMessage。由protoc生成的java bean都會繼承這個類。

3.2.3 暴露而且啓動服務

啓動服務端,代碼以下: 

PbrpcServer server = new PbrpcServer(8088);
server.register(100, new DemoServiceImpl());
server.start();

表示開放端口爲8088,將DemoServiceImpl這個對象中的方法注入server,做爲服務。register(int, Object)中的第一個參數做爲服務標示的起始值,默認會遍歷Object中的全部方法,把符合上述限制條件的方法暴露爲服務,其標示從int起始值開始,依次遞增1,這個例子中DemoServiceImpl.doSmth(..)方法的標示就是100,若是還有其餘方法能夠暴露,則從101開始遞增。

這裏注意,服務端默認若是全雙工的channel鏈路在1個小時以內沒有任何數據寫入,那麼會自動關閉該鏈路,避免浪費服務端資源。Navi-rpc短鏈接調用不受影響,對於池化的長鏈接再下次發起請求的時候會從新make connection,若是是非Navi-rpc客戶端的其餘長鏈接接入,請注意這個限制。

3.2.4 關閉服務

安全關閉鏈接的方法以下:

server.shutdown();

 

4 客戶端開發

4.1 同步調用與異步調用

在下面的代碼示例中,會看到client調用遠程RPC,會有同步以及異步的方式,做爲異步方式的調用示例以下:

// 異步調用
CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg);  // 阻塞線程,等待結果 DemoResponse res = future.get(); </demoresponse>

調用客戶端能夠發送完請求後,拿到future,選擇作其餘邏輯,或者在get()上阻塞等待。

做爲同步方式的調用示例以下: 

// 同步調用
DemoResponse res = client.syncTransport(DemoResponse.class, msg);

調用客戶端會一直阻塞等待。

4.2 nio短鏈接調用

// 構造客戶端
PbrpcClient client = PbrpcClientFactory.buildShortLiveConnection("127.0.0.1", 8088, 60000);
 
// 構建請求數據
DemoRequest.Builder req = DemoRequest.newBuilder();
req.setUserId(1);
byte[] data = req.build().toByteArray();
 
// 構造請求消息
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(100);
msg.setProvider("beidou");
msg.setData(data);
 
// 異步調用
CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg);  // 阻塞線程,等待結果 DemoResponse res = future.get();  // 打印結果 System.out.println(res); 這裏注意,一旦PbrpcClient創建好是能夠複用的,無需每次從新新建。 </demoresponse>

PbrpcClientFactory是一個client工廠,幫助構造短鏈接調用,其餘參數以下:

public static SimplePbrpcClient buildShortLiveConnection(String ip, int port);
public static SimplePbrpcClient buildShortLiveConnection(String ip, int port, int readTimeout);
public static SimplePbrpcClient buildShortLiveConnection(String ip, int port, int connTimeout, int readTimeout);

其中connTimeout表示客戶端鏈接時間,單位毫秒。

readTimeout表示客戶端調用時間,單位毫秒,超時會拋出TimeoutException。例如以下:

Exception in thread "main" java.lang.RuntimeException: Error occurrs due to Client call timeout, request logId=1696636656
    at com.baidu.beidou.navi.pbrpc.client.callback.CallFuture.get(CallFuture.java:97)
    at com.baidu.beidou.navi.pbrpc.client.PooledPbrpcClient.syncTransport(PooledPbrpcClient.java:109)
    at com.baidu.unbiz.soma.biz.siconf.rpc.pbrpc.product.protocol.TestBiz.main(TestBiz.java:31)
Caused by: com.baidu.beidou.navi.pbrpc.exception.TimeoutException: Client call timeout, request logId=1696636656
    at com.baidu.beidou.navi.pbrpc.client.TimeoutEvictor.detectTimetout(TimeoutEvictor.java:68)
    at com.baidu.beidou.navi.pbrpc.client.TimeoutEvictor.run(TimeoutEvictor.java:47)
    at java.util.TimerThread.mainLoop(Timer.java:512)
    at java.util.TimerThread.run(Timer.java:462)

4.3 nio長鏈接池調用

鏈接池默認開啓8個keepAlive長鏈接,代碼以下:

// 構造客戶端
PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
        "127.0.0.1", 8088, 60000);
 
// 構建請求數據
DemoRequest.Builder req = DemoRequest.newBuilder();
req.setUserId(1);
byte[] data = req.build().toByteArray();
 
// 構造請求消息
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(100);
msg.setProvider("beidou");
msg.setData(data);
 
// 異步調用
CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg);  // 阻塞線程,等待結果 DemoResponse res = future.get();  // 打印結果 System.out.println(res); </demoresponse>

其中PooledConfiguration能夠設置鏈接池相關的參數,例如多少個長鏈接等策略。

PbrpcClientFactory是一個client工廠,幫助構造長鏈接池調用,其餘參數以下: 

public static PooledPbrpcClient buildPooledConnection(String ip, int port);
public static PooledPbrpcClient buildPooledConnection(String ip, int port, int readTimeout);
public static PooledPbrpcClient buildPooledConnection(PooledConfiguration configuration,
        String ip, int port, int readTimeout);
public static PooledPbrpcClient buildPooledConnection(PooledConfiguration configuration,
        String ip, int port, int connTimeout, int readTimeout);

其中connTimeout表示客戶端鏈接時間,單位毫秒。

readTimeout表示客戶端調用時間,單位毫秒,超時會拋出TimeoutException。

4.4 Blocking IO短鏈接調用 

// 構造客戶端
PbrpcClient client = PbrpcClientFactory.buildShortLiveBlockingIOConnection("127.0.0.1",
        8088, 60000);
 
// 構建請求數據
DemoRequest.Builder req = DemoRequest.newBuilder();
req.setUserId(1);
byte[] data = req.build().toByteArray();
 
// 構造請求消息
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(100);
msg.setProvider("beidou");
msg.setData(data);
 
// 同步調用,blocking IO只支持同步調用
DemoResponse res = client.syncTransport(DemoResponse.class, msg);
 
// 打印結果
System.out.println(res);

默認只支持同步調用,其餘構造方法以下:

public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port);
public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port,
        int readTimeout);
public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port,
        int connTimeout, int readTimeout);

特別注意,調用一個不能定位logId的pbrpc服務,請必須使用blocking IO方式,半雙工通訊方式,即一問一答,流程以下圖所示:

        1.request ------------------------->
client --------single TCP connection-------- server
        <-------------------------2.response

對於netty nio來講沒法標示到全雙工後服務端發送回來的一個包到底映射到本地哪一個調用請求上,對於經過Navi-pbrpc暴露的service服務,各類方式能夠隨意使用。

4.5 Blocking IO長鏈接池調用

// 構造客戶端
PbrpcClient client = PbrpcClientFactory.buildPooledBlockingIOConnection("127.0.0.1",
        8088, 60000);
 
// 構建請求數據
DemoRequest.Builder req = DemoRequest.newBuilder();
req.setUserId(1);
byte[] data = req.build().toByteArray();
 
// 構造請求消息
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(100);
msg.setProvider("beidou");
msg.setData(data);
 
// 同步調用,blocking IO只支持同步調用
DemoResponse res = client.syncTransport(DemoResponse.class, msg);
 
// 打印結果
System.out.println(res);

默認只支持同步調用,其餘構造方法以下:

public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(String ip, int port);
public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(String ip, int port,
        int readTimeout);
public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(
        PooledConfiguration configuration, String ip, int port, int readTimeout);
public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(
        PooledConfiguration configuration, String ip, int port, int connTimeout, int readTimeout);

4.6 帶有負載均衡以及容錯策略的HA客戶端調用

// 構造客戶端
PbrpcClient client = HAPbrpcClientFactory.buildShortLiveConnection("127.0.0.1:8088,1.1.1.1:9999",
        new RRLoadBalanceStrategy(new FailOverStrategy(2)));
 
// 構建請求數據
DemoRequest.Builder req = DemoRequest.newBuilder();
req.setUserId(1);
byte[] data = req.build().toByteArray();
 
// 構造請求消息
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(100);
msg.setProvider("beidou");
msg.setData(data);
 
// 異步調用
CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg);  // 阻塞線程,等待結果 DemoResponse res = future.get();  // 打印結果 System.out.println(res); </demoresponse>

其中HAPbrpcClientFactory是負責構造高可用客戶端的工廠,第一個參數是一個IP:PORT串,按照逗號分隔。

其後面的參數是可擴展的負載均衡策略和容錯處理策略,RRLoadBalanceStrategy表示使用輪訓(Round Robin)策略,FailOverStrategy表示容錯策略爲失敗重試,最多重試次數爲2。

還支持的其餘策略組合爲RandomLoadBalanceStrategy標示隨機策略,FailFastStrategy表示失敗當即退出。能夠隨意組合。

其餘構造方法以下:

public static HAPbrpcClient buildShortLiveConnection(String connectString,
        LoadBalanceStrategy lb);
public static HAPbrpcClient buildShortLiveConnection(String connectString, int readTimeout,
        LoadBalanceStrategy lb);
public static HAPbrpcClient buildShortLiveConnection(String connectString, int connTimeout,
        int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledConnection(String connectString, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledConnection(String connectString, int readTimeout,
        LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledConnection(PooledConfiguration configuration,
        String connectString, int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledConnection(PooledConfiguration configuration,
        String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb);
 
public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString,
        LoadBalanceStrategy lb);
public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString,
        int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString,
        int connTimeout, int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildShortLiveBlockingIOConnection(
        PbrpcClientConfiguration configuration, String connectString, int connTimeout,
        int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString,
        LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString,
        int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration,
        String connectString, int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString,
        int connTimeout, int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration,
        String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb);
public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration,
        PbrpcClientConfiguration clientConfig, String connectString, int connTimeout,
        int readTimeout, LoadBalanceStrategy lb);

4.7 關閉鏈接

安全關閉鏈接和各類鏈接池的方法以下:

client.shutdown();

 

5 與Spring集成

5.1 準備工做

Maven POM依賴請添加:

<dependency>
    <groupId>com.baidu.beidou</groupId>
    <artifactId>navi-pbrpc-spring</artifactId>
    <version>1.1.1</version>
</dependency>

5.2 開發服務接口

1)根據服務提供方的proto文件生成java代碼。此處省略具體方法。詳細見第一部分。

2)開發一個Java的Interface

接口名稱隨意,達意便可。

入參有且僅有一個請求類型,參數和返回值類型必須繼承自com.google.protobuf.GeneratedMessage。由protoc生成的java bean都會繼承這個類。

方法名隨意,達意便可。

方法上加入一個PbrpcMethodId的註解,標明遠程服務的method id,若是沒有註解則默認爲0。

一個實例以下,這裏的DemoResponse和DemoRequest都是根據proto生成的java類定義,100標示遠程服務的method id標識。

/**
 * ClassName: DemoService <br />
 * Function: 遠程服務接口demo
 *
 * @author Zhang Xu
 */
public interface DemoService {
 
    /**
     * 乾點什麼
     *
     * @param req 請求
     * @return 響應
     */
    @PbrpcMethodId(100)
    DemoResponse doSmth(DemoRequest req);
 
}

5.3 配置XML

一般項目均會與Spring集成,利用Spring的IoC配置管理,能夠作到功能的靈活插拔可擴展,一個最經常使用的典型配置是

使用properties文件中配置的IP:PORT列表標示遠程服務

使用短鏈接blocking io訪問遠程服務

將下面的配置加入到你的XML文件便可,說明全在註釋中。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
 
    <aop:aspectj-autoproxy proxy-target-class="true"/>
 
    <context:annotation-config/>
    <context:component-scan base-package="com.baidu.beidou"/>
 
    <!-- properties配置文件,內含ip端口列表或者一些timeout設置 -->
    <bean id="propertyPlaceholderConfigurerConfig"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
        <property name="ignoreResourceNotFound" value="true"/>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="locations">
            <list>
                <value>classpath:autoevict/application.properties</value>
            </list>
        </property>
    </bean>
 
    <!-- 自動剔除傳輸回調callback,單位時間內調用失敗率大於某個百分比,則剔除掉該客戶端 -->
    <!-- 下面的例子表示服務啓動後2s(initDelay)開始第一次檢查,檢查週期是6s(checkPeriod), -->
    <!-- 檢查週期內錯誤率大於80%(maxFailPercentage)而且調用次數大於3次(minInvokeNumber)則剔除 -->
    <bean id="autoEvictTransportCallback" class="com.baidu.beidou.navi.pbrpc.client.AutoEvictTransportCallback">
        <property name="checkPeriod" value="6000"/>
        <property name="minInvokeNumber" value="3"/>
        <property name="initDelay" value="2000"/>
        <property name="maxFailPercentage" value="80"/>
    </bean>
 
    <!-- 高可用相關配置,FailOverStrategy表明失敗重試,FailFastStrategy表明失敗當即退出 -->
    <!-- 負載均衡配置中,RRLoadBalanceStrategy表明輪訓調用服務器,RandomLoadBalanceStrategy表明隨機選擇服務器調用 -->
    <!-- 默認transportCallback不作任何事情,能夠配置AutoEvictTransportCallback作自動剔除失效連接 -->
    <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/>
    <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy">
        <property name="failStrategy" ref="failoverStrategy"/>
        <property name="transportCallback" ref="autoEvictTransportCallback"/>
    </bean>
 
    <!-- Pbprc服務server定位locator工廠,這裏使用BlockingIO短鏈接 -->
    <bean id="pbrpcServerLocator"
          class="com.baidu.beidou.navi.pbrpc.client.IpPortShortLiveBlockingIOPbrpcServerLocator"/>
 
    <!-- 經過Pbprc服務server定位locator工廠構造高可用客戶端 -->
    <bean id="haPbrpcClient"
          factory-bean="pbrpcServerLocator"
          factory-method="factory">
        <constructor-arg value="${pbrpc.client.server}"/>
        <constructor-arg value="${pbrpc.client.connect.timeout}"/>
        <constructor-arg value="${pbrpc.client.read.timeout}"/>
        <constructor-arg ref="roundRobinLoadBalanceStrategy"/>
    </bean>
 
    <!-- Pbprc代理proxy生成器,須要指定高可用pbrpc客戶端和provider標示 -->
    <!-- 這裏的proxy是利用jdk的動態代理技術構建的,proxy也可使用javassist動態字節碼技術生成 -->
    <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy">
        <property name="pbrpcClient" ref="haPbrpcClient"/>
        <property name="provider" value="beidou"/>
    </bean>
 
    <!-- 服務bean定義,使用Spring的FactoryBean來作bean代理,可使用Resource註解注入這個bean -->
    <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean">
        <property name="integrationProxy" ref="integrationProxy"/>
        <property name="serviceInterface">
            <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value>
        </property>
    </bean>
 
</beans>

properties配置以下:

pbrpc.client.server=127.0.0.1:14419,127.0.0.1:14420
pbrpc.client.connect.timeout=2000
pbrpc.client.read.timeout=5000

瞭解更多可選配置見下面小節。

5.4 開始調用

因爲上面配置了DemoService的代理,所以能夠用@Resource很天然地來使用bean,一個testcase以下。 

@ContextConfiguration(locations = {"classpath*:applicationContext.xml"})
public class SpringIntegrationIpPortListTest extends AbstractJUnit4SpringContextTests {
 
    @Autowired
    private DemoService demoService;
 
    @Test
    public void testDoSmth() {
        Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder();
        req.setUserId(1);
        Demo.DemoResponse response = demoService.doSmth(req.build());
        System.out.println(response);
        assertThat(response.getUserId(), is(1));
    }
}

5.5 其餘配置

5.5.1 單點的配置IP:PORT而且不啓用自動失效剔除

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
 
    <aop:aspectj-autoproxy proxy-target-class="true"/>
 
    <context:annotation-config/>
    <context:component-scan base-package="com.baidu.beidou"/>
 
    <!-- properties配置文件,內含ip端口列表或者一些timeout設置 -->
    <bean id="propertyPlaceholderConfigurerConfig"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
        <property name="ignoreResourceNotFound" value="true"/>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="locations">
            <list>
                <value>classpath:ipportlist/application.properties</value>
            </list>
        </property>
    </bean>
 
    <!-- 高可用相關配置,FailOverStrategy表明失敗重試,FailFastStrategy表明失敗當即退出 -->
    <!-- 負載均衡配置中,RRLoadBalanceStrategy表明輪訓調用服務器,RandomLoadBalanceStrategy表明隨機選擇服務器調用 -->
    <!-- 默認transportCallback不作任何事情,能夠配置AutoEvictTransportCallback作自動剔除失效連接 -->
    <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/>
    <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy">
        <property name="failStrategy" ref="failoverStrategy"/>
    </bean>
 
    <!-- 手工配置單點pbrpc客戶端,能夠配置1到多個 -->
    <!-- 這裏使用BlockingIO短鏈接 -->
    <bean id="pbrpcClient1" class="com.baidu.beidou.navi.pbrpc.client.BlockingIOPbrpcClient">
        <property name="ip" value="${pbrpc.client1.ip}"/>
        <property name="port" value="${pbrpc.client1.port}"/>
        <property name="readTimeout" value="${pbrpc.client.read.timeout}"/>
        <property name="connTimeout" value="${pbrpc.client.connect.timeout}"/>
    </bean>
    <bean id="pbrpcClient2" class="com.baidu.beidou.navi.pbrpc.client.BlockingIOPbrpcClient">
        <property name="ip" value="${pbrpc.client2.ip}"/>
        <property name="port" value="${pbrpc.client2.port}"/>
        <property name="readTimeout" value="${pbrpc.client.read.timeout}"/>
        <property name="connTimeout" value="${pbrpc.client.connect.timeout}"/>
    </bean>
 
    <!-- 高可用pbrpc客戶端,集成多個單點客戶端以及負載均衡策略 -->
    <bean id="haPbrpcClient" class="com.baidu.beidou.navi.pbrpc.client.HAPbrpcClient">
        <property name="loadBalanceStrategy" ref="roundRobinLoadBalanceStrategy"/>
        <property name="clientList">
            <list>
                <ref bean="pbrpcClient1"/>
                <ref bean="pbrpcClient2"/>
            </list>
        </property>
    </bean>
 
    <!-- Pbprc代理proxy生成器,須要指定高可用pbrpc客戶端和provider標示 -->
    <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy">
        <property name="pbrpcClient" ref="haPbrpcClient"/>
        <property name="provider" value="beidou"/>
    </bean>
 
    <!-- 服務bean定義,使用Spring的FactoryBean來作代理 -->
    <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean">
        <property name="integrationProxy" ref="integrationProxy"/>
        <property name="serviceInterface">
            <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value>
        </property>
    </bean>
 
</beans>

5.5.2 使用長鏈接池 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
 
    <aop:aspectj-autoproxy proxy-target-class="true"/>
 
    <context:annotation-config/>
    <context:component-scan base-package="com.baidu.beidou"/>
 
    <!-- properties配置文件,內含ip端口列表或者一些timeout設置 -->
    <bean id="propertyPlaceholderConfigurerConfig"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
        <property name="ignoreResourceNotFound" value="true"/>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="locations">
            <list>
                <value>classpath:ipportstring_pooled/application.properties</value>
            </list>
        </property>
    </bean>
 
    <!-- 高可用相關配置,FailOverStrategy表明失敗重試,FailFastStrategy表明失敗當即退出 -->
    <!-- 負載均衡配置中,RRLoadBalanceStrategy表明輪訓調用服務器,RandomLoadBalanceStrategy表明隨機選擇服務器調用 -->
    <!-- 默認transportCallback不作任何事情,能夠配置AutoEvictTransportCallback作自動剔除失效連接 -->
    <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/>
    <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy">
        <property name="failStrategy" ref="failoverStrategy"/>
    </bean>
 
    <!-- Pbprc服務server定位locator工廠,這裏使用BlockingIO長鏈接池 -->
    <bean id="pbrpcServerLocator"
          class="com.baidu.beidou.navi.pbrpc.client.IpPortPooledBlockingIOPbrpcServerLocator"/>
 
    <!-- 經過Pbprc服務server定位locator工廠構造高可用客戶端 -->
    <bean id="haPbrpcClient"
          factory-bean="pbrpcServerLocator"
          factory-method="factory">
        <constructor-arg value="${pbrpc.client.server}"/>
        <constructor-arg value="${pbrpc.client.connect.timeout}"/>
        <constructor-arg value="${pbrpc.client.read.timeout}"/>
        <constructor-arg ref="roundRobinLoadBalanceStrategy"/>
    </bean>
 
    <!-- Pbprc代理proxy生成器,須要指定高可用pbrpc客戶端和provider標示 -->
    <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy">
        <property name="pbrpcClient" ref="haPbrpcClient"/>
        <property name="provider" value="beidou"/>
    </bean>
 
    <!-- 服務bean定義,使用Spring的FactoryBean來作代理 -->
    <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean">
        <property name="integrationProxy" ref="integrationProxy"/>
        <property name="serviceInterface">
            <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value>
        </property>
    </bean>
 
</beans>

 

6 附錄

6.1 NsHead頭結構

     Byte/      0       |       1       |       2       |       3       |
         /              |               |               |               |
        |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
        +---------------+---------------+---------------+---------------+
       0| id                            | flags                         |
        +---------------+---------------+---------------+---------------+
       4| log id                                                        |
        +---------------+---------------+---------------+---------------+
       8| provider                                                      |
        +                                                               +
        |                                                               |
        +                                                               +
      16|                                                               |
        +                                                               +
      20|                                                               |
        +---------------+---------------+---------------+---------------+
      24| magic number                                                  |
        +---------------+---------------+---------------+---------------+
      28| method id                                                     |
        +---------------+---------------+---------------+---------------+
      32| body length                                                   |
        +---------------+---------------+---------------+---------------+
        Total 36 bytes

Header各字段含義

  1. id請求的id。目前未使用。建議設置爲0。
  2. flags本次請求的一些標誌符。目前框架用於傳輸errorCode。
  3. log-id。本次請求的日誌id。Navi-rpc服務端用該id定位一個惟一的客戶端請求。
  4. provider標識調用方的表示。
  5. magic-number特殊標識,用於標識一個包的完整性。目前未使用。
  6. method-id是RPC方法的序列號。根據proto文件中定義的service順序,從註冊進入的起始值開始依次遞增。
  7. body-length消息體長度。

6.2 設計——UML類圖

6.3 性能測試報告

測試環境: Linux內核版本:2.6.32_1-11-0-0 CPU:Intel(R) Xeon(R) CPU E5-2620 0 @ 2.00GHz processor_count : 12 內存:64G 在同一臺物理機上測試。

JVM參數: -Xms512m -Xmx512m

測試壓力: 10w請求,20併發,測試期間會有4個以上的核所有100%負荷。

測試case: 客戶端發起請求,要求字符串長度以及數量,服務端返回一個指定數量的List給予客戶端,字符串爲隨機生成。

測試結果: 能夠看出在常見的請求區間10k左右數據大小,QPS能在18000+。 

傳輸數據大小  響應時間(毫秒)    QPS
50byte  3186    31387
1k  4063    24612
10k 5354    18677
20k 7833    12766
50k 12658   7900

 

6.4 長鏈接池PooledConfiguration配置詳解

/**
 * 控制池中空閒的對象的最大數量。 默認值是8,若是是負值表示沒限制。
 */
private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE;
 
/**
 * whenExhaustedAction若是是WHEN_EXHAUSTED_BLOCK,指定等待的毫秒數。<br />
 * 若是maxWait是正數,那麼會等待maxWait的毫秒的時間,超時會拋出NoSuchElementException異常 ;<br />
 * 若是maxWait爲負值,會永久等待。 maxWait的默認值是-1。
 */
private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT;
 
/**
 * 若是testOnBorrow被設置,pool會在borrowObject返回對象以前使用PoolableObjectFactory的validateObject來驗證這個對象是否有效,要是對象沒經過驗證,這個對象會被丟棄,
 * 而後從新選擇一個新的對象。 testOnBorrow的默認值是false,可使用GenericObjectPool.DEFAULT_TEST_ON_BORROW;。
 * <p>&nbsp;</p>
 * 注意,對於長期idle的鏈接,服務端會默認關閉channel此時客戶端並不知曉,所以不能使用已經失效的channel,爲保證客戶端可用,這裏暫時使用這個策略每次borrow的時候都test
 */
private boolean testOnBorrow = true;
 
/**
 * 控制池中空閒的對象的最小數量。 默認值是0。
 */
private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE;
 
/**
 * 控制池中對象的最大數量。 默認值是8,若是是負值表示沒限制。
 */
private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE;
 
/**
 * 若是testOnReturn被設置,pool會在returnObject的時候經過PoolableObjectFactory的validateObject方法驗證對象,若是對象沒經過驗證,對象會被丟棄,不會被放到池中。
 * testOnReturn的默認值是false。
 */
private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN;
 
/**
 * 指定idle對象是否應該使用PoolableObjectFactory的validateObject校驗,若是校驗失敗,這個對象會從對象池中被清除。
 * 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(&gt;0)的時候纔會生效。 testWhileIdle的默認值是false。
 */
private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE;
 
/**
 * 指定驅逐線程的休眠時間。若是這個值不是正數(&gt;0),不會有驅逐線程運行。 timeBetweenEvictionRunsMillis的默認值是-1。
 */
private long timeBetweenEvictionRunsMillis = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
 
/**
 * 設置驅逐線程每次檢測對象的數量。 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(&gt;0)的時候纔會生效。 numTestsPerEvictionRun的默認值是3。
 */
private int numTestsPerEvictionRun = GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN;
 
/**
 * 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉)。 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(&gt;0)的時候纔會生效。
 * minEvictableIdleTimeMillis默認值是30分鐘。
 */
private long minEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
 
/**
 * 與minEvictableIdleTimeMillis相似,也是指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉),不過會參考minIdle的值,只有idle對象的數量超過minIdle的值,對象纔會被清除。
 * 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值
 * (&gt;0)的時候纔會生效,而且這個配置能被minEvictableIdleTimeMillis配置取代(minEvictableIdleTimeMillis配置項的優先級更高)。
 * softMinEvictableIdleTimeMillis的默認值是-1。
 */
private long softMinEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
 
/**
 * 設置後進先出的池策略。pool能夠被配置成LIFO隊列(last-in-first-out)或FIFO隊列(first-in-first-out),來指定空閒對象被使用的次序。 lifo的默認值是true。
 */
private boolean lifo = GenericObjectPool.DEFAULT_LIFO;
 
/**
 * 指定池中對象被消耗完之後的行爲,有下面這些選擇: WHEN_EXHAUSTED_FAIL 0 WHEN_EXHAUSTED_GROW 2 WHEN_EXHAUSTED_BLOCK 1
 * 若是是WHEN_EXHAUSTED_FAIL,當池中對象達到上限之後,繼續borrowObject會拋出NoSuchElementException異常。
 * 若是是WHEN_EXHAUSTED_GROW,當池中對象達到上限之後,會建立一個新對象,並返回它。
 * 若是是WHEN_EXHAUSTED_BLOCK,當池中對象達到上限之後,會一直等待,直到有一個對象可用。這個行爲還與maxWait有關
 * ,若是maxWait是正數,那麼會等待maxWait的毫秒的時間,超時會拋出NoSuchElementException異常;若是maxWait爲負值,會永久等待。
 * whenExhaustedAction的默認值是WHEN_EXHAUSTED_BLOCK,maxWait的默認值是-1。
 */
private byte whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;

6.5 默認值手冊

/**
 * 默認客戶端鏈接超時時間,單位毫秒
 */
public static final int DEFAULT_CLIENT_CONN_TIMEOUT = 4000;
 
/**
 * 默認客戶端調用讀超時時間,單位毫秒
 */
public static final int DEFAULT_CLIENT_READ_TIMEOUT = 60000;
 
/**
 * 默認客戶端超時調用檢測器啓動時間,單位毫秒
 */
public static int CLIENT_TIMEOUT_EVICTOR_DELAY_START_TIME = 5000;
 
/**
 * 默認客戶端超時調用檢測器檢測間隔,單位毫秒
 */
public static int CLIENT_TIMEOUT_EVICTOR_CHECK_INTERVAL = 5000;

6.6 protoc生成原生proto代碼方法

1)下載的protobuffer編譯客戶端: github:https://github.com/google/protobuf/releases 目前經常使用的是2.5.0版本

2)重命名問xxx.proto爲本身想生成類名稱

3)修改文件中package爲本身的包前綴

4)調用命令:protoc –java_out=xxx.proto 

其餘生成方法可使用各類IDE或者編輯器(如sublime text)直接生成。

相關文章
相關標籤/搜索