http://my.oschina.net/huangyong/blog/361751

       

                    當前訪客身份:遊客 [ 登陸 | 加入開源中國 ]

當前訪客身份: 遊客 [ 登陸 | 加入開源中國 ]

   

                   

   

軟件

   

黃勇                        黃勇         men.png         關注此人            

    關注(39)     粉絲(1456)     積分(855)                          

   

學習 討論 總結 分享

.發送留言 .請教問題

博客分類  

閱讀排行  

  1. 1. Smart Framework:輕量級 Java Web 框架

  2. 2. AOP 那點事兒

  3. 3. ThreadLocal 那點事兒

  4. 4. Proxy 那點事兒

  5. 5. Entity 映射機制實現原理

  6. 6. ThreadLocal 那點事兒(續集)

  7. 7. 一個簡單的 Cache 淘汰策略

  8. 8. Maven 那點事兒

最新評論        

  • @黃勇:引用來自「夜夜獨唱」的評論 勇哥跟我同樣的習慣... 查看

  • @夜夜獨唱:勇哥跟我同樣的習慣啊,喜歡捕獲異常後轉爲Runti... 查看

  • @ZHAOXJMAIL:建議樓主再完善下,增長服務提供端與註冊端斷開後... 查看

  • @芝***:前半部分看懂了,後半部分zooKeeper部分不那麼容... 查看

  • @heters1:ChannelFuture future = bootstrap.connect(host... 查看

  • @三修:我的認爲能夠把checkAddClass, 改成getChecker,... 查看

  • @heters1:最近剛打算寫個協議棧,準備使用netty,樓主爲啥... 查看

  • @小菜鳥27:謝謝了 查看

  • @黃勇:引用來自「儀山湖」的評論protobuffer, thrift都... 查看

  • @黃勇:引用來自「王仁輝(java)」的評論求評價 個人res... 查看

友情連接  

訪客統計

  • 今日訪問:24

  • 昨日訪問:206

  • 本週訪問:1431

  • 本月訪問:2252

  • 全部訪問:121413

   

    空間 博客    

   

   

輕量級分佈式 RPC 框架

        發表於2周前(2014-12-29 00:16)       閱讀(2079) | 評論(30)             146人收藏此文章, 我要收藏    

23

 

1月10日 #長沙# OSC 源創會第32期開始報名

             

                分佈式                 RPC                 Netty                 Protostuff                 ZooKeeper                  

 


             

目錄[-]

   

           

RPC,即 Remote Procedure Call(遠程過程調用),說得通俗一點就是:調用遠程計算機上的服務,就像調用本地服務同樣。

RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具備良好的跨平臺性,但其性能卻不如基於 TCP 協議的 RPC。會兩方面會直接影響 RPC 的性能,一是傳輸方式,二是序列化。

衆所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在數據傳輸方面,越底層越快,所以,在通常狀況下,TCP 必定比 HTTP 快。就序列化而言,Java 提供了默認的序列化方式,但在高併發的狀況下,這種方式將會帶來一些性能上的瓶頸,因而市面上出現了一系列優秀的序列化框架,好比:Protobuf、 Kryo、Hessian、Jackson 等,它們能夠取代 Java 默認的序列化,從而提供更高效的性能。

爲了支持高併發,傳統的阻塞式 IO 顯然不太合適,所以咱們須要異步的 IO,即 NIO。Java 提供了 NIO 的解決方案,Java 7 也提供了更優秀的 NIO.2 支持,用 Java 實現 NIO 並非高不可攀的事情,只是須要咱們熟悉 NIO 的技術細節。

咱們須要將服務部署在分佈式環境下的不一樣節點上,經過服務註冊的方式,讓客戶端來自動發現當前可用的服務,並調用這些服務。這須要一種服務註冊表(Service Registry)的組件,讓它來註冊分佈式環境下全部的服務地址(包括:主機名與端口號)。

應用、服務、服務註冊表之間的關係見下圖:

系統架構

每臺 Server 上可發佈多個 Service,這些 Service 共用一個 host 與 port,在分佈式環境下會提供 Server 共同對外提供 Service。此外,爲防止 Service Registry 出現單點故障,所以須要將其搭建爲集羣環境。

本文將爲您揭曉開發輕量級分佈式 RPC 框架的具體過程,該框架基於 TCP 協議,提供了 NIO 特性,提供高效的序列化方式,同時也具有服務註冊與發現的能力。

根據以上技術需求,咱們可以使用以下技術選型:

  1. Spring:它是最強大的依賴注入框架,也是業界的權威標準。

  2. Netty:它使 NIO 編程更加容易,屏蔽了 Java 底層的 NIO 細節。

  3. Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 文件。

  4. ZooKeeper:提供服務註冊與發現功能,開發分佈式系統的必備選擇,同時它也具有天生的集羣能力。

相關 Maven 依賴請見附錄。

第一步:編寫服務接口

?

1
2
3
4
public interface HelloService {
 
     String hello(String name);
}

將該接口放在獨立的客戶端 jar 包中,以供應用使用。

第二步:編寫服務接口的實現類

?

1
2
3
4
5
6
7
8
@RpcService (HelloService. class ) // 指定遠程接口
public class HelloServiceImpl implements HelloService {
 
     @Override
     public String hello(String name) {
         return "Hello! " + name;
     }
}

使用RpcService註解定義在服務接口的實現類上,須要對該實現類指定遠程接口,由於實現類可能會實現多個接口,必定要告訴框架哪一個纔是遠程接口。

RpcService代碼以下:

?

1
2
3
4
5
6
7
@Target ({ElementType.TYPE})
@Retention (RetentionPolicy.RUNTIME)
@Component // 代表可被 Spring 掃描
public @interface RpcService {
 
     Class<?> value();
}

該註解具有 Spring 的Component註解的特性,可被 Spring 掃描。

該實現類放在服務端 jar 包中,該 jar 包還提供了一些服務端的配置文件與啓動服務的引導程序。

第三步:配置服務端

服務端 Spring 配置文件名爲spring.xml,內容以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
< beans ...>
     < context:component-scan base-package = "com.xxx.rpc.sample.server" />
 
     < context:property-placeholder location = "classpath:config.properties" />
 
     <!-- 配置服務註冊組件 -->
     < bean id = "serviceRegistry" class = "com.xxx.rpc.registry.ServiceRegistry" >
         < constructor-arg name = "registryAddress" value = "${registry.address}" />
     </ bean >
 
     <!-- 配置 RPC 服務器 -->
     < bean id = "rpcServer" class = "com.xxx.rpc.server.RpcServer" >
         < constructor-arg name = "serverAddress" value = "${server.address}" />
         < constructor-arg name = "serviceRegistry" ref = "serviceRegistry" />
     </ bean >
</ beans >

具體的配置參數在config.properties文件中,內容以下:

?

1
2
3
4
5
# ZooKeeper 服務器
registry.address= 127.0 . 0.1 : 2181
 
# RPC 服務器
server.address= 127.0 . 0.1 : 8000

以上配置代表:鏈接本地的 ZooKeeper 服務器,並在 8000 端口上發佈 RPC 服務。

第四步:啓動服務器併發布服務

爲了加載 Spring 配置文件來發布服務,只需編寫一個引導程序便可:

?

1
2
3
4
5
6
public class RpcBootstrap {
 
     public static void main(String[] args) {
         new ClassPathXmlApplicationContext( "spring.xml" );
     }
}

運行RpcBootstrap類的main方法便可啓動服務端,但還有兩個重要的組件還沒有實現,它們分別是:ServiceRegistryRpcServer,下文會給出具體實現細節。

第五步:實現服務註冊

使用 ZooKeeper 客戶端可輕鬆實現服務註冊功能,ServiceRegistry代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ServiceRegistry {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry. class );
 
     private CountDownLatch latch = new CountDownLatch( 1 );
 
     private String registryAddress;
 
     public ServiceRegistry(String registryAddress) {
         this .registryAddress = registryAddress;
     }
 
     public void register(String data) {
         if (data != null ) {
             ZooKeeper zk = connectServer();
             if (zk != null ) {
                 createNode(zk, data);
             }
         }
     }
 
     private ZooKeeper connectServer() {
         ZooKeeper zk = null ;
         try {
             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                     if (event.getState() == Event.KeeperState.SyncConnected) {
                         latch.countDown();
                     }
                 }
             });
             latch.await();
         } catch (IOException | InterruptedException e) {
             LOGGER.error( "" , e);
         }
         return zk;
     }
 
     private void createNode(ZooKeeper zk, String data) {
         try {
             byte [] bytes = data.getBytes();
             String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
             LOGGER.debug( "create zookeeper node ({} => {})" , path, data);
         } catch (KeeperException | InterruptedException e) {
             LOGGER.error( "" , e);
         }
     }
}

