RPC框架原理及從零實現系列文章(四):支持zookeeper註冊中心與負載均衡

Java打造RPC框架系列第四篇

上一篇文章中 給你們講了zookeeper做爲註冊中心的基本原理java

http://blog.csdn.net/we_phone/article/details/78993394node

這篇文章中 我講的是RPC框架接入對單點zookeeper的支持的源碼實現git

詳細代碼可見:Github MeiZhuoRPCgithub

看懂這篇文章須要的前提

  1. 看了前面的系列文章
  2. 熟悉java.util.concurrent包

文章出於詳細講解的目的,篇幅較長 望耐心閱讀 有疑問歡迎評論spring

首先看使用效果

咱們啓動兩個提供者服務數據庫

 

@Test
    public void multi1and2() throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "file:src/test/java/rpcTest/MultiServer1and2Context.xml" });
        context.start();
        //啓動spring後纔可啓動 防止容器還沒有加載完畢
        RPC.start();
    }

    @Test
    public void multi2() throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "file:src/test/java/rpcTest/MultiServer2Context.xml" });
        context.start();
        //啓動spring後纔可啓動 防止容器還沒有加載完畢
        RPC.start();
    }


下面是他們對應的spring配置文件json

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.server.ServerConfig">
        <property name="port" value="9012"></property>
        <property name="zooKeeperHost" value="127.0.0.1:2181"></property>
        <property name="serverImplMap">
            <map>
                <!--配置對應的抽象接口及其實現-->
                <entry key="rpcTest.Service1" value="rpcTest.Service1Impl"></entry>
                <entry key="rpcTest.Service2" value="rpcTest.Service2Impl"></entry>
            </map>
        </property>
    </bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.server.ServerConfig">
        <property name="port" value="9002"></property>
        <property name="zooKeeperHost" value="127.0.0.1:2181"></property>
        <property name="serverImplMap">
            <map>
                <!--配置對應的抽象接口及其實現-->
                <entry key="rpcTest.Service2" value="rpcTest.Service2Impl"></entry>
            </map>
        </property>
    </bean>
</beans>

一個提供者只註冊了service1 一個提供者service1 2都註冊了性能優化

接下來調用消費者服務器

 

ExecutorService executorService= Executors.newFixedThreadPool(8);
        for (int i = 0; i <1000 ; i++) {
            int finalI = i+1;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Service1 service1= (Service1) RPC.call(Service1.class);
                    Service2 service2= (Service2) RPC.call(Service2.class);
                    System.out.println("第"+ finalI +"次發出請求");
                    service1.count();
                    service2.count();
                }
            });
        }

 

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.client.ClientConfig">
        <property name="zooKeeperHost" value="127.0.0.1:2181"></property>
        <property name="serviceInterface">
            <set>
                <value>rpcTest.Service1</value>
                <value>rpcTest.Service2</value>
            </set>
        </property>
        <!--負載均衡策略-->
        <property name="loadBalance" ref="Random"></property>
    </bean>
    <bean scope="prototype" class="org.meizhuo.rpc.zksupport.LoadBalance.RandomBalance" id="Random">
    </bean>
</beans>

 

這裏用線程池 發出1000個請求,每一個提供者者接收到請求後只是計數後輸出,這裏我就不貼代碼了 我github上的單元測試有 session

調用效果以下

即註冊service1也註冊2的提供者輸出:Service1 計數:1000 Service2 計數:515

只註冊service2的輸出:Service2 計數:485

485+515 恰好是1000

也就是service1只有1個提供者 請求就所有落在了一個節點上 service2 就分攤給了兩個節點

這就是咱們經過zookeeper註冊中心實現的負載均衡的RPC調用的效果
 

首先看MeiZhuoRPC中zookeeper數據模型

路徑的常量類以下

public class ZKConst {

    public static final Integer sessionTimeout=2000;
    public static final String rootPath="/MeiZhuoRPC";
    public static final String providersPath="/providers";
    public static final String consumersPath="/consumers";
    public static final String servicePath="/service";
}

meizhuoRPC 數據模型

本例中尚無使用到/consumers節點

每一個providers都進行監聽 監聽提供者IP的變化

 

zookeeper的操做

我封裝了ZKTempZnodes 做爲對zookeeper的主要操做

ZKServerService是對ZKTempZnodes的進一步封裝 

相似與數據庫操做Service和DAO的封裝

 

ZKServerService的操做以下

 

  • 初始化一些根節點,例如/MeiZhuo /service這些 不存在則須要建立。
  • 生成每一個服務的providers和ip節點
  • 得到並監聽全部的IP
