zk分佈式任務管理

在咱們的系統開發過程 中不可避免的會使用到定時任務的功能,而當咱們在生產環境部署的服務超過1臺時,就須要考慮任務調度的問題,防止兩臺或多臺服務器上執行同一個任務,這個問題今天我們就用zookeeper來解決。java

zookeeper的存儲模型

Zookeeper的數據存儲採用的是結構化存儲,結構化存儲是沒有文件和目錄的概念,裏邊的目錄和文件被抽象成了節點(node),zookeeper裏能夠稱爲znode。Znode的層次結構以下圖:node

每一個子目錄項如 NameService 都被稱做爲 znode(目錄節點),和文件系統同樣,咱們可以自由的增長、刪除znode,在一個znode下增長、刪除子znode,惟一的不一樣在於znode是能夠存儲數據的。spring

znode類型

  • PERSISTENT-持久化目錄節點apache

    客戶端與zookeeper斷開鏈接後,該節點依舊存在api

  • PERSISTENT_SEQUENTIAL-持久化順序編號目錄節點服務器

    客戶端與zookeeper斷開鏈接後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號session

  • EPHEMERAL-臨時目錄節點dom

    客戶端與zookeeper斷開鏈接後,該節點被刪除分佈式

  • EPHEMERAL_SEQUENTIAL-臨時順序編號目錄節點ide

    客戶端與zookeeper斷開鏈接後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號

監聽通知機制

客戶端註冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、被刪除、子目錄節點增長刪除)時,zookeeper會通知客戶端。基於這種監聽,能夠實現註冊中心、分佈式同步等功能。

zk分佈式任務管理機制

使用zookeeper的臨時順序節點,來實現分佈式任務的調度功能,每一臺服務啓動的時候都向zookeepe指定的目錄下注冊一下臨時順序節點,並把該節點記錄的系統裏,每一次任務執行的時候,獲取全部的有序節點,跟當前系統創愛你的節點對比,若是當前服務建立的節點是全部節點中最小的,則執行任務,不然不執行任務,以下如所示:

 

代碼實現

一、pom引用

 <zookeeper.version>3.4.8</zookeeper.version>
<curator.version>2.11.1</curator.version>

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>

 

二、ZkClient類

該類封裝了zookeeper的操做類,服務啓動的時候迴向zk上註冊有序臨時節點,目錄爲:/demo1/task/n,例如:/demo1/task/n00000001,/demo1/task/n00000002,建立的節點路徑保存到變量:curTaskNodeId

package com.blogs.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.stereotype.Component;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@Data
public class ZkClient {
    private CuratorFramework client;
    public TreeCache cache;
    //記錄當前服務在zk上建立的nodeId
    public String curTaskNodeId="";
    //private ZookeeperProperties zookeeperProperties;
    
    public ZkClient(){
        init();
    }
    
    /**
     * 初始化zookeeper
     */
    public void init(){
        try {
            //初始sleep時間 ,毫秒,
            int baseSleepTimeMs=1000;
            //最大重試次數
            int maxRetries=3;
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
            Builder builder   = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181").retryPolicy(retryPolicy)
                    .sessionTimeoutMs( 1000) //會話超時時間,單位爲毫秒,默認60000ms,鏈接斷開後,其它客戶端還能請到臨時節點的時間
                    .connectionTimeoutMs( 6000)//鏈接建立超時時間,單位爲毫秒
                    .namespace( "demo1");//zk的根節點
            //如下注釋的爲建立節點的用戶名密碼
            //builder.authorization("digest", "rt:rt".getBytes("UTF-8"));
            /*
             builder.aclProvider(new ACLProvider() {
                 @Override
                 public List<ACL> getDefaultAcl() {
                     return ZooDefs.Ids.CREATOR_ALL_ACL;
                 }

                 @Override
                 public List<ACL> getAclForPath(final String path) {
                     return ZooDefs.Ids.CREATOR_ALL_ACL;
                 }
             });*/
             client = builder.build();
             client.start();
             
             client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                    public void stateChanged(CuratorFramework client, ConnectionState state) {
                        if (state == ConnectionState.LOST) {
                            //鏈接丟失
                            log.info("lost session with zookeeper");
                        } else if (state == ConnectionState.CONNECTED) {
                            //鏈接新建
                            log.info("connected with zookeeper");
                        } else if (state == ConnectionState.RECONNECTED) {
                            log.info("reconnected with zookeeper");
                        }
                    }
                });
             System.out.println("zk初始化完成");
             //獲取當前服務啓動時建立的節點,臨時有序節點,用做定時任務的執行
             curTaskNodeId=createNode(CreateMode.EPHEMERAL_SEQUENTIAL,"/task/n","");
            
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    
    public void stop() {
        client.close();
    }
    