其中,經過Constant配置了全部的常量:

?

1
2
3
4
5
6
7
public interface Constant {
 
     int ZK_SESSION_TIMEOUT = 5000 ;
 
     String ZK_REGISTRY_PATH = "/registry" ;
     String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data" ;
}

注意:首先須要使用 ZooKeeper 客戶端命令行建立/registry永久節點,用於存放全部的服務臨時節點。

第六步:實現 RPC 服務器

使用 Netty 可實現一個支持 NIO 的 RPC 服務器,須要使用ServiceRegistry註冊服務地址,RpcServer代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class RpcServer implements ApplicationContextAware, InitializingBean {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer. class );
 
     private String serverAddress;
     private ServiceRegistry serviceRegistry;
 
     private Map<String, Object> handlerMap = new HashMap<>(); // 存放接口名與服務對象之間的映射關係
 
     public RpcServer(String serverAddress) {
         this .serverAddress = serverAddress;
     }
 
     public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
         this .serverAddress = serverAddress;
         this .serviceRegistry = serviceRegistry;
     }
 
     @Override
     public void setApplicationContext(ApplicationContext ctx) throws BeansException {
         Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService. class ); // 獲取全部帶有 RpcService 註解的 Spring Bean
         if (MapUtils.isNotEmpty(serviceBeanMap)) {
             for (Object serviceBean : serviceBeanMap.values()) {
                 String interfaceName = serviceBean.getClass().getAnnotation(RpcService. class ).value().getName();
                 handlerMap.put(interfaceName, serviceBean);
             }
         }
     }
 
     @Override
     public void afterPropertiesSet() throws Exception {
         EventLoopGroup bossGroup = new NioEventLoopGroup();
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
             bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel. class )
                 .childHandler( new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel channel) throws Exception {
                         channel.pipeline()
                             .addLast( new RpcDecoder(RpcRequest. class )) // 將 RPC 請求進行解碼(爲了處理請求)
                             .addLast( new RpcEncoder(RpcResponse. class )) // 將 RPC 響應進行編碼(爲了返回響應)
                             .addLast( new RpcHandler(handlerMap)); // 處理 RPC 請求
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128 )
                 .childOption(ChannelOption.SO_KEEPALIVE, true );
 
             String[] array = serverAddress.split( ":" );
             String host = array[ 0 ];
             int port = Integer.parseInt(array[ 1 ]);
 
             ChannelFuture future = bootstrap.bind(host, port).sync();
             LOGGER.debug( "server started on port {}" , port);
 
             if (serviceRegistry != null ) {
                 serviceRegistry.register(serverAddress); // 註冊服務地址
             }
 
             future.channel().closeFuture().sync();
         } finally {
             workerGroup.shutdownGracefully();
             bossGroup.shutdownGracefully();
         }
     }
}