//生成全部註冊的服務znode
    public void createServerService() throws KeeperException, InterruptedException {
        ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
        Map<String,String> serviceMap= RPC.getServerConfig().getServerImplMap();
        String ip=RPC.getServerConfig().getServerHost();
        for (Map.Entry<String,String> entry:serviceMap.entrySet()){
            //獲取配置中設置的IP設置爲IP順序節點的值 默認127.0.0.1:8888
            zkTempZnodes.createTempZnode(ZKConst.rootPath+ZKConst.servicePath+"/"+entry.getKey()+ZKConst.providersPath+"/"+ip,null);
            //建立鏈接數節點 首次增長時鏈接數爲0
//            zkTempZnodes.createTempZnode(ZKConst.rootPath+ZKConst.balancePath+"/"+entry.getKey()+"/"+ip,0+"");
        }
    }

    //得到這個服務全部的提供者 包含監聽註冊
    public List<String> getAllServiceIP(String serviceName) throws KeeperException, InterruptedException {
        ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
        IPWatcher ipWatcher=new IPWatcher(zooKeeper);
        return zkTempZnodes.getPathChildren(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.providersPath,ipWatcher);
    }

    //初始化根節點及服務提供者節點 均爲持久節點
    public void initZnode() throws KeeperException, InterruptedException {
        ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper);
        StringBuilder pathBuilder=new StringBuilder(ZKConst.rootPath);
//        String balancePath=ZKConst.rootPath;
        zkTempZnodes.createSimpleZnode(pathBuilder.toString(),null);
//        balancePath=balancePath+ZKConst.balancePath;
//        zkTempZnodes.createSimpleZnode(balancePath,null);
        pathBuilder.append(ZKConst.servicePath);
        zkTempZnodes.createSimpleZnode(pathBuilder.toString(),null);
        Map<String,String> serverImplMap=RPC.getServerConfig().getServerImplMap();
        for (Map.Entry<String,String> entry:serverImplMap.entrySet()){
//            zkTempZnodes.createSimpleZnode(balancePath+"/"+entry.getKey(),null);
            StringBuilder serviceBuilder=new StringBuilder(pathBuilder.toString());
            serviceBuilder.append("/");
            serviceBuilder.append(entry.getKey());
            zkTempZnodes.createSimpleZnode(serviceBuilder.toString(),null);
            serviceBuilder.append(ZKConst.providersPath);
            zkTempZnodes.createSimpleZnode(serviceBuilder.toString(),null);
        }
    }

負載均衡策略

常見的負載均衡策略,(加權)隨機,輪詢,最小鏈接數,一致性Hash

這裏我只寫好了隨機和一致性hash方式的負載均衡

一致性hash原理講解篇幅較多 讀者能夠自行查閱和看我github上對應的實現

 

負載均衡我用一個接口抽象出來

/**
 * Created by wephone on 18-1-8.
 * 負載均衡策略抽象接口
 * 其餘模塊不耦合負載均衡代碼
 */
public interface LoadBalance {

    /**
     * 負載均衡選擇服務中已選中的IP之一
     * @param serviceName
     * @return
     */
    String chooseIP(String serviceName) throws ProvidersNoFoundException;
}

各類負載均衡策略對這個接口進行具體的實現 

也就是依賴倒置原則  依賴抽象 不依賴具體實現
 

隨機負載均衡的實現以下

/**
 * Created by wephone on 18-1-18.
 */
public class RandomBalance implements LoadBalance {

    @Override
    public String chooseIP(String serviceName) throws ProvidersNoFoundException {
        RPCRequestNet.getInstance().serviceLockMap.get(serviceName).readLock().lock();
        Set<String> ipSet=RPCRequestNet.getInstance().serviceNameInfoMap.get(serviceName).getServiceIPSet();
        int ipNum=ipSet.size();
        if (ipNum==0){
            throw new ProvidersNoFoundException();
        }
        RPCRequestNet.getInstance().serviceLockMap.get(serviceName).readLock().unlock();
        Random random = new Random();
        //生成[0,num)區間的整數:
        int index = random.nextInt(ipNum);
        int count = 0;
        for (String ip : ipSet) {
            if (count == index) {
                //返回隨機生成的索引位置ip
                return ip;
            }
            count++;
        }
        return null;
    }
}

服務端的代碼

服務端相對比較簡單,就是啓動netty服務器,而後向zookeeper註冊本身的IP節點

 

RPC.start方法以下

 

