分佈式ID生成器解決方案

1、分佈式系統帶來ID生成挑戰java

在複雜的系統中,每每須要對大量的數據如訂單,帳戶進行標識,以一個有意義的有序的序列號來做爲全局惟一的ID;node

而分佈式系統中咱們對ID生成器要求又有哪些呢?redis

  1. 全局惟一性:不能出現重複的ID號,既然是惟一標識,這是最基本的要求。算法

  2. 遞增:比較低要求的條件爲趨勢遞增,即保證下一個ID必定大於上一個ID,而比較苛刻的要求是連續遞增,如1,2,3等等。數據庫

  3. 高可用高性能:ID生成事關重大,一旦掛掉系統崩潰;高性能是指必需要在壓測下表現良好,若是達不到要求則在高併發環境下依然會致使系統癱瘓。apache

2、業內方案簡介

1. UUID方案

優勢:緩存

可以保證獨立性,程序能夠在不一樣的數據庫間遷移,效果不受影響。安全

保證生成的ID不只是表獨立的,並且是庫獨立的,這點在你想切分數據庫的時候尤其重要。服務器

缺點:網絡

1. 性能爲題:UUID太長,一般以36長度的字符串表示,對MySQL索引不利:若是做爲數據庫主鍵,在InnoDB引擎下,UUID的無序性可能會引發數據位置頻繁變更,嚴重影響性能

2. UUID無業務含義:不少須要ID能標識業務含義的地方不使用

3.不知足遞增要求

2. snowflake方案

snowflake是twitter開源的分佈式ID生成系統。 Twitter每秒有數十萬條消息的請求,每條消息都必須分配一條惟一的id,這些id還須要一些大體的順序(方便客戶端排序),而且在分佈式系統中不一樣機器產生的id必須不一樣。

snowflake的結構以下(每部分用-分開):

0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 – 000000000000

第一位爲未使用,接下來的41位爲毫秒級時間(41位的長度可使用69年),而後是5位datacenterId和5位workerId(10位的長度最多支持部署1024個節點) ,最後12位是毫秒內的計數(12位的計數順序號支持每一個節點每毫秒產生4096個ID序號)

一共加起來恰好64位,爲一個Long型。(轉換成字符串長度爲18)

snowflake生成的ID總體上按照時間自增排序,而且整個分佈式系統內不會產生ID碰撞(由datacenter和workerId做區分),而且效率較高。snowflake的缺點是:

  1. 強依賴時鐘,若是主機時間回撥,則會形成重複ID,會產生
  2. ID雖然有序,可是不連續

snowflake如今有較好的改良方案,好比美團點評開源的分佈式ID框架:leaf,經過使用ZooKeeper解決了時鐘依賴問題。

