Spring+Netty+Protostuff+ZooKeeper實現輕量級RPC服務

  • RPC簡介

  • RPC,即 Remote Procedure Call(遠程過程調用),說得通俗一點就是:調用遠程計算機上的服務,就像調用本地服務同樣。java

    RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具備良好的跨平臺性,但其性能卻不如基於 TCP 協議的 RPC。會兩方面會直接影響 RPC 的性能,一是傳輸方式,二是序列化。node

    衆所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在數據傳輸方面,越底層越快,所以,在通常狀況下,TCP 必定比 HTTP 快。就序列化而言,Java 提供了默認的序列化方式,但在高併發的狀況下,這種方式將會帶來一些性能上的瓶頸,因而市面上出現了一系列優秀的序列化框架,好比:Protobuf、Kryo、Hessian、Jackson 等,它們能夠取代 Java 默認的序列化,從而提供更高效的性能。git

    爲了支持高併發,傳統的阻塞式 IO 顯然不太合適,所以咱們須要異步的 IO,即 NIO。Java 提供了 NIO 的解決方案,Java 7 也提供了更優秀的 NIO.2 支持,用 Java 實現 NIO 並非高不可攀的事情,只是須要咱們熟悉 NIO 的技術細節。web

    咱們須要將服務部署在分佈式環境下的不一樣節點上,經過服務註冊的方式,讓客戶端來自動發現當前可用的服務,並調用這些服務。這須要一種服務註冊表(Service Registry)的組件,讓它來註冊分佈式環境下全部的服務地址(包括:主機名與端口號)。spring

    應用、服務、服務註冊表之間的關係見下圖:apache

    系統架構

    每臺 Server 上可發佈多個 Service,這些 Service 共用一個 host 與 port,在分佈式環境下會提供 Server 共同對外提供 Service。此外,爲防止 Service Registry 出現單點故障,所以須要將其搭建爲集羣環境。編程

    本文將爲您揭曉開發輕量級分佈式 RPC 框架的具體過程,該框架基於 TCP 協議,提供了 NIO 特性,提供高效的序列化方式,同時也具有服務註冊與發現的能力。json

    根據以上技術需求,咱們可以使用以下技術選型:bootstrap

  • Spring:它是最強大的依賴注入框架,也是業界的權威標準。
  • Netty:它使 NIO 編程更加容易,屏蔽了 Java 底層的 NIO 細節。
  • Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 文件。
  • ZooKeeper:提供服務註冊與發現功能,開發分佈式系統的必備選擇,同時它也具有天生的集羣能力。
  •  
  • 服務端設計配置

  • 服務接口工程listen-rpc-service

  • 主要定義服務接口類和服務涉及到的實體類
  • listen-rpc-service工程目錄結構圖
  • listen-rpc-service工程pom.xml
  • <?xml version="1.0"?>
    <project
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>listen</groupId>
    		<artifactId>listen-parent</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    	</parent>
    	<artifactId>listen-rpc-service</artifactId>
    	<packaging>jar</packaging>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-common</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>com.alibaba</groupId>
    			<artifactId>fastjson</artifactId>
    			<version>1.1.41</version>
    		</dependency>
    	</dependencies>
    </project>

     

  • listen-rpc-service工程服務接口IHelloService
  • package com.listen.rpc.service;
    
    import java.util.List;
    
    import com.listen.rpc.entity.User;
    
    public interface IHelloService {
    
    	public String hello(String name);
    	
    	public User getUser(String name);
    	
    	public List<User> getUsers(int size);
    	
    	public User updateUser(User user);
    	
    }
  • listen-rpc-service工程User類
  • package com.listen.rpc.entity;
    
    import java.util.Date;
    
    import com.alibaba.fastjson.JSON;
    
    public class User {
    
    	private String name;
    	private Date birthday;
    	private boolean sex;
    	public User(String name, Date birthday, boolean sex){
    		this.name = name;
    		this.birthday = birthday;
    		this.sex = sex;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public Date getBirthday() {
    		return birthday;
    	}
    	public void setBirthday(Date birthday) {
    		this.birthday = birthday;
    	}
    	public boolean isSex() {
    		return sex;
    	}
    	public void setSex(boolean sex) {
    		this.sex = sex;
    	}
    	public String toString(){
    		return JSON.toJSONString(this);
    	}
    }
  • 工程依賴引用的listen-rpc-common工程

  • 主要放置服務端和客戶端共用的組件,並且這些組件能夠被其餘服務包共用,因此要抽取出來
  • listen-rpc-common工程目錄結構圖
  • listen-rpc-common工程pom.xml文件
  • <?xml version="1.0"?>
    <project
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>listen</groupId>
    		<artifactId>listen-parent</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    	</parent>
    	<artifactId>listen-rpc-common</artifactId>
    	<packaging>jar</packaging>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    	<dependencies>
    		<!-- Netty -->
    		<dependency>
    			<groupId>io.netty</groupId>
    			<artifactId>netty-all</artifactId>
    			<version>4.0.24.Final</version>
    		</dependency>
    		<!-- Protostuff -->
    		<dependency>
    			<groupId>com.dyuproject.protostuff</groupId>
    			<artifactId>protostuff-core</artifactId>
    			<version>1.0.8</version>
    		</dependency>
    		<dependency>
    			<groupId>com.dyuproject.protostuff</groupId>
    			<artifactId>protostuff-runtime</artifactId>
    			<version>1.0.8</version>
    		</dependency>
    		<!-- Objenesis -->
    		<dependency>
    			<groupId>org.objenesis</groupId>
    			<artifactId>objenesis</artifactId>
    			<version>2.1</version>
    		</dependency>
    		<!-- Spring -->
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>3.2.12.RELEASE</version>
    		</dependency>
    	</dependencies>
    </project>

     

  • listen-rpc-common工程RpcService註解api

  • package com.listen.rpc.annotation;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    import org.springframework.stereotype.Component;
    
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component // 代表可被 Spring 掃描
    public @interface RpcService {
    
    	Class<?> value();
    	
    }

    listen-rpc-common工程Constant接口

  • package com.listen.rpc.common;
    
    public interface Constant {
    
    	int ZK_SESSION_TIMEOUT = 5000;
    
    	//在建立數據節點前,先用zkCli.sh客戶端鏈接上服務端,查看目前存在的數據節點,
    	//把下面的/zookeeper/quota改成你本身的,/zookeeper/quota是我本身Zookeeper的節點
        String ZK_REGISTRY_PATH = "/zookeeper/quota";
        String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
    }

    listen-rpc-common工程RpcDecoder解碼類

  • package com.listen.rpc.common;
    
    import java.util.List;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    public class RpcDecoder extends ByteToMessageDecoder{
    
    	private Class<?> genericClass;
    
        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            if (dataLength < 0) {
                ctx.close();
            }
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
    
            Object obj = SerializationUtil.deserialize(data, genericClass);
            out.add(obj);
        }
    }

    listen-rpc-common工程RpcEncoder解碼類

  • package com.listen.rpc.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class RpcEncoder extends MessageToByteEncoder {
    
    	private Class<?> genericClass;
    
        public RpcEncoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
            if (genericClass.isInstance(in)) {
                byte[] data = SerializationUtil.serialize(in);
                out.writeInt(data.length);
                out.writeBytes(data);
            }
        }
    }

    listen-rpc-common工程RpcRequest請求類

  • package com.listen.rpc.common;
    
    public class RpcRequest {
    
    	private String requestId;
        private String className;
        private String methodName;
        private Class<?>[] parameterTypes;
        private Object[] parameters;
    	public String getRequestId() {
    		return requestId;
    	}
    	public void setRequestId(String requestId) {
    		this.requestId = requestId;
    	}
    	public String getClassName() {
    		return className;
    	}
    	public void setClassName(String className) {
    		this.className = className;
    	}
    	public String getMethodName() {
    		return methodName;
    	}
    	public void setMethodName(String methodName) {
    		this.methodName = methodName;
    	}
    	public Class<?>[] getParameterTypes() {
    		return parameterTypes;
    	}
    	public void setParameterTypes(Class<?>[] parameterTypes) {
    		this.parameterTypes = parameterTypes;
    	}
    	public Object[] getParameters() {
    		return parameters;
    	}
    	public void setParameters(Object[] parameters) {
    		this.parameters = parameters;
    	}
        
    }

    listen-rpc-common工程RpcResponse響應類

  • package com.listen.rpc.common;
    
    public class RpcResponse {
    
    	private String requestId;
        private Throwable error;
        private Object result;
    	public String getRequestId() {
    		return requestId;
    	}
    	public void setRequestId(String requestId) {
    		this.requestId = requestId;
    	}
    	public Throwable getError() {
    		return error;
    	}
    	public void setError(Throwable error) {
    		this.error = error;
    	}
    	public Object getResult() {
    		return result;
    	}
    	public void setResult(Object result) {
    		this.result = result;
    	}
        
    }

    listen-rpc-common工程SerializationUtil序列化反序列化類

  • 服務接口實現工程listen-rpc-service-impl

  • 主要放置listen-rpc-service接口的實現類,這個工程會依賴公共RPC服務工程listen-rpc-server,listen-rpc-server主要是鏈接Zookeeper實現服務的註冊與發現
  • listen-rpc-server工程的目錄結構圖

  • listen-rpc-server工程pom.xml
  • <?xml version="1.0"?>
    <project
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>listen</groupId>
    		<artifactId>listen-parent</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    	</parent>
    	<artifactId>listen-rpc-server</artifactId>
    	<packaging>jar</packaging>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-common</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    
    		<!-- ZooKeeper -->
    		<dependency>
    			<groupId>org.apache.zookeeper</groupId>
    			<artifactId>zookeeper</artifactId>
    			<version>3.4.6</version>
    		</dependency>
    
    		<!-- Apache Commons Collections -->
    		<dependency>
    			<groupId>org.apache.commons</groupId>
    			<artifactId>commons-collections4</artifactId>
    			<version>4.0</version>
    		</dependency>
    	</dependencies>
    	
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-source-plugin</artifactId>
    				<version>2.2.1</version>
    				<executions>
    					<execution>
    						<id>attach-sources</id>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.tomcat.maven</groupId>
    				<artifactId>tomcat7-maven-plugin</artifactId>
    				<version>2.2</version>
    				<configuration>
    					<port>8000</port>
    					<path>/</path>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>

     

  • listen-rpc-server工程RpcServer服務啓動類

  • package com.listen.rpc.server;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.collections4.MapUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    import com.listen.rpc.annotation.RpcService;
    import com.listen.rpc.common.RpcDecoder;
    import com.listen.rpc.common.RpcEncoder;
    import com.listen.rpc.common.RpcRequest;
    import com.listen.rpc.common.RpcResponse;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class RpcServer implements ApplicationContextAware, InitializingBean{
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
    
        private String serverAddress;
        private ServiceRegistry serviceRegistry;
    
        private Map<String, Object> handlerMap = new HashMap<String, Object>(); // 存放接口名與服務對象之間的映射關係
    
        public RpcServer(String serverAddress) {
            this.serverAddress = serverAddress;
        }
    
        public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
            this.serverAddress = serverAddress;
            this.serviceRegistry = serviceRegistry;
        }
    
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 獲取全部帶有 RpcService 註解的 Spring Bean
            if (MapUtils.isNotEmpty(serviceBeanMap)) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }
    
        public void afterPropertiesSet() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            ((Channel) channel).pipeline()
                                .addLast(new RpcDecoder(RpcRequest.class)) // 將 RPC 請求進行解碼(爲了處理請求)
                                .addLast(new RpcEncoder(RpcResponse.class)) // 將 RPC 響應進行編碼(爲了返回響應)
                                .addLast(new RpcHandler(handlerMap)); // 處理 RPC 請求
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
    
                ChannelFuture future = bootstrap.bind(host, port).sync();
                LOGGER.debug("server started on port {}", port);
    
                if (serviceRegistry != null) {
                    serviceRegistry.register(serverAddress); // 註冊服務地址
                }
    
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }

    listen-rpc-server工程ServiceRegistry服務註冊類

  • package com.listen.rpc.server;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.listen.rpc.common.Constant;
    
    public class ServiceRegistry {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        private String registryAddress;
    
        public ServiceRegistry(String registryAddress) {
            this.registryAddress = registryAddress;
        }
    
        public void register(String data) {
            if (data != null) {
                ZooKeeper zk = connectServer();
                if (zk != null) {
                    createNode(zk, data);
                }
            }
        }
    
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException e) {
                LOGGER.error("", e);
            } catch (InterruptedException e){
            	LOGGER.error("", e);
            }
            return zk;
        }
    
        private void createNode(ZooKeeper zk, String data) {
            try {
                byte[] bytes = data.getBytes();
                String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                LOGGER.debug("create zookeeper node ({} => {})", path, data);
            } catch (KeeperException e) {
            	e.printStackTrace();
                LOGGER.error("", e);
            } catch (InterruptedException e){
            	LOGGER.error("", e);
            }
        }
    }

    listen-rpc-server工程RpcHandler請求統一處理類

  • listen-rpc-service-impl工程目錄結構圖

  • listen-rpc-service-impl服務實現工程pom.xml
  • <?xml version="1.0"?>
    <project
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>listen</groupId>
    		<artifactId>listen-parent</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    	</parent>
    	<artifactId>listen-rpc-service-impl</artifactId>
    	<packaging>war</packaging>
    	<dependencies>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-server</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-webmvc</artifactId>
    			<version>3.2.12.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>javax.servlet</groupId>
    			<artifactId>servlet-api</artifactId>
    			<version>2.5</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-service</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-source-plugin</artifactId>
    				<version>2.2.1</version>
    				<executions>
    					<execution>
    						<id>attach-sources</id>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.tomcat.maven</groupId>
    				<artifactId>tomcat7-maven-plugin</artifactId>
    				<version>2.2</version>
    				<configuration>
    					<port>8000</port>
    					<path>/</path>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>

     

  • listen-rpc-service-impl工程HelloServiceImpl服務實現類

  • package com.listen.rpc.service.impl;
    
    import java.util.ArrayList;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.List;
    
    import com.listen.rpc.annotation.RpcService;
    import com.listen.rpc.entity.User;
    import com.listen.rpc.service.IHelloService;
    
    @RpcService(IHelloService.class)// 指定遠程接口  使用 RpcService註解定義在服務接口的實現類上,須要對該實現類指定遠程接口,由於實現類可能會實現多個接口,必定要告訴框架哪一個纔是遠程接口。
    public class HelloServiceImpl implements IHelloService {
    
    	public String hello(String name) {
    		String result = "hello" + name;
    		System.out.println(result);
    		return result;
    	}
    
    	@Override
    	public User getUser(String name) {
    		User user = new User(name, new Date(), true);
    		return user;
    	}
    
    	@Override
    	public List<User> getUsers(int size) {
    		List<User> list = new ArrayList<User>();
    		User user = null;
    		String name = "foo";
    		Date birthday = new Date();
    		Calendar cal = Calendar.getInstance();
    		cal.setTime(birthday);
    		for(int i = 0; i < size; i++){
    			cal.add(Calendar.DAY_OF_MONTH, 1);
    			user = new User(name, cal.getTime(), i%2==0 ? true : false);
    			list.add(user);
    		}
    		return list;
    	}
    
    	@Override
    	public User updateUser(User user) {
    		user.setName(user.getName() + "-update");
    		return user;
    	}
    
    }

    listen-rpc-service-impl工程配置文件config.properties

  • # ZooKeeper 服務器
    registry.address=127.0.0.1:2181
    
    # RPC 服務器
    server.address=127.0.0.1:8000
    #以上配置代表:鏈接本地的 ZooKeeper 服務器,並在 8000 端口上發佈 RPC 服務。

    listen-rpc-service-impl工程spring.xml配置文件

  • <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
        xmlns:cache="http://www.springframework.org/schema/cache"
        xsi:schemaLocation="
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/jdbc
        http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
        http://www.springframework.org/schema/cache
        http://www.springframework.org/schema/cache/spring-cache-3.1.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    
        <!-- 自動掃描web包 ,將帶有註解的類 歸入spring容器管理 -->
        <context:component-scan base-package="com.listen.rpc"></context:component-scan>
        <context:property-placeholder location="classpath:config.properties"/>
        
        <!-- 配置服務註冊組件 -->
        <bean id="serviceRegistry" class="com.listen.rpc.server.ServiceRegistry">
            <constructor-arg name="registryAddress" value="${registry.address}"/>
        </bean>
    
        <!-- 配置 RPC 服務器 -->
        <bean id="rpcServer" class="com.listen.rpc.server.RpcServer">
            <constructor-arg name="serverAddress" value="${server.address}"/>
            <constructor-arg name="serviceRegistry" ref="serviceRegistry"/>
        </bean>
    
    </beans>

     