以上代碼中,有兩個重要的 POJO 須要描述一下,它們分別是RpcRequestRpcResponse

使用RpcRequest封裝 RPC 請求,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
public class RpcRequest {
 
     private String requestId;
     private String className;
     private String methodName;
     private Class<?>[] parameterTypes;
     private Object[] parameters;
 
     // getter/setter...
}

使用RpcResponse封裝 RPC 響應,代碼以下:

?

1
2
3
4
5
6
7
8
public class RpcResponse {
 
     private String requestId;
     private Throwable error;
     private Object result;
 
     // getter/setter...
}

使用RpcDecoder提供 RPC 解碼,只需擴展 Netty 的ByteToMessageDecoder抽象類的decode方法便可,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RpcDecoder extends ByteToMessageDecoder {
 
     private Class<?> genericClass;
 
     public RpcDecoder(Class<?> genericClass) {
         this .genericClass = genericClass;
     }
 
     @Override
     public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         if (in.readableBytes() < 4 ) {
             return ;
         }
         in.markReaderIndex();
         int dataLength = in.readInt();
         if (dataLength < 0 ) {
             ctx.close();
         }
         if (in.readableBytes() < dataLength) {
             in.resetReaderIndex();
         }
         byte [] data = new byte [dataLength];
         in.readBytes(data);
 
         Object obj = SerializationUtil.deserialize(data, genericClass);
         out.add(obj);
     }
}