snowflake的關鍵源碼以下:

  1. /** 
  2.  * Twitter_Snowflake<br> 
  3.  * SnowFlake的結構以下(每部分用-分開):<br> 
  4.  * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br> 
  5.  * 1位標識,因爲long基本類型在Java中是帶符號的,最高位是符號位,正數是0,負數是1,因此id通常是正數,最高位是0<br> 
  6.  * 41位時間截(毫秒級),注意,41位時間截不是存儲當前時間的時間截,而是存儲時間截的差值(當前時間截 - 開始時間截) 
  7.  * 獲得的值),這裏的的開始時間截,通常是咱們的id生成器開始使用的時間,由咱們程序來指定的(以下下面程序IdWorker類的startTime屬性)。41位的時間截,可使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br> 
  8.  * 10位的數據機器位,能夠部署在1024個節點,包括5位datacenterId和5位workerId<br> 
  9.  * 12位序列,毫秒內的計數,12位的計數順序號支持每一個節點每毫秒(同一機器,同一時間截)產生4096個ID序號<br> 
  10.  * 加起來恰好64位,爲一個Long型。<br> 
  11.  * SnowFlake的優勢是,總體上按照時間自增排序,而且整個分佈式系統內不會產生ID碰撞(由數據中心ID和機器ID做區分),而且效率較高,經測試,SnowFlake每秒可以產生26萬ID左右。 
  12.  */  
  13. public class SnowflakeIdWorker {  
  14.     // ==============================Fields===========================================  
  15.     /** 開始時間截 (2015-01-01) */  
  16.     private final long twepoch = 1420041600000L;  
  17.     /** 機器id所佔的位數 */  
  18.     private final long workerIdBits = 5L;  
  19.     /** 數據標識id所佔的位數 */  
  20.     private final long datacenterIdBits = 5L;  
  21.     /** 支持的最大機器id,結果是31 (這個移位算法能夠很快的計算出幾位二進制數所能表示的最大十進制數) */  
  22.     private final long maxWorkerId = -1L ^ (-1L << workerIdBits);  
  23.     /** 支持的最大數據標識id,結果是31 */  
  24.     private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);  
  25.     /** 序列在id中佔的位數 */  
  26.     private final long sequenceBits = 12L;  
  27.     /** 機器ID向左移12位 */  
  28.     private final long workerIdShift = sequenceBits;  
  29.     /** 數據標識id向左移17位(12+5) */  
  30.     private final long datacenterIdShift = sequenceBits + workerIdBits;  
  31.     /** 時間截向左移22位(5+5+12) */  
  32.     private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;  
  33.     /** 生成序列的掩碼,這裏爲4095 (0b111111111111=0xfff=4095) */  
  34.     private final long sequenceMask = -1L ^ (-1L << sequenceBits);  
  35.     /** 工做機器ID(0~31) */  
  36.     private long workerId;  
  37.     /** 數據中心ID(0~31) */  
  38.     private long datacenterId;  
  39.     /** 毫秒內序列(0~4095) */  
  40.     private long sequence = 0L;  
  41.     /** 上次生成ID的時間截 */  
  42.     private long lastTimestamp = -1L;  
  43.     //==============================Constructors=====================================  
  44.     /** 
  45.      * 構造函數 
  46.      * @param workerId 工做ID (0~31) 
  47.      * @param datacenterId 數據中心ID (0~31) 
  48.      */  
  49.     public SnowflakeIdWorker(long workerId, long datacenterId) {  
  50.         if (workerId > maxWorkerId || workerId < 0) {  
  51.             throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));  
  52.         }  
  53.         if (datacenterId > maxDatacenterId || datacenterId < 0) {  
  54.             throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));  
  55.         }  
  56.         this.workerId = workerId;  
  57.         this.datacenterId = datacenterId;  
  58.     }  
  59.     // ==============================Methods==========================================  
  60.     /** 
  61.      * 得到下一個ID (該方法是線程安全的) 
  62.      * @return SnowflakeId 
  63.      */  
  64.     public synchronized long nextId() {  
  65.         long timestamp = timeGen();  
  66.         //若是當前時間小於上一次ID生成的時間戳,說明系統時鐘回退過這個時候應當拋出異常  
  67.         if (timestamp < lastTimestamp) {  
  68.             throw new RuntimeException(  
  69.                     String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));  
  70.         }  
  71.         //若是是同一時間生成的,則進行毫秒內序列  
  72.         if (lastTimestamp == timestamp) {  
  73.             sequence = (sequence + 1) & sequenceMask;  
  74.             //毫秒內序列溢出  
  75.             if (sequence == 0) {  
  76.                 //阻塞到下一個毫秒,得到新的時間戳  
  77.                 timestamp = tilNextMillis(lastTimestamp);  
  78.             }  
  79.         }  
  80.         //時間戳改變,毫秒內序列重置  
  81.         else {  
  82.             sequence = 0L;  
  83.         }  
  84.         //上次生成ID的時間截  
  85.         lastTimestamp = timestamp;  
  86.         //移位並經過或運算拼到一塊兒組成64位的ID  
  87.         return ((timestamp - twepoch) << timestampLeftShift) //  
  88.                 | (datacenterId << datacenterIdShift) //  
  89.                 | (workerId << workerIdShift) //  
  90.                 | sequence;  
  91.     }  
  92.     /** 
  93.      * 阻塞到下一個毫秒,直到得到新的時間戳 
  94.      * @param lastTimestamp 上次生成ID的時間截 
  95.      * @return 當前時間戳 
  96.      */  
  97.     protected long tilNextMillis(long lastTimestamp) {  
  98.         long timestamp = timeGen();  
  99.         while (timestamp <= lastTimestamp) {  
  100.             timestamp = timeGen();  
  101.         }  
  102.         return timestamp;  
  103.     }  
  104.     /** 
  105.      * 返回以毫秒爲單位的當前時間 
  106.      * @return 當前時間(毫秒) 
  107.      */  
  108.     protected long timeGen() {  
  109.         return System.currentTimeMillis();  
  110.     }  
  111.     //==============================Test=============================================  
  112.     /** 測試 */  
  113.     public static void main(String[] args) throws InterruptedException {  
  114.         SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);  
  115.         for (int i = 0; i < 100; i++) {  
  116.             long id = idWorker.nextId();  
  117.             //System.out.println(Long.toBinaryString(id));  
  118.             Thread.sleep(1);  
  119.             System.out.println(id);  
  120.         }  
  121.     }  
  122. }  
