springboot使用zookeeper(curator)實現註冊發現與負載均衡

最簡單的實現服務高可用的方法就是集羣化,也就是分佈式部署,可是分佈式部署會帶來一些問題。好比:java

一、各個實例之間的協同(鎖)spring

二、負載均衡apache

三、熱刪除api

這裏經過一個簡單的實例來講明如何解決註冊發現和負載均衡。服務器

 

一、先解決依賴,這裏只給出zk相關的依賴,pom.xml以下session

  <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.9.1</version>
  </dependency>

 

二、ZkClient負載均衡

這裏使用的是curator,curator是對zookeeper的簡單封裝,提供了一些集成的方法,或者是提供了更優雅的api,舉例來講分佈式

zk的create(path, mode, acl, data)方法 == curator create().withMode(mode).forPath(path)調用鏈ui

package com.dqa.prometheus.client.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    private String zookeeperServer;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int baseSleepTimeMs;
    private int maxRetries;

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }
    public String getZookeeperServer() {
        return zookeeperServer;
    }
    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }
    public int getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }
    public int getConnectionTimeoutMs() {
        return connectionTimeoutMs;
    }
    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }
    public int getBaseSleepTimeMs() {
        return baseSleepTimeMs;
    }
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }
    public int getMaxRetries() {
        return maxRetries;
    }

    public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
        client.start();
    }

    public void stop() {
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }

    public void register() {
        try {
            String rootPath = "/" + "services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "prometheus" + "-" +  hostAddress + "-";
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
        } catch (Exception e) {
            logger.error("註冊出錯", e);
        }
    }

    public List<String> getChildren(String path) {
        List<String> childrenList = new ArrayList<>();
        try {
            childrenList = client.getChildren().forPath(path);
        } catch (Exception e) {
            logger.error("獲取子節點出錯", e);
        }
        return childrenList;
    }

    public int getChildrenCount(String path) {
        return getChildren(path).size();
    }

    public List<String> getInstances() {
        return getChildren("/services");
    }

    public int getInstancesCount() {
        return getInstances().size();
    }
}

二、configuration以下this

package com.dqa.prometheus.configuration;


import com.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfiguration {
    @Value("${zookeeper.server}")
    private String zookeeperServer;
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClient zkClient() {
        ZkClient zkClient = new ZkClient();
        zkClient.setZookeeperServer(zookeeperServer);
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(maxRetries);
        zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
        return zkClient;
    }

}

配置文件以下

#============== zookeeper ===================
zookeeper.server=10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
zookeeper.sessionTimeoutMs=6000
zookeeper.connectionTimeoutMs=6000
zookeeper.maxRetries=3
zookeeper.baseSleepTimeMs=1000

 

三、註冊發現

是經過上面封裝的ZkClient中的register方法實現的,調用以下。

package com.dqa.prometheus;

import com.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.orm.jpa.EntityScan;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAsync
@EnableScheduling
@EntityScan(basePackages="com.xiaoju.dqa.prometheus.model")
public class Application {
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Application.class, args);
        ZkClient zkClient = context.getBean(ZkClient.class);
        zkClient.register();
    }
}

註冊代碼說明:

 public void register() {
        try {
            String rootPath = "/" + "services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "prometheus" + "-" +  hostAddress + "-";
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
        } catch (Exception e) {
            logger.error("註冊出錯", e);
        }
    }

一、zk中的註冊路徑

/services/prometheus-10.93.21.21-00000000001

二、CreateMode有四種,選擇EPHEMERAL_SEQUENTIAL的緣由是,服務關閉的時候session超時,zk節點會自動刪除,同時自增id能夠實現鎖和負載均衡,下面再說

1、PERSISTENT

持久化目錄節點,存儲的數據不會丟失。

2、PERSISTENT_SEQUENTIAL

順序自動編號的持久化目錄節點,存儲的數據不會丟失,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。

3、EPHEMERAL

臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除。 

4、EPHEMERAL_SEQUENTIAL

臨時自動編號節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。

四、負載均衡

     /*
        *   我是第幾個實例, 作負載均衡
        * */
        List<String> instanceList = zkClient.getInstances();
        Collections.sort(instanceList);
        String hostAddress = NetFunction.getAddressHost();
        int instanceNo = 0;
        if (hostAddress !=  null) {
            for (int i=0; i<instanceList.size(); i++) {
                if (instanceList.get(i).split("-")[1].equals(hostAddress)) {
                    instanceNo = i;
                }
            }
        } else {
            logger.info("獲取本地IP失敗");
        }
        logger.info("[分發] 實例總數={}, 我是第{}個實例", instanceCount, instanceNo);
        List<CheckTask> waitingTasks = checkTaskDao.getTasks(taskType, TaskStatus.WAITING.getValue());
        Iterator<CheckTask> waitingIterator = waitingTasks.iterator();
        while (waitingIterator.hasNext()) {
            if (waitingIterator.next().getTaskId().hashCode() % instanceCount != instanceNo) {
                waitingIterator.remove();
            }
        }

說明:

一、例若有3個實例(zkClient.getInstances()),那麼經過IP咱們把3個實例按照自增id排序分別標號爲0,1,2

二、對第一個實例也就是instanceNo=0,只執行taskId.hashCode() % 3 == 0的任務,其餘兩個實例相似

三、當有一個實例掛掉,2個實例,instanceNo=0只執行taskId.hashCode() % 2 == 0的任務,實現熱刪除

相關文章
相關標籤/搜索