在咱們的系統開發過程 中不可避免的會使用到定時任務的功能,而當咱們在生產環境部署的服務超過1臺時,就須要考慮任務調度的問題,防止兩臺或多臺服務器上執行同一個任務,這個問題今天我們就用zookeeper來解決。java
Zookeeper的數據存儲採用的是結構化存儲,結構化存儲是沒有文件和目錄的概念,裏邊的目錄和文件被抽象成了節點(node),zookeeper裏能夠稱爲znode。Znode的層次結構以下圖:node
每一個子目錄項如 NameService 都被稱做爲 znode(目錄節點),和文件系統同樣,咱們可以自由的增長、刪除znode,在一個znode下增長、刪除子znode,惟一的不一樣在於znode是能夠存儲數據的。spring
PERSISTENT-持久化目錄節點apache
客戶端與zookeeper斷開鏈接後,該節點依舊存在api
PERSISTENT_SEQUENTIAL-持久化順序編號目錄節點服務器
客戶端與zookeeper斷開鏈接後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號session
EPHEMERAL-臨時目錄節點dom
客戶端與zookeeper斷開鏈接後,該節點被刪除分佈式
EPHEMERAL_SEQUENTIAL-臨時順序編號目錄節點ide
客戶端與zookeeper斷開鏈接後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號
客戶端註冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、被刪除、子目錄節點增長刪除)時,zookeeper會通知客戶端。基於這種監聽,能夠實現註冊中心、分佈式同步等功能。
使用zookeeper的臨時順序節點,來實現分佈式任務的調度功能,每一臺服務啓動的時候都向zookeepe指定的目錄下注冊一下臨時順序節點,並把該節點記錄的系統裏,每一次任務執行的時候,獲取全部的有序節點,跟當前系統創愛你的節點對比,若是當前服務建立的節點是全部節點中最小的,則執行任務,不然不執行任務,以下如所示:
一、pom引用
<zookeeper.version>3.4.8</zookeeper.version> <curator.version>2.11.1</curator.version> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency>
二、ZkClient類
該類封裝了zookeeper的操做類,服務啓動的時候迴向zk上註冊有序臨時節點,目錄爲:/demo1/task/n,例如:/demo1/task/n00000001,/demo1/task/n00000002,建立的節點路徑保存到變量:curTaskNodeId
package com.blogs.client; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory.Builder; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.springframework.stereotype.Component; import lombok.Data; import lombok.extern.slf4j.Slf4j; @Component @Slf4j @Data public class ZkClient { private CuratorFramework client; public TreeCache cache; //記錄當前服務在zk上建立的nodeId public String curTaskNodeId=""; //private ZookeeperProperties zookeeperProperties; public ZkClient(){ init(); } /** * 初始化zookeeper */ public void init(){ try { //初始sleep時間 ,毫秒, int baseSleepTimeMs=1000; //最大重試次數 int maxRetries=3; RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries); Builder builder = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181").retryPolicy(retryPolicy) .sessionTimeoutMs( 1000) //會話超時時間,單位爲毫秒,默認60000ms,鏈接斷開後,其它客戶端還能請到臨時節點的時間 .connectionTimeoutMs( 6000)//鏈接建立超時時間,單位爲毫秒 .namespace( "demo1");//zk的根節點 //如下注釋的爲建立節點的用戶名密碼 //builder.authorization("digest", "rt:rt".getBytes("UTF-8")); /* builder.aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } });*/ client = builder.build(); client.start(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { //鏈接丟失 log.info("lost session with zookeeper"); } else if (state == ConnectionState.CONNECTED) { //鏈接新建 log.info("connected with zookeeper"); } else if (state == ConnectionState.RECONNECTED) { log.info("reconnected with zookeeper"); } } }); System.out.println("zk初始化完成"); //獲取當前服務啓動時建立的節點,臨時有序節點,用做定時任務的執行 curTaskNodeId=createNode(CreateMode.EPHEMERAL_SEQUENTIAL,"/task/n",""); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } public void stop() { client.close(); } public CuratorFramework getClient() { return client; } /** * 建立節點 * @param mode 節點類型 * 一、PERSISTENT 持久化目錄節點,存儲的數據不會丟失。 * 二、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,存儲的數據不會丟失 * 三、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除 *四、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。 * @param path 節點名稱 * @param nodeData 節點數據 */ public String createNode(CreateMode mode, String path , String nodeData) { String nodepath=""; try { //使用creatingParentContainersIfNeeded()以後Curator可以自動遞歸建立全部所需的父節點 nodepath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8")); System.out.println(nodepath); } catch (Exception e) { log.error("註冊出錯", e); } return nodepath; } /** * 建立節點 * @param mode 節點類型 * 一、PERSISTENT 持久化目錄節點,存儲的數據不會丟失。 * 二、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,存儲的數據不會丟失 * 三、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除 * 四、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與服務器端口也就是session 超時,這種節點會被自動刪除,而且根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名。 * @param path 節點名稱 */ public void createNode(CreateMode mode,String path ) { try { //使用creatingParentContainersIfNeeded()以後Curator可以自動遞歸建立全部所需的父節點 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path); } catch (Exception e) { log.error("註冊出錯", e); } } /** * 刪除節點數據 * * @param path */ public void deleteNode(final String path) { try { deleteNode(path,true); } catch (Exception ex) { log.error("{}",ex); } } /** * 刪除節點數據 * @param path * @param deleteChildre 是否刪除子節點 */ public void deleteNode(final String path,Boolean deleteChildre){ try { if(deleteChildre){ //guaranteed()刪除一個節點,強制保證刪除, // 只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操做,直到刪除節點成功 client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); }else{ client.delete().guaranteed().forPath(path); } } catch (Exception e) { e.printStackTrace(); } } /** * 設置指定節點的數據 * @param path * @param datas */ public void setNodeData(String path, byte[] datas){ try { client.setData().forPath(path, datas); }catch (Exception ex) { log.error("{}",ex); } } /** * 獲取指定節點的數據 * @param path * @return */ public byte[] getNodeData(String path){ Byte[] bytes = null; try { if(cache != null){ ChildData data = cache.getCurrentData(path); if(data != null){ return data.getData(); } } client.getData().forPath(path); return client.getData().forPath(path); }catch (Exception ex) { log.error("{}",ex); } return null; } /** * 獲取數據時先同步 * @param path * @return */ public byte[] synNodeData(String path){ client.sync(); return getNodeData( path); } /** * 判斷路徑是否存在 * * @param path * @return */ public boolean isExistNode(final String path) { client.sync(); try { return null != client.checkExists().forPath(path); } catch (Exception ex) { return false; } } /** * 獲取節點的子節點 * @param path * @return */ public List<String> getChildren(String path) { List<String> childrenList = new ArrayList<>(); try { childrenList = client.getChildren().forPath(path); } catch (Exception e) { log.error("獲取子節點出錯", e); } return childrenList; } /** * 隨機讀取一個path子路徑, "/"爲根節點對應該namespace * 先從cache中讀取,若是沒有,再從zookeeper中查詢 * @param path * @return * @throws Exception */ public String getRandomData(String path) { try{ Map<String,ChildData> cacheMap = cache.getCurrentChildren(path); if(cacheMap != null && cacheMap.size() > 0) { log.debug("get random value from cache,path="+path); Collection<ChildData> values = cacheMap.values(); List<ChildData> list = new ArrayList<>(values); Random rand = new Random(); byte[] b = list.get(rand.nextInt(list.size())).getData(); return new String(b,"utf-8"); } if(isExistNode(path)) { log.debug("path [{}] is not exists,return null",path); return null; } else { log.debug("read random from zookeeper,path="+path); List<String> list = client.getChildren().forPath(path); if(list == null || list.size() == 0) { log.debug("path [{}] has no children return null",path); return null; } Random rand = new Random(); String child = list.get(rand.nextInt(list.size())); path = path + "/" + child; byte[] b = client.getData().forPath(path); String value = new String(b,"utf-8"); return value; } }catch(Exception e){ log.error("{}",e); } return null; } /** * 獲取讀寫鎖 * @param path * @return */ public InterProcessReadWriteLock getReadWriteLock(String path){ InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path); return readWriteLock; } /** * 在註冊監聽器的時候,若是傳入此參數,當事件觸發時,邏輯由線程池處理 */ ExecutorService pool = Executors.newFixedThreadPool(2); /** * 監聽數據節點的變化狀況 * @param watchPath * @param listener */ public void watchPath(String watchPath,TreeCacheListener listener){ // NodeCache nodeCache = new NodeCache(client, watchPath, false); TreeCache cache = new TreeCache(client, watchPath); cache.getListenable().addListener(listener,pool); try { cache.start(); } catch (Exception e) { e.printStackTrace(); } } }
三、定時任務調用
package com.blogs.client; import java.time.LocalDateTime; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component @EnableScheduling public class ScheduleTask { @Autowired private ZkClient zkClient; //添加定時任務 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { System.out.println("開始執行任務"); //獲取全部節點 List<String> taskNodes=zkClient.getChildren("/task"); //查找最小節點 int minNodeNum=Integer.MAX_VALUE; for (int i = 0; i < taskNodes.size(); i++) { //節點前面有一個n,把n替換掉,剩下的轉換爲數字 int nodeNum=Integer.valueOf(taskNodes.get(i).replace("n", "")); if(nodeNum < minNodeNum){ minNodeNum = nodeNum; } System.out.println("節點:"+taskNodes.get(i)); } System.out.println("當前節點:"+zkClient.getCurTaskNodeId()); //若是最小節點 等於該服務建立的節點,則執行任務 int curNodeNum=Integer.valueOf(zkClient.getCurTaskNodeId().substring(zkClient.getCurTaskNodeId().lastIndexOf('/') + 2)); if(minNodeNum - curNodeNum == 0){ System.out.println("執行任務"); }else { System.out.println("不執行任務"); } System.err.println("執行靜態定時任務時間: " + LocalDateTime.now()); } }
當前服務建立的服務爲節點最小的,則執行服務,不然不執行服務
把服務的端口分別修改成:8080,8081,模擬啓動兩個服務,查看定時任務的執行狀況
當把兩個服務的任何一個服務關閉,定時任務還能夠正常執行。