/**
 * Twitter_Snowflake<br>
 * SnowFlake的結構以下(每部分用-分開):<br>
 * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
 * 1位標識,因爲long基本類型在Java中是帶符號的,最高位是符號位,正數是0,負數是1,因此id通常是正數,最高位是0<br>
 * 41位時間截(毫秒級),注意,41位時間截不是存儲當前時間的時間截,而是存儲時間截的差值(當前時間截 - 開始時間截)
 * 獲得的值),這裏的的開始時間截,通常是咱們的id生成器開始使用的時間,由咱們程序來指定的(以下下面程序IdWorker類的startTime屬性)。41位的時間截,可使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
 * 10位的數據機器位,能夠部署在1024個節點,包括5位datacenterId和5位workerId<br>
 * 12位序列,毫秒內的計數,12位的計數順序號支持每一個節點每毫秒(同一機器,同一時間截)產生4096個ID序號<br>
 * 加起來恰好64位,爲一個Long型。<br>
 * SnowFlake的優勢是,總體上按照時間自增排序,而且整個分佈式系統內不會產生ID碰撞(由數據中心ID和機器ID做區分),而且效率較高,經測試,SnowFlake每秒可以產生26萬ID左右。
 */
public class SnowflakeIdWorker {
    // ==============================Fields===========================================
    /** 開始時間截 (2015-01-01) */
    private final long twepoch = 1420041600000L;
    /** 機器id所佔的位數 */
    private final long workerIdBits = 5L;
    /** 數據標識id所佔的位數 */
    private final long datacenterIdBits = 5L;
    /** 支持的最大機器id,結果是31 (這個移位算法能夠很快的計算出幾位二進制數所能表示的最大十進制數) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    /** 支持的最大數據標識id,結果是31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    /** 序列在id中佔的位數 */
    private final long sequenceBits = 12L;
    /** 機器ID向左移12位 */
    private final long workerIdShift = sequenceBits;
    /** 數據標識id向左移17位(12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    /** 時間截向左移22位(5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    /** 生成序列的掩碼,這裏爲4095 (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
    /** 工做機器ID(0~31) */
    private long workerId;
    /** 數據中心ID(0~31) */
    private long datacenterId;
    /** 毫秒內序列(0~4095) */
    private long sequence = 0L;
    /** 上次生成ID的時間截 */
    private long lastTimestamp = -1L;
    //==============================Constructors=====================================
    /**
     * 構造函數
     * @param workerId 工做ID (0~31)
     * @param datacenterId 數據中心ID (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }
    // ==============================Methods==========================================
    /**
     * 得到下一個ID (該方法是線程安全的)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        //若是當前時間小於上一次ID生成的時間戳,說明系統時鐘回退過這個時候應當拋出異常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        //若是是同一時間生成的,則進行毫秒內序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            //毫秒內序列溢出
            if (sequence == 0) {
                //阻塞到下一個毫秒,得到新的時間戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //時間戳改變,毫秒內序列重置
        else {
            sequence = 0L;
        }
        //上次生成ID的時間截
        lastTimestamp = timestamp;
        //移位並經過或運算拼到一塊兒組成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }
    /**
     * 阻塞到下一個毫秒,直到得到新的時間戳
     * @param lastTimestamp 上次生成ID的時間截
     * @return 當前時間戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
    /**
     * 返回以毫秒爲單位的當前時間
     * @return 當前時間(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }
    //==============================Test=============================================
    /** 測試 */
    public static void main(String[] args) throws InterruptedException {
        SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
        for (int i = 0; i < 100; i++) {
            long id = idWorker.nextId();
            //System.out.println(Long.toBinaryString(id));
            Thread.sleep(1);
            System.out.println(id);
        }
    }
}

3. 基於數據庫方案

 

利用數據庫生成ID是最多見的方案。可以確保ID全數據庫惟一。其優缺點以下:

優勢:

  • 很是簡單,利用現有數據庫系統的功能實現,成本小,有DBA專業維護。

  • ID號單調自增,能夠實現一些對ID有特殊要求的業務。

缺點:

  • 不一樣數據庫語法和實現不一樣,數據庫遷移的時候或多數據庫版本支持的時候須要處理。

  • 在單個數據庫或讀寫分離或一主多從的狀況下,只有一個主庫能夠生成。有單點故障的風險。
  • 在性能達不到要求的狀況下,比較難於擴展。
  • 若是涉及多個系統須要合併或者數據遷移會比較麻煩。
  • 分表分庫的時候會有麻煩。

4.其餘方案簡介

經過Redis生成ID(主要經過redis的自增函數)、ZooKeeper生成ID、MongoDB的ObjectID等都可實現惟一性的要求