public static void start() throws InterruptedException, IOException {
        System.out.println("welcome to use MeiZhuoRPC");
        ZooKeeper zooKeeper= new ZKConnect().serverConnect();
        ZKServerService zkServerService=new ZKServerService(zooKeeper);
        try {
            zkServerService.initZnode();
            //建立全部提供者服務的znode
            zkServerService.createServerService();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        //阻塞服務端不會退出
        RPCResponseNet.connect();
    }

服務端的更新基本在這裏,讀配置把提供的服務註冊在zookeeper上(初始化節點,建立服務節點)

服務端如此操做就能夠了 其他的和前面1.0版本的同樣 等待消費者鏈接便可

接下來看調用者端

調用者端主要使用的兩個類

serviceInfo和IPChannelInfo

 

/**
 * Created by wephone on 18-1-8.
 * 每一個服務對應的信息存放類
 * 用在一個key爲服務名字的serviceNameInfoMap裏
 */
public class ServiceInfo {

    //用於輪詢負載均衡策略
    private AtomicInteger index=new AtomicInteger(0);
    //這個服務所鏈接的提供者IP Set 只能由負載均衡類操做
    private Set<String> serviceIPSet=new HashSet<>();


//    public void setServiceIPSet(Set<String> serviceIPSet) {
    public void setServiceIPSet(List<String> newIPSet) {
        Set<String> set=new HashSet<>();
        set.addAll(newIPSet);
        this.serviceIPSet.clear();
        this.serviceIPSet.addAll(set);
    }

    public Set<String> getServiceIPSet() {
        return serviceIPSet;
    }

    public int getConnectIPSetCount(){
        return serviceIPSet.size();
    }

    public void addConnectIP(String IP) {
        serviceIPSet.add(IP);
    }

    public void removeConnectIP(String IP){
        serviceIPSet.remove(IP);
    }
}


 

ServiceInfo類 

負載均衡以服務做爲單位,由於每一個服務的提供者IP均可能不同。

一個服務(即一個抽象接口,例如上面的service1)對應一個這樣的類

保存了該服務提供者的全部IP以及輪詢策略的索引

後續可能擴展更多屬性

 

/**
 * Created by wephone on 18-1-8.
 * IP對應的channel類 用於一個IP映射的Map IPChannelMap
 * 存放一個IP對應的channel
 */
public class IPChannelInfo {

    private EventLoopGroup group;
    private Channel channel;
//    //保證多線程修改時引用計數正確
//    private AtomicInteger serviceQuoteNum=new AtomicInteger(0);//原子變量要賦初值

    public EventLoopGroup getGroup() {
        return group;
    }

    public void setGroup(EventLoopGroup group) {
        this.group = group;
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

}


 

IPChannelInfo類  

用於存儲一個提供者的IP地址對應NIO通道及其Netty線程組(本例中暫無使用到此屬性,以前想過可能提取釋放鏈接資源才放了這個屬性進來)

 

這兩個基本類在RPCRequestNet中用一個Map維持 都是全局單例的

 

public class RPCRequestNet {

    //全局map 每一個請求對應的鎖 用於同步等待每一個異步的RPC請求
    public Map requestLockMap=new ConcurrentHashMap<String,RPCRequest>();
    //每一個IP對應一個鎖 防止重複鏈接一個IP屢次
    public Map<String,Lock> connectlock=new ConcurrentHashMap<String,Lock>();
    //服務名稱 映射 服務信息類
    public Map<String,ServiceInfo> serviceNameInfoMap=new ConcurrentHashMap<>();
    //IP地址 映射 對應的NIO Channel及其引用次數
    public Map<String,IPChannelInfo> IPChannelMap=new ConcurrentHashMap<>();
    //全局讀寫鎖 更新ip時爲寫操做 負載均衡選中IP爲讀操做
    public ConcurrentHashMap<String,ReadWriteLock> serviceLockMap=new ConcurrentHashMap<>();
//    public CountDownLatch countDownLatch=new CountDownLatch(1);
    private LoadBalance loadBalance;
    private static RPCRequestNet instance;

ServiceInfo的key爲服務名稱 即一個服務對應一個ServiceInfo

IPChannelInfo的key爲ip地址 即一個ip地址對應一個IPChannelInfo
 

說完基本的類,咱們來看看消費者端的啓動

clientConfig得到IOC容器後  得到全部可用的提供者IP 並做爲serviceInfo的初始值進行初始化

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RPC.clientContext=applicationContext;
        //得到IOC容器後 讀取配置中的服務
        try {
            ZooKeeper zooKeeper= new ZKConnect().clientConnect();
            ZKServerService zkServerService=new ZKServerService(zooKeeper);
            Set<String> services=RPC.getClientConfig().getServiceInterface();
            //初始化全部可用IP 初始化讀寫鎖
            for (String service:services){
                List<String> ips=zkServerService.getAllServiceIP(service);
                for (String ip:ips){
                    RPCRequestNet.getInstance().IPChannelMap.putIfAbsent(ip,new IPChannelInfo());
                }
                ServiceInfo serviceInfo=new ServiceInfo();
                serviceInfo.setServiceIPSet(ips);
                ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
                RPCRequestNet.getInstance().serviceLockMap.putIfAbsent(service,readWriteLock);
                RPCRequestNet.getInstance().serviceNameInfoMap.putIfAbsent(service,serviceInfo);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

zookeeper的watcher

這裏只使用到了一個watcher IPWatcher

 

/**
 * Created by wephone on 18-1-7.
 * 服務提供者和調用者的IP監控器 即監聽服務的可用性
 */
public class IPWatcher implements Watcher{

    private ZooKeeper zooKeeper;

    public IPWatcher(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        /**
         * 監聽到節點提供者IP節點變化時被調用
         * 調用後進行平衡操做
         */
        String path=watchedEvent.getPath();
        String[] pathArr=path.split("/");
        String serviceName=pathArr[3];//第四個部分則爲服務名
        RPCRequestNet.getInstance().serviceLockMap.get(serviceName).writeLock().lock();
        System.out.println("providers changed...Lock write Lock");
        try {
            List<String> children=zooKeeper.getChildren(path,this);
            for (String ip:children){
                RPCRequestNet.getInstance().IPChannelMap.putIfAbsent(ip,new IPChannelInfo());
            }
            RPCRequestNet.getInstance().serviceNameInfoMap.get(serviceName).setServiceIPSet(children);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RPCRequestNet.getInstance().serviceLockMap.get(serviceName).writeLock().unlock();
    }
}

 

也就是收到watch通知後 從新獲取可用IP 寫入serviceInfo 並再次註冊watcher

這裏我用到一種鎖機制 讀寫鎖
當IP發生更新時,咱們改寫ServiceInfo中的IP集合 

這時須要阻塞IP的獲取操做 以防獲取到已經不存在的IP節點

因此採用讀寫鎖,各個RPC獲取IP時加讀鎖,相互不阻塞,當IP發生改變時 上寫鎖,相互阻塞 直至IP更新完畢

發送RPC請求時操做以下

 

public void send(RPCRequest request) throws ProvidersNoFoundException {
        String serviceName=request.getClassName();
        String ip=loadBalance.chooseIP(serviceName);
        Channel channel=connect(ip);
//        System.out.println("Send RPC Thread:"+Thread.currentThread().getName());
        try {
            //編解碼對象爲json 發送請求
            String requestJson= null;
            try {
                requestJson = RPC.requestEncode(request);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
            channel.writeAndFlush(requestBuf);
//            System.out.println("調用"+request.getRequestID()+"已發送");
            //掛起等待實現端處理完畢返回 TODO 後續配置超時時間
            synchronized (request) {
                //放棄對象鎖 並阻塞等待notify
                request.wait();
            }
//            System.out.println("調用"+request.getRequestID()+"接收完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


即選擇IP 進行鏈接(若是已鏈接則在Map中取出Channel) 最後發送  基本和前面的版本一致

 

最後的補充

隨機和輪詢每一個提供者都要和所有的調用者保持長鏈接的問題,可能致使提供者維持的長鏈接數可能過多的問題

例如10調用者 5提供者,最優狀況下 一個提供者持有2個調用者鏈接便可,而上面的負載均衡方式,每一個提供者都須要持有10個長鏈接,即便再加多一個提供者 他依舊得持有10個長鏈接。

一致性hash策略每一個調用者只會映射給一個提供者,在調用者數量遠遠大於提供者時,大大減小了多餘的長鏈接,但在調用者數小於提供者的時候,會有部分提供者一直沒有收到請求的狀況。這時候建議使用隨機等策略,保證每一個提供者都能被負載均衡。

框架使用者能夠根據提供者調用者數量比較來選擇相應的負載均衡策略。

以前想了一些方式來優化使每一個提供者只持有夠用的長鏈接,並且不會出現部分提供者不被請求到的狀況,但最後都發現不合適,也比較複雜,所以github上的代碼會有一些我註釋掉和廢棄掉的類和方法,但不影響使用。

 


以上就是MeiZhuoRPC支持zookeeper註冊中心的核心代碼

主要是接入封裝了zookeeper的API和作了相應的負載均衡基礎設施

更詳細的代碼能夠在個人github上clone一份看看 

https://github.com/wephone/MeiZhuoRPC

 

 

 後續會繼續完善各類負載均衡策略,逐步支持集羣式zookeeper以及性能優化等等

歡迎持續關注個人博客及github

相關文章
相關標籤/搜索