至此,服務端代碼編寫完畢,接下來開始編寫客戶端代碼,客戶端我打算用springMVC實現服務的測試

  • 客戶端工程listen-rpc-client

  • 主要是springMVC實現服務端服務接口的調用測試
  • listen-rpc-client工程目錄結構圖
  •  
  • listen-rpc-client工程的pom.xml文件
  • <?xml version="1.0"?>
    <project
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>listen</groupId>
    		<artifactId>listen-parent</artifactId>
    		<version>0.0.1-SNAPSHOT</version>
    	</parent>
    	<artifactId>listen-rpc-client</artifactId>
    	<packaging>war</packaging>
    	<dependencies>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-common</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>listen</groupId>
    			<artifactId>listen-rpc-service</artifactId>
    			<version>0.0.1-SNAPSHOT</version>
    		</dependency>
    		<dependency>
    			<groupId>javax.servlet</groupId>
    			<artifactId>javax.servlet-api</artifactId>
    			<version>3.1.0</version>
    			<scope>provided</scope>
    		</dependency>
    		<!-- Spring -->
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>3.2.12.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-webmvc</artifactId>
    			<version>3.2.12.RELEASE</version>
    		</dependency>
    
    		<!-- ZooKeeper -->
    		<dependency>
    			<groupId>org.apache.zookeeper</groupId>
    			<artifactId>zookeeper</artifactId>
    			<version>3.4.6</version>
    		</dependency>
    
    		<!-- CGLib -->
    		<dependency>
    			<groupId>cglib</groupId>
    			<artifactId>cglib</artifactId>
    			<version>3.1</version>
    		</dependency>
    	</dependencies>
    	<build>
    		<finalName>listen-rpc-client</finalName>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-source-plugin</artifactId>
    				<version>2.2.1</version>
    				<executions>
    					<execution>
    						<id>attach-sources</id>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.tomcat.maven</groupId>
    				<artifactId>tomcat7-maven-plugin</artifactId>
    				<version>2.2</version>
    				<configuration>
    					<port>8080</port>
    					<path>/</path>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>

     

  • listen-rpc-client工程HelloController類
  • package com.listen.rpc.controller;
    
    import java.util.Date;
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import com.listen.rpc.client.RpcProxy;
    import com.listen.rpc.entity.User;
    import com.listen.rpc.service.IHelloService;
    
    @Controller
    public class HelloController {
    
    	@Autowired
    	private RpcProxy rpcProxy;
    	@RequestMapping("/hello")
    	public void hello(String name){
    		IHelloService service = rpcProxy.create(IHelloService.class);
    		String result = service.hello(name);
    		System.out.println(result);
    	}
    	
    	@RequestMapping("/getUser")
    	public void getUser(String name){
    		IHelloService service = rpcProxy.create(IHelloService.class);
    		System.out.println(service.getUser(name).toString());
    	}
    	
    	@RequestMapping("/getUsers")
    	public void getUsers(int size){
    		IHelloService service = rpcProxy.create(IHelloService.class);
    		List<User> list = service.getUsers(size);
    		for(User user : list){
    			System.out.println(user.toString());
    		}
    	}
    	
    	@RequestMapping("/updateUser")
    	public void updateUser(String name){
    		User user = new User(name, new Date(), true);
    		IHelloService service = rpcProxy.create(IHelloService.class);
    		user = service.updateUser(user);
    		System.out.println(user.toString());
    	}
    }

    listen-rpc-client工程代理類RpcProxy

  • package com.listen.rpc.client;
    
    import java.lang.reflect.Method;
    import java.util.UUID;
    
    import com.listen.rpc.common.RpcRequest;
    import com.listen.rpc.common.RpcResponse;
    
    import net.sf.cglib.proxy.InvocationHandler;
    import net.sf.cglib.proxy.Proxy;
    
    public class RpcProxy {
    
    	private String serverAddress;
        private ServiceDiscovery serviceDiscovery;
    
        public RpcProxy(String serverAddress) {
            this.serverAddress = serverAddress;
        }
    
        public RpcProxy(ServiceDiscovery serviceDiscovery) {
            this.serviceDiscovery = serviceDiscovery;
        }
    
        @SuppressWarnings("unchecked")
        public <T> T create(Class<?> interfaceClass) {
            return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        RpcRequest request = new RpcRequest(); // 建立並初始化 RPC 請求
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
    
                        if (serviceDiscovery != null) {
                            serverAddress = serviceDiscovery.discover(); // 發現服務
                        }
    
                        String[] array = serverAddress.split(":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
    
                        RpcClient client = new RpcClient(host, port); // 初始化 RPC 客戶端
                        RpcResponse response = client.send(request); // 經過 RPC 客戶端發送 RPC 請求並獲取 RPC 響應
    
                        if (response.getError() != null) {
                            throw response.getError();
                        } else {
                            return response.getResult();
                        }
                    }
                }
            );
        }
    }

    listen-rpc-client工程客戶端類RpcClient

  • package com.listen.rpc.client;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.listen.rpc.common.RpcDecoder;
    import com.listen.rpc.common.RpcEncoder;
    import com.listen.rpc.common.RpcRequest;
    import com.listen.rpc.common.RpcResponse;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
    
        private String host;
        private int port;
    
        private RpcResponse response;
    
        private final Object obj = new Object();
    
        public RpcClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
            this.response = response;
    
            synchronized (obj) {
                obj.notifyAll(); // 收到響應,喚醒線程
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("client caught exception", cause);
            ctx.close();
        }
    
        public RpcResponse send(RpcRequest request) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                .addLast(new RpcEncoder(RpcRequest.class)) // 將 RPC 請求進行編碼(爲了發送請求)
                                .addLast(new RpcDecoder(RpcResponse.class)) // 將 RPC 響應進行解碼(爲了處理響應)
                                .addLast(RpcClient.this); // 使用 RpcClient 發送 RPC 請求
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture future = bootstrap.connect(host, port).sync();
                future.channel().writeAndFlush(request).sync();
    
                synchronized (obj) {
                    obj.wait(); // 未收到響應,使線程等待
                }
    
                if (response != null) {
                    future.channel().closeFuture().sync();
                }
                return response;
            } finally {
                group.shutdownGracefully();
            }
        }
    
    }

    listen-rpc-client工程服務發現類ServiceDiscovery

  • package com.listen.rpc.client;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.listen.rpc.common.Constant;
    
    import io.netty.util.internal.ThreadLocalRandom;
    
    public class ServiceDiscovery {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        private volatile List<String> dataList = new ArrayList<String>();
    
        private String registryAddress;
    
        public ServiceDiscovery(String registryAddress) {
            this.registryAddress = registryAddress;
    
            ZooKeeper zk = connectServer();
            if (zk != null) {
                watchNode(zk);
            }
        }
    
        public String discover() {
            String data = null;
            int size = dataList.size();
            if (size > 0) {
                if (size == 1) {
                    data = dataList.get(0);
                    LOGGER.debug("using only data: {}", data);
                } else {
                    data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                    LOGGER.debug("using random data: {}", data);
                }
            }
            return data;
        }
    
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException e) {
                LOGGER.error("", e);
            } catch (InterruptedException e){
            	LOGGER.error("", e);
            }
            return zk;
        }
    
        private void watchNode(final ZooKeeper zk) {
            try {
                List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            watchNode(zk);
                        }
                    }
                });
                List<String> dataList = new ArrayList<String>();
                for (String node : nodeList) {
                    byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                    dataList.add(new String(bytes));
                }
                LOGGER.debug("node data: {}", dataList);
                this.dataList = dataList;
            } catch (KeeperException e) {
            	e.printStackTrace();
                LOGGER.error("", e);
            } catch (InterruptedException e){
            	LOGGER.error("", e);
            }
        }
    }

    listen-rpc-client工程配置文件config.properties

  • # ZooKeeper 服務器
    registry.address=127.0.0.1:2181
    #以上配置代表:鏈接本地的 ZooKeeper 服務器,並在 8000 端口上發佈 RPC 服務。

    listen-rpc-client工程spring.xml配置文件

  • <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-3.2.xsd
                http://www.springframework.org/schema/aop
               http://www.springframework.org/schema/aop/spring-aop-3.2.xsd">
        <!-- 啓用spring mvc 註解 -->
        <context:annotation-config />
        <!-- 完成請求和註解POJO的映射 -->
        <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter" />
        <!-- 對轉向頁面的路徑解析。prefix:前綴, suffix:後綴 -->
        <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver" p:prefix="/WEB-INF/jsp/" p:suffix=".jsp" />
        <context:component-scan base-package="com.listen.rpc" />
        <context:property-placeholder location="classpath:config.properties"/>
    
        <!-- 配置服務發現組件 -->
        <bean id="serviceDiscovery" class="com.listen.rpc.client.ServiceDiscovery">
            <constructor-arg name="registryAddress" value="${registry.address}"/>
        </bean>
    
        <!-- 配置 RPC 代理 -->
        <bean id="rpcProxy" class="com.listen.rpc.client.RpcProxy">
            <constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>
        </bean>
        
    </beans>

     