     public CuratorFramework getClient() {
            return client;
        }
     /**
         * 建立節點
         * @param mode       節點類型
         * 一、PERSISTENT 持久化目錄節點,存儲的數據不會丟失。
         * 二、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,存儲的數據不會丟失
         * 三、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除
         *四、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。
         * @param path  節點名稱
         * @param nodeData  節點數據
         */
        public String createNode(CreateMode mode, String path , String nodeData) {
            String nodepath="";
            try {
                //使用creatingParentContainersIfNeeded()以後Curator可以自動遞歸建立全部所需的父節點
                nodepath =  client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
                System.out.println(nodepath);
            } catch (Exception e) {
                log.error("註冊出錯", e);
            }
            return nodepath;
        }
        
        /**
         * 建立節點
         * @param mode       節點類型
         *                   一、PERSISTENT 持久化目錄節點,存儲的數據不會丟失。
         *                   二、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,存儲的數據不會丟失
         *                   三、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除
         *                   四、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。
         * @param path  節點名稱
         */
        public void createNode(CreateMode mode,String path ) {
            try {
                //使用creatingParentContainersIfNeeded()以後Curator可以自動遞歸建立全部所需的父節點
                client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
            } catch (Exception e) {
                log.error("註冊出錯", e);
            }
        }

        /**
         * 刪除節點數據
         *
         * @param path
         */
        public void deleteNode(final String path) {
            try {
                deleteNode(path,true);
            } catch (Exception ex) {
                log.error("{}",ex);
            }
        }