使用RpcEncoder提供 RPC 編碼,只需擴展 Netty 的MessageToByteEncoder抽象類的encode方法便可,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RpcEncoder extends MessageToByteEncoder {
 
     private Class<?> genericClass;
 
     public RpcEncoder(Class<?> genericClass) {
         this .genericClass = genericClass;
     }
 
     @Override
     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
         if (genericClass.isInstance(in)) {
             byte [] data = SerializationUtil.serialize(in);
             out.writeInt(data.length);
             out.writeBytes(data);
         }
     }
}

編寫一個SerializationUtil工具類,使用Protostuff實現序列化:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SerializationUtil {
 
     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
 
     private static Objenesis objenesis = new ObjenesisStd( true );
 
     private SerializationUtil() {
     }
 
     @SuppressWarnings ( "unchecked" )
     private static <T> Schema<T> getSchema(Class<T> cls) {
         Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
         if (schema == null ) {
             schema = RuntimeSchema.createFrom(cls);
             if (schema != null ) {
                 cachedSchema.put(cls, schema);
             }
         }
         return schema;
     }
 
     @SuppressWarnings ( "unchecked" )
     public static <T> byte [] serialize(T obj) {
         Class<T> cls = (Class<T>) obj.getClass();
         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
         try {
             Schema<T> schema = getSchema(cls);
             return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         } finally {
             buffer.clear();
         }
     }
 
     public static <T> T deserialize( byte [] data, Class<T> cls) {
         try {
             T message = (T) objenesis.newInstance(cls);
             Schema<T> schema = getSchema(cls);
             ProtostuffIOUtil.mergeFrom(data, message, schema);
             return message;
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
}

以上了使用 Objenesis 來實例化對象,它是比 Java 反射更增強大。

注意:如須要替換其它序列化框架,只需修改SerializationUtil便可。固然,更好的實現方式是提供配置項來決定使用哪一種序列化方式。

使用RpcHandler中處理 RPC 請求,只需擴展 Netty 的SimpleChannelInboundHandler抽象類便可,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler. class );
 
     private final Map<String, Object> handlerMap;
 
     public RpcHandler(Map<String, Object> handlerMap) {
         this .handlerMap = handlerMap;
     }
 
     @Override
     public void channelRead0( final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
         RpcResponse response = new RpcResponse();
         response.setRequestId(request.getRequestId());
         try {
             Object result = handle(request);
             response.setResult(result);
         } catch (Throwable t) {
             response.setError(t);
         }
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     private Object handle(RpcRequest request) throws Throwable {
         String className = request.getClassName();
         Object serviceBean = handlerMap.get(className);
 
         Class<?> serviceClass = serviceBean.getClass();
         String methodName = request.getMethodName();
         Class<?>[] parameterTypes = request.getParameterTypes();
         Object[] parameters = request.getParameters();
 
         /*Method method = serviceClass.getMethod(methodName, parameterTypes);
         method.setAccessible(true);
         return method.invoke(serviceBean, parameters);*/
 
         FastClass serviceFastClass = FastClass.create(serviceClass);
         FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
         return serviceFastMethod.invoke(serviceBean, parameters);
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         LOGGER.error( "server caught exception" , cause);
         ctx.close();
     }
}

爲了不使用 Java 反射帶來的性能問題,咱們可使用 CGLib 提供的反射 API,如上面用到的FastClassFastMethod

第七步:配置客戶端

一樣使用 Spring 配置文件來配置 RPC 客戶端,spring.xml代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
<beans ...>
     <context:property-placeholder location= "classpath:config.properties" />
 
     <!-- 配置服務發現組件 -->
     <bean id= "serviceDiscovery" class = "com.xxx.rpc.registry.ServiceDiscovery" >
         <constructor-arg name= "registryAddress" value= "${registry.address}" />
     </bean>
 
     <!-- 配置 RPC 代理 -->
     <bean id= "rpcProxy" class = "com.xxx.rpc.client.RpcProxy" >
         <constructor-arg name= "serviceDiscovery" ref= "serviceDiscovery" />
     </bean>
</beans>

其中config.properties提供了具體的配置:

?

1
2
# ZooKeeper 服務器
registry.address= 127.0 . 0.1 : 2181

第八步:實現服務發現

一樣使用 ZooKeeper 實現服務發現功能,見以下代碼:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class ServiceDiscovery {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery. class );
 
     private CountDownLatch latch = new CountDownLatch( 1 );
 
     private volatile List<String> dataList = new ArrayList<>();
 
     private String registryAddress;
 
     public ServiceDiscovery(String registryAddress) {
         this .registryAddress = registryAddress;
 
         ZooKeeper zk = connectServer();
         if (zk != null ) {
             watchNode(zk);
         }
     }
 
     public String discover() {
         String data = null ;
         int size = dataList.size();
         if (size > 0 ) {
             if (size == 1 ) {
                 data = dataList.get( 0 );
                 LOGGER.debug( "using only data: {}" , data);
             } else {
                 data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                 LOGGER.debug( "using random data: {}" , data);
             }
         }
         return data;
     }
 
     private ZooKeeper connectServer() {
         ZooKeeper zk = null ;
         try {
             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                     if (event.getState() == Event.KeeperState.SyncConnected) {
                         latch.countDown();
                     }
                 }
             });
             latch.await();
         } catch (IOException | InterruptedException e) {
             LOGGER.error( "" , e);
         }
         return zk;
     }
 
     private void watchNode( final ZooKeeper zk) {
         try {
             List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                     if (event.getType() == Event.EventType.NodeChildrenChanged) {
                         watchNode(zk);
                     }
                 }
             });
             List<String> dataList = new ArrayList<>();
             for (String node : nodeList) {
                 byte [] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false , null );
                 dataList.add( new String(bytes));
             }
             LOGGER.debug( "node data: {}" , dataList);
             this .dataList = dataList;
         } catch (KeeperException | InterruptedException e) {
             LOGGER.error( "" , e);
         }
     }
}