至此,客戶端代碼編寫完畢,接下來就是測試了

測試服務接口

  • 啓動服務端 tomcat7:run;勾選Skip test
  •  
  • 啓動客戶端 tomcat7:run;勾選Skip test
  • 訪問地址測試
  • http://localhost:8080/hello?name=aaa 返回helloaaa
  • http://localhost:8080/getUser?name=aaa 返回{"birthday":1464258865975,"name":"aaa","sex":true}
  • http://localhost:8080/getUsers?size=5 返回{"birthday":1464345304049,"name":"foo","sex":true}
    {"birthday":1464431704049,"name":"foo","sex":false}
    {"birthday":1464518104049,"name":"foo","sex":true}
    {"birthday":1464604504049,"name":"foo","sex":false}
    {"birthday":1464690904049,"name":"foo","sex":true}
  • http://localhost:8080/updateUser?name=kaka 返回{"birthday":1464258949048,"name":"kaka-update","sex":true}

至此說明服務能夠正常發佈及調用。

須要注意的點爲Zookeeper數據地址的獲取,可參考 Windows下Zookeeper的簡單配置

 

工程源碼地址:

https://git.oschina.net/git.listen/listen-parent

https://git.oschina.net/git.listen/listen-rpc-common

https://git.oschina.net/git.listen/listen-rpc-server

https://git.oschina.net/git.listen/listen-rpc-service

https://git.oschina.net/git.listen/listen-rpc-service-impl

https://git.oschina.net/git.listen/listen-rpc-client

相關文章
相關標籤/搜索