3、咱們在實際應用中經歷的方案

1. 方案簡介

 實際業務中,除了分佈式ID全局惟一以外,還有是否趨勢/連續遞增的要求。根據具體業務需求的不一樣,有兩種可選方案。

一是隻保證全局惟一,不保證連續遞增。二是既保證全局惟一,又保證連續遞增。

2. 基於ZooKeeper和本地緩存的方案

基於zookeeper分佈式ID實現方案有不少種,本方案只使用ZooKeeper做爲分段節點協調工具。每臺服務器首先從zookeeper緩存一段,如1-1000的id,

此時zk上保存最大值1000,每次獲取的時候都會進行判斷,若是id<=1000,則更新本地的當前值,若是爲1001,則會將zk上的最大值更新至2000,本地緩存

段更新爲1001-2000,更新的時候使用curator的分佈式鎖來實現。

因爲ID是從本機獲取,所以本方案的優勢是性能很是好。缺點是若是多主機負載均衡,則會出現不連續的id,固然將遞增區段設置爲1也能保證連續的id,

可是效率會受到很大影響。實現關鍵源碼以下:

  1. import org.apache.curator.framework.CuratorFramework;  
  2. import org.apache.curator.framework.CuratorFrameworkFactory;  
  3. import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;  
  4. import org.apache.curator.retry.ExponentialBackoffRetry;  
  5. import org.apache.zookeeper.CreateMode;  
  6. import org.apache.zookeeper.data.Stat;  
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.    
  10. import java.io.UnsupportedEncodingException;  
  11. import java.util.Map;  
  12. import java.util.concurrent.ConcurrentHashMap;  
  13.    
  14. /** 
  15.  * 根據開源項目mycat實現基於zookeeper的遞增序列號 
  16.  * <p> 
  17.  * 只要配置好ZK地址和表名的以下屬性 
  18.  * MINID 某線程當前區間內最小值 
  19.  * MAXID 某線程當前區間內最大值 
  20.  * CURID 某線程當前區間內當前值 
  21.  * 
  22.  * @author wangwanbin 
  23.  * @version 1.0 
  24.  * @time 2017/9/1 
  25.  */  
  26. public class ZKCachedSequenceHandler extends SequenceHandler {  
  27.     protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);  
  28.     private static final String KEY_MIN_NAME = ".MINID";// 1  
  29.     private static final String KEY_MAX_NAME = ".MAXID";// 10000  
  30.     private static final String KEY_CUR_NAME = ".CURID";// 888  
  31.     private final static long PERIOD = 1000;//每次緩存的ID段數量  
  32.     private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();  
  33.    
  34.     /** 
  35.      * 私有化構造方法,單例模式 
  36.      */  
  37.     private ZKCachedSequenceHandler() {  
  38.     }  
  39.    
  40.     /** 
  41.      * 獲取sequence工具對象的惟一方法 
  42.      * 
  43.      * @return 
  44.      */  
  45.     public static ZKCachedSequenceHandler getInstance() {  
  46.         return instance;  
  47.     }  
  48.    
  49.     private Map<String, Map<String, String>> tableParaValMap = null;  
  50.    
  51.     private CuratorFramework client;  
  52.     private InterProcessSemaphoreMutex interProcessSemaphore = null;  
  53.    
  54.     public void loadZK() {  
  55.         try {  
  56.             this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));  
  57.             this.client.start();  
  58.         } catch (Exception e) {  
  59.             LOGGER.error("Error caught while initializing ZK:" + e.getCause());  
  60.         }  
  61.     }  
  62.    
  63.     public Map<String, String> getParaValMap(String prefixName) {  
  64.         if (tableParaValMap == null) {  
  65.             try {  
  66.                 loadZK();  
  67.                 fetchNextPeriod(prefixName);  
  68.             } catch (Exception e) {  
  69.                 LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());  
  70.             }  
  71.         }  
  72.         Map<String, String> paraValMap = tableParaValMap.get(prefixName);  
  73.         return paraValMap;  
  74.     }  
  75.    
  76.     public Boolean fetchNextPeriod(String prefixName) {  
  77.         try {  
  78.             Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);  
  79.    
  80.             if (stat == null || (stat.getDataLength() == 0)) {  
  81.                 try {  
  82.                     client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)  
  83.                             .forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());  
  84.                 } catch (Exception e) {  
  85.                     LOGGER.debug("Node exists! Maybe other instance is initializing!");  
  86.                 }  
  87.             }  
  88.             if (interProcessSemaphore == null) {  
  89.                 interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);  
  90.             }  
  91.             interProcessSemaphore.acquire();  
  92.             if (tableParaValMap == null) {  
  93.                 tableParaValMap = new ConcurrentHashMap<>();  
  94.             }  
  95.             Map<String, String> paraValMap = tableParaValMap.get(prefixName);  
  96.             if (paraValMap == null) {  
  97.                 paraValMap = new ConcurrentHashMap<>();  
  98.                 tableParaValMap.put(prefixName, paraValMap);  
  99.             }  
  100.             long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));  
  101.             client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());  
  102.             if (now == 1) {  
  103.                 paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");  
  104.                 paraValMap.put(prefixName + KEY_MIN_NAME, "1");  
  105.                 paraValMap.put(prefixName + KEY_CUR_NAME, "0");  
  106.             } else {  
  107.                 paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");  
  108.                 paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");  
  109.                 paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");  
  110.             }  
  111.         } catch (Exception e) {  
  112.             LOGGER.error("Error caught while updating period from ZK:" + e.getCause());  
  113.         } finally {  
  114.             try {  
  115.                 interProcessSemaphore.release();  
  116.             } catch (Exception e) {  
  117.                 LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());  
  118.             }  
  119.         }  
  120.         return true;  
  121.     }  
  122.    
  123.     public Boolean updateCURIDVal(String prefixName, Long val) {  
  124.         Map<String, String> paraValMap = tableParaValMap.get(prefixName);  
  125.         if (paraValMap == null) {  
  126.             throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");  
  127.         }  
  128.         paraValMap.put(prefixName + KEY_CUR_NAME, val + "");  
  129.         return true;  
  130.     }  
  131.    
  132.     /** 
  133.      * 獲取自增ID 
  134.      * 
  135.      * @param sequenceEnum 
  136.      * @return 
  137.      */  
  138.     @Override  
  139.     public synchronized long nextId(SequenceEnum sequenceEnum) {  
  140.         String prefixName = sequenceEnum.getCode();  
  141.         Map<String, String> paraMap = this.getParaValMap(prefixName);  
  142.         if (null == paraMap) {  
  143.             throw new RuntimeException("fetch Param Values error.");  
  144.         }  
  145.         Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;  
  146.         Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));  
  147.         if (nextId > maxId) {  
  148.             fetchNextPeriod(prefixName);  
  149.             return nextId(sequenceEnum);  
  150.         }  
  151.         updateCURIDVal(prefixName, nextId);  
  152.         return nextId.longValue();  
  153.     }  
  154.    
  155.     public static void main(String[] args) throws UnsupportedEncodingException {  
  156.         long startTime = System.currentTimeMillis();   //獲取開始時間  
  157.         final ZKCachedSequenceHandler sequenceHandler = getInstance();  
  158.         sequenceHandler.loadZK();  
  159.         new Thread() {  
  160.             public void run() {  
  161.                 long startTime2 = System.currentTimeMillis();   //獲取開始時間  
  162.                 for (int i = 0; i < 5000; i++) {  
  163.                     System.out.println("線程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));  
  164.                 }  
  165.                 long endTime2 = System.currentTimeMillis(); //獲取結束時間  
  166.                 System.out.println("程序運行時間1: " + (endTime2 - startTime2) + "ms");  
  167.             }  
  168.         }.start();  
  169.         for (int i = 0; i < 5000; i++) {  
  170.             System.out.println("線程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));  
  171.         }  
  172.         long endTime = System.currentTimeMillis(); //獲取結束時間  
  173.         System.out.println("程序運行時間2: " + (endTime - startTime) + "ms");  
  174.     }  
  175. }  
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 根據開源項目mycat實現基於zookeeper的遞增序列號
 * <p>
 * 只要配置好ZK地址和表名的以下屬性
 * MINID 某線程當前區間內最小值
 * MAXID 某線程當前區間內最大值
 * CURID 某線程當前區間內當前值
 *
 * @author wangwanbin
 * @version 1.0
 * @time 2017/9/1
 */
public class ZKCachedSequenceHandler extends SequenceHandler {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
    private static final String KEY_MIN_NAME = ".MINID";// 1
    private static final String KEY_MAX_NAME = ".MAXID";// 10000
    private static final String KEY_CUR_NAME = ".CURID";// 888
    private final static long PERIOD = 1000;//每次緩存的ID段數量
    private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();
 
    /**
     * 私有化構造方法,單例模式
     */
    private ZKCachedSequenceHandler() {
    }
 
    /**
     * 獲取sequence工具對象的惟一方法
     *
     * @return
     */
    public static ZKCachedSequenceHandler getInstance() {
        return instance;
    }
 
    private Map<String, Map<String, String>> tableParaValMap = null;
 
    private CuratorFramework client;
    private InterProcessSemaphoreMutex interProcessSemaphore = null;
 
    public void loadZK() {
        try {
            this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
            this.client.start();
        } catch (Exception e) {
            LOGGER.error("Error caught while initializing ZK:" + e.getCause());
        }
    }
 
    public Map<String, String> getParaValMap(String prefixName) {
        if (tableParaValMap == null) {
            try {
                loadZK();
                fetchNextPeriod(prefixName);
            } catch (Exception e) {
                LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());
            }
        }
        Map<String, String> paraValMap = tableParaValMap.get(prefixName);
        return paraValMap;
    }
 
    public Boolean fetchNextPeriod(String prefixName) {
        try {
            Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);
 
            if (stat == null || (stat.getDataLength() == 0)) {
                try {
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                            .forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());
                } catch (Exception e) {
                    LOGGER.debug("Node exists! Maybe other instance is initializing!");
                }
            }
            if (interProcessSemaphore == null) {
                interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);
            }
            interProcessSemaphore.acquire();
            if (tableParaValMap == null) {
                tableParaValMap = new ConcurrentHashMap<>();
            }
            Map<String, String> paraValMap = tableParaValMap.get(prefixName);
            if (paraValMap == null) {
                paraValMap = new ConcurrentHashMap<>();
                tableParaValMap.put(prefixName, paraValMap);
            }
            long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));
            client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());
            if (now == 1) {
                paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");
                paraValMap.put(prefixName + KEY_MIN_NAME, "1");
                paraValMap.put(prefixName + KEY_CUR_NAME, "0");
            } else {
                paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");
                paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");
                paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");
            }
        } catch (Exception e) {
            LOGGER.error("Error caught while updating period from ZK:" + e.getCause());
        } finally {
            try {
                interProcessSemaphore.release();
            } catch (Exception e) {
                LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());
            }
        }
        return true;
    }
 
    public Boolean updateCURIDVal(String prefixName, Long val) {
        Map<String, String> paraValMap = tableParaValMap.get(prefixName);
        if (paraValMap == null) {
            throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");
        }
        paraValMap.put(prefixName + KEY_CUR_NAME, val + "");
        return true;
    }
 
    /**
     * 獲取自增ID
     *
     * @param sequenceEnum
     * @return
     */
    @Override
    public synchronized long nextId(SequenceEnum sequenceEnum) {
        String prefixName = sequenceEnum.getCode();
        Map<String, String> paraMap = this.getParaValMap(prefixName);
        if (null == paraMap) {
            throw new RuntimeException("fetch Param Values error.");
        }
        Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
        Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
        if (nextId > maxId) {
            fetchNextPeriod(prefixName);
            return nextId(sequenceEnum);
        }
        updateCURIDVal(prefixName, nextId);
        return nextId.longValue();
    }
 
    public static void main(String[] args) throws UnsupportedEncodingException {
        long startTime = System.currentTimeMillis();   //獲取開始時間
        final ZKCachedSequenceHandler sequenceHandler = getInstance();
        sequenceHandler.loadZK();
        new Thread() {
            public void run() {
                long startTime2 = System.currentTimeMillis();   //獲取開始時間
                for (int i = 0; i < 5000; i++) {
                    System.out.println("線程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
                }
                long endTime2 = System.currentTimeMillis(); //獲取結束時間
                System.out.println("程序運行時間1: " + (endTime2 - startTime2) + "ms");
            }
        }.start();
        for (int i = 0; i < 5000; i++) {
            System.out.println("線程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
        }
        long endTime = System.currentTimeMillis(); //獲取結束時間
        System.out.println("程序運行時間2: " + (endTime - startTime) + "ms");
    }
}

能夠看到,因爲不須要進行過多的網絡消耗,緩存式的zk協調方案性能至關了得,生成10000個ID僅需553ms(兩個線程耗時較長者) , 平均每一個ID=0.05ms

 3.利用zk的永久自增節點策略實現持續遞增ID

 

使用zk的永久sequence策略建立節點,並獲取返回值,而後刪除前一個節點,這樣既防止zk服務器存在過多的節點,又提升了效率;節點刪除採用線程池來統一處理,提升響應速度

優勢:能建立連續遞增的ID,又能下降ZK消耗。關鍵實現代碼以下:

  1. package com.zb.p2p.utils;  
  2.   
  3. import com.zb.p2p.enums.SequenceEnum;  
  4. import org.apache.commons.pool2.PooledObject;  
  5. import org.apache.commons.pool2.PooledObjectFactory;  
  6. import org.apache.commons.pool2.impl.DefaultPooledObject;  
  7. import org.apache.commons.pool2.impl.GenericObjectPool;  
  8. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;  
  9. import org.apache.curator.framework.CuratorFramework;  
  10. import org.apache.curator.framework.CuratorFrameworkFactory;  
  11. import org.apache.curator.retry.ExponentialBackoffRetry;  
  12. import org.apache.zookeeper.CreateMode;  
  13. import org.slf4j.Logger;  
  14. import org.slf4j.LoggerFactory;  
  15.   
  16. import java.util.ArrayDeque;  
  17. import java.util.Iterator;  
  18. import java.util.Queue;  
  19. import java.util.concurrent.ConcurrentLinkedQueue;  
  20. import java.util.concurrent.CountDownLatch;  
  21. import java.util.concurrent.ExecutorService;  
  22. import java.util.concurrent.Executors;  
  23.   
  24. /** 
  25.  * 基於zk的永久型自增節點PERSISTENT_SEQUENTIAL實現 
  26.  * 每次生成節點後會使用線程池執行刪除節點任務,以減少zk的負擔 
  27.  * Created by wangwanbin on 2017/9/5. 
  28.  */  
  29. public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {  
  30.     protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);  
  31.     private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();  
  32.     private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);  
  33.     private GenericObjectPool genericObjectPool;  
  34.     private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();  
  35.     private static String ZK_ADDRESS = ""; //192.168.0.65  
  36.     private static String PATH = "";//  /sequence/p2p  
  37.     private static String SEQ = "";//seq;  
  38.   
  39.     /** 
  40.      * 私有化構造方法,單例模式 
  41.      */  
  42.     private ZKIncreaseSequenceHandler() {  
  43.         GenericObjectPoolConfig config = new GenericObjectPoolConfig();  
  44.         config.setMaxTotal(4);  
  45.         genericObjectPool = new GenericObjectPool(this, config);  
  46.     }  
  47.   
  48.     /** 
  49.      * 獲取sequence工具對象的惟一方法 
  50.      * 
  51.      * @return 
  52.      */  
  53.     public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {  
  54.         ZK_ADDRESS = zkAddress;  
  55.         PATH = path;  
  56.         SEQ = seq;  
  57.         return instance;  
  58.     }  
  59.   
  60.     @Override  
  61.     public long nextId(final SequenceEnum sequenceEnum) {  
  62.         String result = createNode(sequenceEnum.getCode());  
  63.         final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());  
  64.         final long id = Long.parseLong(idstr);  
  65.         preNodes.add(id);  
  66.         //刪除上一個節點  
  67.         fixedThreadPool.execute(new Runnable() {  
  68.             @Override  
  69.             public void run() {  
  70.                 Iterator<Long> iterator = preNodes.iterator();  
  71.                 if (iterator.hasNext()) {  
  72.                     long preNode = iterator.next();  
  73.                     if (preNode < id) {  
  74.                         final String format = "%0" + idstr.length() + "d";  
  75.                         String preIdstr = String.format(format, preNode);  
  76.                         final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;  
  77.                         CuratorFramework client = null;  
  78.                         try {  
  79.                             client = (CuratorFramework) genericObjectPool.borrowObject();  
  80.                             client.delete().forPath(prePath);  
  81.                             preNodes.remove(preNode);  
  82.                         } catch (Exception e) {  
  83.                             LOGGER.error("delete preNode error", e);  
  84.                         } finally {  
  85.                             if (client != null)  
  86.                                 genericObjectPool.returnObject(client);  
  87.                         }  
  88.                     }  
  89.                 }  
  90.             }  
  91.         });  
  92.         return id;  
  93.     }  
  94.   
  95.   
  96.     private String createNode(String prefixName) {  
  97.         CuratorFramework client = null;  
  98.         try {  
  99.             client = (CuratorFramework) genericObjectPool.borrowObject();  
  100.             String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)  
  101.                     .forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());  
  102.             return result;  
  103.         } catch (Exception e) {  
  104.             throw new RuntimeException("create zookeeper node error", e);  
  105.         } finally {  
  106.             if (client != null)  
  107.                 genericObjectPool.returnObject(client);  
  108.         }  
  109.     }  
  110.   
  111.     public static void main(String[] args) {  
  112.         ExecutorService executorService = Executors.newFixedThreadPool(1);  
  113.         long startTime = System.currentTimeMillis();   //獲取開始時間  
  114.         final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq");  
  115.         int count = 10;  
  116.         final CountDownLatch cd = new CountDownLatch(count);  
  117.         for (int i = 0; i < count; i++) {  
  118.             executorService.execute(new Runnable() {  
  119.                 public void run() {  
  120.                     System.out.printf("線程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));  
  121.                     cd.countDown();  
  122.                 }  
  123.             });  
  124.         }  
  125.         try {  
  126.             cd.await();  
  127.         } catch (InterruptedException e) {  
  128.             LOGGER.error("Interrupted thread",e);  
  129.             Thread.currentThread().interrupt();  
  130.         }  
  131.         long endTime = System.currentTimeMillis(); //獲取結束時間  
  132.         System.out.println("程序運行時間: " + (endTime - startTime) + "ms");  
  133.   
  134.     }  
  135.   
  136.     @Override  
  137.     public PooledObject<CuratorFramework> makeObject() throws Exception {  
  138.         CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));  
  139.         client.start();  
  140.         return new DefaultPooledObject<>(client);  
  141.     }  
  142.   
  143.     @Override  
  144.     public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {  
  145.   
  146.     }  
  147.   
  148.     @Override  
  149.     public boolean validateObject(PooledObject<CuratorFramework> p) {  
  150.         return false;  
  151.     }  
  152.   
  153.     @Override  
  154.     public void activateObject(PooledObject<CuratorFramework> p) throws Exception {  
  155.   
  156.     }  
  157.   
  158.     @Override  
  159.     public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {  
  160.   
  161.     }  
  162. }  