第九步:實現 RPC 代理

這裏使用 Java 提供的動態代理技術實現 RPC 代理(固然也可使用 CGLib 來實現),具體代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class RpcProxy {
 
     private String serverAddress;
     private ServiceDiscovery serviceDiscovery;
 
     public RpcProxy(String serverAddress) {
         this .serverAddress = serverAddress;
     }
 
     public RpcProxy(ServiceDiscovery serviceDiscovery) {
         this .serviceDiscovery = serviceDiscovery;
     }
 
     @SuppressWarnings ( "unchecked" )
     public <T> T create(Class<?> interfaceClass) {
         return (T) Proxy.newProxyInstance(
             interfaceClass.getClassLoader(),
             new Class<?>[]{interfaceClass},
             new InvocationHandler() {
                 @Override
                 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                     RpcRequest request = new RpcRequest(); // 建立並初始化 RPC 請求
                     request.setRequestId(UUID.randomUUID().toString());
                     request.setClassName(method.getDeclaringClass().getName());
                     request.setMethodName(method.getName());
                     request.setParameterTypes(method.getParameterTypes());
                     request.setParameters(args);
 
                     if (serviceDiscovery != null ) {
                         serverAddress = serviceDiscovery.discover(); // 發現服務
                     }
 
                     String[] array = serverAddress.split( ":" );
                     String host = array[ 0 ];
                     int port = Integer.parseInt(array[ 1 ]);
 
                     RpcClient client = new RpcClient(host, port); // 初始化 RPC 客戶端
                     RpcResponse response = client.send(request); // 經過 RPC 客戶端發送 RPC 請求並獲取 RPC 響應
 
                     if (response.isError()) {
                         throw response.getError();
                     } else {
                         return response.getResult();
                     }
                 }
             }
         );
     }
}