        /**
         * 刪除節點數據
         * @param path
         * @param deleteChildre   是否刪除子節點
         */
        public void deleteNode(final String path,Boolean deleteChildre){
            try {
                if(deleteChildre){
                    //guaranteed()刪除一個節點,強制保證刪除,
                    // 只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操做,直到刪除節點成功
                    client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
                }else{
                    client.delete().guaranteed().forPath(path);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


        /**
         * 設置指定節點的數據
         * @param path
         * @param datas
         */
        public void setNodeData(String path, byte[] datas){
            try {
                client.setData().forPath(path, datas);
            }catch (Exception ex) {
                log.error("{}",ex);
            }
        }

        /**
         * 獲取指定節點的數據
         * @param path
         * @return
         */
        public byte[] getNodeData(String path){
            Byte[] bytes = null;
            try {
                if(cache != null){
                    ChildData data = cache.getCurrentData(path);
                    if(data != null){
                        return data.getData();
                    }
                }
                client.getData().forPath(path);
                return client.getData().forPath(path);
            }catch (Exception ex) {
                log.error("{}",ex);
            }
            return null;
        }

        /**
         * 獲取數據時先同步
         * @param path
         * @return
         */
        public byte[] synNodeData(String path){
            client.sync();
            return getNodeData( path);
        }

        /**
         * 判斷路徑是否存在
         *
         * @param path
         * @return
         */
        public boolean isExistNode(final String path) {
            client.sync();
            try {
                return null != client.checkExists().forPath(path);
            } catch (Exception ex) {
                return false;
            }
        }


        /**
         * 獲取節點的子節點
         * @param path
         * @return
         */
        public List<String> getChildren(String path) {
            List<String> childrenList = new ArrayList<>();
            try {
                childrenList = client.getChildren().forPath(path);
            } catch (Exception e) {
                log.error("獲取子節點出錯", e);
            }
            return childrenList;
        }

        /**
         * 隨機讀取一個path子路徑, "/"爲根節點對應該namespace
         * 先從cache中讀取,若是沒有,再從zookeeper中查詢
         * @param path
         * @return
         * @throws Exception
         */
        public String getRandomData(String path)  {
            try{
                Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
                if(cacheMap != null && cacheMap.size() > 0) {
                    log.debug("get random value from cache,path="+path);
                    Collection<ChildData> values = cacheMap.values();
                    List<ChildData> list = new ArrayList<>(values);
                    Random rand = new Random();
                    byte[] b = list.get(rand.nextInt(list.size())).getData();
                    return new String(b,"utf-8");
                }
                if(isExistNode(path)) {
                    log.debug("path [{}] is not exists,return null",path);
                    return null;
                } else {
                    log.debug("read random from zookeeper,path="+path);
                    List<String> list = client.getChildren().forPath(path);
                    if(list == null || list.size() == 0) {
                        log.debug("path [{}] has no children return null",path);
                        return null;
                    }
                    Random rand = new Random();
                    String child = list.get(rand.nextInt(list.size()));
                    path = path + "/" + child;
                    byte[] b = client.getData().forPath(path);
                    String value = new String(b,"utf-8");
                    return value;
                }
            }catch(Exception e){
                log.error("{}",e);
            }
            return null;

        }

        

        /**
         * 獲取讀寫鎖
         * @param path
         * @return
         */
        public InterProcessReadWriteLock getReadWriteLock(String path){
            InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
            return readWriteLock;
        }

        /**
         * 在註冊監聽器的時候,若是傳入此參數,當事件觸發時,邏輯由線程池處理
         */
        ExecutorService pool = Executors.newFixedThreadPool(2);

        /**
         * 監聽數據節點的變化狀況
         * @param watchPath
         * @param listener
         */
        public void watchPath(String watchPath,TreeCacheListener listener){
         //   NodeCache nodeCache = new NodeCache(client, watchPath, false);
            TreeCache cache = new TreeCache(client, watchPath);
            cache.getListenable().addListener(listener,pool);
            try {
                cache.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

}

 

三、定時任務調用

package com.blogs.client;

import java.time.LocalDateTime;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class ScheduleTask {
    
    @Autowired
    private ZkClient zkClient;

    //添加定時任務
    @Scheduled(cron = "0/5 * * * * ?")
    private void configureTasks() {
        System.out.println("開始執行任務");
        //獲取全部節點
        List<String> taskNodes=zkClient.getChildren("/task");
        //查找最小節點
        int minNodeNum=Integer.MAX_VALUE;
        for (int i = 0; i < taskNodes.size(); i++) {
            //節點前面有一個n,把n替換掉,剩下的轉換爲數字
            int nodeNum=Integer.valueOf(taskNodes.get(i).replace("n", ""));
            if(nodeNum < minNodeNum){
                minNodeNum = nodeNum;
            }
            System.out.println("節點:"+taskNodes.get(i));
        }
        System.out.println("當前節點:"+zkClient.getCurTaskNodeId());
        //若是最小節點 等於該服務建立的節點,則執行任務
        int curNodeNum=Integer.valueOf(zkClient.getCurTaskNodeId().substring(zkClient.getCurTaskNodeId().lastIndexOf('/') + 2));
        if(minNodeNum - curNodeNum  == 0){
            System.out.println("執行任務");
        }else {
            System.out.println("不執行任務");
        }
        
        System.err.println("執行靜態定時任務時間: " + LocalDateTime.now());
    }
}

 

當前服務建立的服務爲節點最小的,則執行服務,不然不執行服務

執行結果

把服務的端口分別修改成:8080,8081,模擬啓動兩個服務,查看定時任務的執行狀況

當把兩個服務的任何一個服務關閉,定時任務還能夠正常執行。

zkCli查看查建立的目錄結構

 

做者: Eric.Chen
出處: https://www.cnblogs.com/lc-chenlong
若是喜歡做者的文章,請關注「寫代碼的猿」訂閱號以便第一時間得到最新內容。本文版權歸做者全部,歡迎轉載
相關文章
相關標籤/搜索