package com.zb.p2p.utils;

import com.zb.p2p.enums.SequenceEnum;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基於zk的永久型自增節點PERSISTENT_SEQUENTIAL實現
 * 每次生成節點後會使用線程池執行刪除節點任務,以減少zk的負擔
 * Created by wangwanbin on 2017/9/5.
 */
public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
    private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();
    private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
    private GenericObjectPool genericObjectPool;
    private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();
    private static String ZK_ADDRESS = ""; //192.168.0.65
    private static String PATH = "";//  /sequence/p2p
    private static String SEQ = "";//seq;

    /**
     * 私有化構造方法,單例模式
     */
    private ZKIncreaseSequenceHandler() {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(4);
        genericObjectPool = new GenericObjectPool(this, config);
    }

    /**
     * 獲取sequence工具對象的惟一方法
     *
     * @return
     */
    public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {
        ZK_ADDRESS = zkAddress;
        PATH = path;
        SEQ = seq;
        return instance;
    }

    @Override
    public long nextId(final SequenceEnum sequenceEnum) {
        String result = createNode(sequenceEnum.getCode());
        final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());
        final long id = Long.parseLong(idstr);
        preNodes.add(id);
        //刪除上一個節點
        fixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                Iterator<Long> iterator = preNodes.iterator();
                if (iterator.hasNext()) {
                    long preNode = iterator.next();
                    if (preNode < id) {
                        final String format = "%0" + idstr.length() + "d";
                        String preIdstr = String.format(format, preNode);
                        final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;
                        CuratorFramework client = null;
                        try {
                            client = (CuratorFramework) genericObjectPool.borrowObject();
                            client.delete().forPath(prePath);
                            preNodes.remove(preNode);
                        } catch (Exception e) {
                            LOGGER.error("delete preNode error", e);
                        } finally {
                            if (client != null)
                                genericObjectPool.returnObject(client);
                        }
                    }
                }
            }
        });
        return id;
    }


    private String createNode(String prefixName) {
        CuratorFramework client = null;
        try {
            client = (CuratorFramework) genericObjectPool.borrowObject();
            String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                    .forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());
            return result;
        } catch (Exception e) {
            throw new RuntimeException("create zookeeper node error", e);
        } finally {
            if (client != null)
                genericObjectPool.returnObject(client);
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        long startTime = System.currentTimeMillis();   //獲取開始時間
        final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq");
        int count = 10;
        final CountDownLatch cd = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executorService.execute(new Runnable() {
                public void run() {
                    System.out.printf("線程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));
                    cd.countDown();
                }
            });
        }
        try {
            cd.await();
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted thread",e);
            Thread.currentThread().interrupt();
        }
        long endTime = System.currentTimeMillis(); //獲取結束時間
        System.out.println("程序運行時間: " + (endTime - startTime) + "ms");

    }

    @Override
    public PooledObject<CuratorFramework> makeObject() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {

    }

    @Override
    public boolean validateObject(PooledObject<CuratorFramework> p) {
        return false;
    }

    @Override
    public void activateObject(PooledObject<CuratorFramework> p) throws Exception {

    }

    @Override
    public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {

    }
}

測試結果以下,生成10000個ID消耗=9443ms(兩個線程耗時較長者),  平均每一個ID=0.9ms

這還只是單zk鏈接的狀況下,若是使用鏈接池來維護多個zk的連接,效率將成倍的提高

相關文章
相關標籤/搜索