使用RpcClient類實現 RPC 客戶端,只需擴展 Netty 提供的SimpleChannelInboundHandler抽象類便可,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient. class );
 
     private String host;
     private int port;
 
     private RpcResponse response;
 
     private final Object obj = new Object();
 
     public RpcClient(String host, int port) {
         this .host = host;
         this .port = port;
     }
 
     @Override
     public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
         this .response = response;
 
         synchronized (obj) {
             obj.notifyAll(); // 收到響應,喚醒線程
         }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         LOGGER.error( "client caught exception" , cause);
         ctx.close();
     }
 
     public RpcResponse send(RpcRequest request) throws Exception {
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.group(group).channel(NioSocketChannel. class )
                 .handler( new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel channel) throws Exception {
                         channel.pipeline()
                             .addLast( new RpcEncoder(RpcRequest. class )) // 將 RPC 請求進行編碼(爲了發送請求)
                             .addLast( new RpcDecoder(RpcResponse. class )) // 將 RPC 響應進行解碼(爲了處理響應)
                             .addLast(RpcClient. this ); // 使用 RpcClient 發送 RPC 請求
                     }
                 })
                 .option(ChannelOption.SO_KEEPALIVE, true );
 
             ChannelFuture future = bootstrap.connect(host, port).sync();
             future.channel().writeAndFlush(request).sync();
 
             synchronized (obj) {
                 obj.wait(); // 未收到響應,使線程等待
             }
 
             if (response != null ) {
                 future.channel().closeFuture().sync();
             }
             return response;
         } finally {
             group.shutdownGracefully();
         }
     }
}

第十步:發送 RPC 請求

使用 JUnit 結合 Spring 編寫一個單元測試,代碼以下:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith (SpringJUnit4Cla***unner. class )
@ContextConfiguration (locations = "classpath:spring.xml" )
public class HelloServiceTest {
 
     @Autowired
     private RpcProxy rpcProxy;
 
     @Test
     public void helloTest() {
         HelloService helloService = rpcProxy.create(HelloService. class );
         String result = helloService.hello( "World" );
         Assert.assertEquals( "Hello! World" , result);
     }
}

運行以上單元測試,若是不出意外的話,您應該會看到綠條。

總結

本文經過 Spring + Netty + Protostuff + ZooKeeper 實現了一個輕量級 RPC 框架,使用 Spring 提供依賴注入與參數配置,使用 Netty 實現 NIO 方式的數據傳輸,使用 Protostuff 實現對象序列化,使用 ZooKeeper 實現服務註冊與發現。使用該框架,可將服務部署到分佈式環境中的任意節點上,客戶端經過遠程接口來調用服務端的具體實現,讓服務端與客戶端的開發徹底分 離,爲實現大規模分佈式應用提供了基礎支持。

附錄:Maven 依賴

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<!-- JUnit -->
< dependency >
     < groupId >junit</ groupId >
     < artifactId >junit</ artifactId >
     < version >4.11</ version >
     < scope >test</ scope >
</ dependency >
 
<!-- SLF4J -->
< dependency >
     < groupId >org.slf4j</ groupId >
     < artifactId >slf4j-log4j12</ artifactId >
     < version >1.7.7</ version >
</ dependency >
 
<!-- Spring -->
< dependency >
     < groupId >org.springframework</ groupId >
     < artifactId >spring-context</ artifactId >
     < version >3.2.12.RELEASE</ version >
</ dependency >
< dependency >
     < groupId >org.springframework</ groupId >
     < artifactId >spring-test</ artifactId >
     < version >3.2.12.RELEASE</ version >
     < scope >test</ scope >
</ dependency >
 
<!-- Netty -->
< dependency >
     < groupId >io.netty</ groupId >
     < artifactId >netty-all</ artifactId >
     < version >4.0.24.Final</ version >
</ dependency >
 
<!-- Protostuff -->
< dependency >
     < groupId >com.dyuproject.protostuff</ groupId >
     < artifactId >protostuff-core</ artifactId >
     < version >1.0.8</ version >
</ dependency >
< dependency >
     < groupId >com.dyuproject.protostuff</ groupId >
     < artifactId >protostuff-runtime</ artifactId >
     < version >1.0.8</ version >
</ dependency >
 
<!-- ZooKeeper -->
< dependency >
     < groupId >org.apache.zookeeper</ groupId >
     < artifactId >zookeeper</ artifactId >
     < version >3.4.6</ version >
</ dependency >
 
<!-- Apache Commons Collections -->
< dependency >
     < groupId >org.apache.commons</ groupId >
     < artifactId >commons-collections4</ artifactId >
     < version >4.0</ version >
</ dependency >
 
<!-- Objenesis -->
< dependency >
     < groupId >org.objenesis</ groupId >
     < artifactId >objenesis</ artifactId >
     < version >2.1</ version >
</ dependency >
 
<!-- CGLib -->
< dependency >
     < groupId >cglib</ groupId >
     < artifactId >cglib</ artifactId >
     < version >3.1</ version >
</ dependency >

源碼地址:http://www.oschina.net/code/snippet_223750_45050

           

    分享到:     23

  聲明:OSCHINA 博客文章版權屬於做者,受法律保護。未經做者贊成不得轉載。    

 

   

   

評論30

  •        
        129590_50.jpg?t=1414036625000        

        1樓:石頭哥哥 發表於 2014-12-29 10:09        回覆此評論    

       

    漂亮  頂一個13

       
  •        
        269836_50.jpg?t=1405772066000        

        2樓:-Bin- Android 發表於 2014-12-29 10:12        回覆此評論    

       

    +1,頗有幫助

       
  •        
        2007826_50.JPG?t=1410031578000        

        3樓:CrazyHarry 發表於 2014-12-29 10:28        回覆此評論    

       

    大讚

       
  •        
        251904_50.jpg?t=1399018499000        

        4樓:liubingsmile 發表於 2014-12-29 13:39        回覆此評論    

       

    漂亮,贊一個!簡單經典!

       
  •        
        1456257_50.jpg?t=1418367460000        

        5樓:jkguowen 發表於 2014-12-29 14:27        回覆此評論    

       

    很好很強大,進來學習、膜拜一下!!1313

       
  •        
        781569_50.png?t=1416116056000        

        6樓:lylvgg 發表於 2014-12-29 14:28        回覆此評論    

       

    mark!

       
  •        
        347227_50.jpg?t=1398170907000        

        7樓:wangyunzhong 發表於 2014-12-29 18:20        回覆此評論    

       

    dubbo

       
  •        
        134984_50.jpg        

        8樓:dargoner 發表於 2014-12-29 19:45        回覆此評論    

       

    太強了,dubbo細節版

       
  •        
        1011594_50.jpg?t=1374591030000        

        9樓:anduo iPhone 發表於 2014-12-29 20:03        回覆此評論    

       

    漂亮

       
  •        
        1866528_50.jpg?t=1415513178000        

        10樓:fate-testarossa 發表於 2014-12-29 20:15        回覆此評論    

       

    mark

       

     

       

        黃勇        

       

       

插入: 表情 開源軟件

關閉相關文章閱讀

開源中國(OSChina.NET) | 關於咱們 | 廣告聯繫 | @新浪微博 | 開源中國手機版 | 粵ICP備12009483號-3 開源中國手機客戶端: Android iPhone WP7
開源中國社區(OSChina.net)是工信部 開源軟件推動聯盟 指定的官方社區
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息