最近對snowflake比較感興趣,就看了一些分佈式惟一ID生成器(發號器)的開源項目的源碼,例如百度的uid-generator,美團的leaf。大體看了一遍後感受uid-generator代碼寫的要更好一些,十分的精煉,短小精悍。git
正好手頭有個任務要搞個發號器,百度的這個源碼是不能直接運行起來提供服務的,爲了練練手,就把百度的uid-generator遷移到spring boot上重寫了一遍。github
代碼基本如出一轍,就作了一些工程化的東西,讓uid-generator能以服務的形式跑起來,經過http接口對外提供服務。算法
可運行的代碼地址:點這裏spring
這裏借用一下uid-generator的圖:數據庫
這一個結構就是一個snowflake算法裏的id,共計64位,就是一個long。數據結構
當前時間,相對於時間基點"2016-05-20"的增量值,單位:秒,最多可支持約8.7年。時間基點是本身配置的。28位即最大表示2^28的數值的秒數,換算一下就是8.7年左右。併發
機器id,最多可支持約420w次機器啓動。內置實現爲在啓動時由數據庫分配。420w = 2^22分佈式
每秒下的併發序列,13 bits可支持每秒8192個併發,即2^13個併發spring-boot
這些位數都是能夠改變的,對於不少公司來講,28位 delta seconds帶來的8.7年的最大支持時間是可預期的不夠用的,而22bit的worker id和13bit的sequence則是遠遠超出可預期的業務場景的,那麼就能夠自由的根據本身的需求,對這三個參數進行調整。工具
例如,{"workerBits":20,"timeBits":31,"seqBits":12}這樣的配置能夠68年,100W次重啓,單機每秒4096個併發的狀況,我的感受仍是比較合適的。
snowflake的實現有不少種方式,不過思想上都是同樣的。
在瞭解SnowFlake的數據結構後,就能夠來看看具體是如何生成ID的了。
其實這個過程,就是往delta seconds,sequence,worker id三個結構裏填充數據的過程。
總體類圖以下:
SnowFlakeGenerator就是基於SnowFlake算法的UidGenerator的實現類,SnowFlake的實現就是在這個類裏;
BitsAllocator就是對SnowFlake 的ID進行位操做的共聚類;
DatabaseWorkerIdAssigner就是一個基於DB自增的worker id 分配器的實現。
這個類是進行一些位操做的工具類,給每個id 的delta seconds,sequence,worker id賦值就是經過這個類來實現的。這個類有如下成員變量:
/** * Total 64 bits */ public static final int TOTAL_BITS = 1 << 6; /** * Bits for [sign-> second-> workId-> sequence] */ private int signBits = 1; private final int timestampBits; private final int workerIdBits; private final int sequenceBits; /** * Max value for workId & sequence */ private final long maxDeltaSeconds; private final long maxWorkerId; private final long maxSequence; /** * Shift for timestamp & workerId */ /** * timestamp須要位移多少位 */ private final int timestampShift; /** * workerId須要位移多少位 */ private final int workerIdShift;
其餘字段都好說,看名稱和註釋都能明白。最下面倆shift,可能如今看着有些摸不着頭腦,不過看後面的賦值過程就知道什麼叫「shift」了
構造器:
/** * Constructor with timestampBits, workerIdBits, sequenceBits<br> * The highest bit used for sign, so <code>63</code> bits for timestampBits, workerIdBits, sequenceBits */ public BitsAllocator(int timestampBits, int workerIdBits, int sequenceBits) { // make sure allocated 64 bits int allocateTotalBits = signBits + timestampBits + workerIdBits + sequenceBits; Assert.isTrue(allocateTotalBits == TOTAL_BITS, "allocate not enough 64 bits"); // initialize bits this.timestampBits = timestampBits; this.workerIdBits = workerIdBits; this.sequenceBits = sequenceBits; // initialize max value //先將-1左移timestampBits位,獲得-10000...(timestampBits-1個零) //而後取反,獲得1111(timestampBits-1)個1 //等價於2的timestampBits次方-1 this.maxDeltaSeconds = ~(-1L << timestampBits); this.maxWorkerId = ~(-1L << workerIdBits); this.maxSequence = ~(-1L << sequenceBits); // initialize shift this.timestampShift = workerIdBits + sequenceBits; this.workerIdShift = sequenceBits; }
也很簡單,重點就在 「~(-1L << timestampBits) 」這樣一坨操做,可能理解起來會有些困難。這是一連串的位操做,這裏進行一下分解:
這一通操做其實也就至關於2的timestampBits次方-1,也就是timestampBits位二進制最大能表示的數字,不過是用位運算來作的。若是不懂二進制的位移和取反,能夠百度「位操做」補充一下基礎,這裏就不展開了。
分配操做:
/** * Allocate bits for UID according to delta seconds & workerId & sequence<br> * <b>Note that: </b>The highest bit will always be 0 for sign * * 這裏就是把不一樣的字段放到相應的位上 * id的整體結構是: * sign (fixed 1bit) -> deltaSecond -> workerId -> sequence(within the same second) * deltaSecond 左移(workerIdBits + sequenceBits)位,workerId左移sequenceBits位,此時就完成了字節的分配 * @param deltaSeconds * @param workerId * @param sequence * @return */ public long allocate(long deltaSeconds, long workerId, long sequence) { return (deltaSeconds << timestampShift) | (workerId << workerIdShift) | sequence; }
這裏就是對delta seconds,sequence,worker id三個結構進行賦值的地方了,核心代碼之一。能夠再看一下最上面的圖,sequence是在最右側(最低位),因此sequence不用作位移,直接就是在對的位置;
而workerId,須要左移workerIdShift才能到正確的位置。workerIdShift看上面的構造器,就是sequenceBits,就是sequence的位數;
deltaSeconds 左移timestampShift位,也就是workerIdBits + sequenceBits;
而後對這三個位移後的值進行「或」操做,就把正確的值賦到正確的位數上了。
SnowFlake中,deltaSeconds依賴時間戳,能夠經過系統獲取;sequence能夠經過自增來控制;這倆字段都是項目能夠自給自足的,而WorkerId則必須還有一個策略來提供。
這個策略要保證每次服務啓動的時候拿到的WorkerId都能不重複,否則就有可能集羣不一樣的機器拿到不一樣的workerid,會發重複的號了;
而服務啓動又是個相對低頻的行爲,也不影響發號性能,因此能夠用DB自增ID來實現。
DatabaseWorkerIdAssigner就是依賴DB自增ID實現的workerId分配器。
代碼就不貼了,就是個簡單的save而後取到DB的自增ID。
這裏就是控制發號邏輯的地方了。
先看當作員變量和初始化部分:
@Value("${snowflake.timeBits}") protected int timeBits = 28; @Value("${snowflake.workerBits}") protected int workerBits = 22; @Value("${snowflake.seqBits}") protected int seqBits = 13; @Value("${snowflake.epochStr}") /** Customer epoch, unit as second. For example 2016-05-20 (ms: 1463673600000)*/ protected String epochStr = "2016-05-20"; protected long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(1463673600000L); @Autowired @Qualifier(value = "dbWorkerIdAssigner") protected WorkerIdAssigner workerIdAssigner; /** Stable fields after spring bean initializing */ protected BitsAllocator bitsAllocator; protected long workerId; /** Volatile fields caused by nextId() */ protected long sequence = 0L; protected long lastSecond = -1L; @PostConstruct public void afterPropertiesSet() throws Exception { bitsAllocator = new BitsAllocator(timeBits,workerBits,seqBits); // initialize worker id workerId = workerIdAssigner.assignWorkerId(); if(workerId > bitsAllocator.getMaxWorkerId()){ throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId()); } if (StringUtils.isNotBlank(epochStr)) { this.epochSeconds = TimeUnit.MILLISECONDS.toSeconds(DateUtils.parseByDayPattern(epochStr).getTime()); } log.info("Initialized bits(1, {}, {}, {}) for workerID:{}", timeBits, workerBits, seqBits, workerId); }
@Value注入的都是配置文件裏讀取的值。
afterPropertiesSet裏,將配置文件讀取到的值傳遞給BitsAllocator,夠造出一個對應的BitsAllocator;
而後生成一個workerId(插入一條DB記錄),初始化過程就完成了。
再看核心發號控制邏輯:
/** * Get UID * * @return UID * @throws UidGenerateException in the case: Clock moved backwards; Exceeds the max timestamp */ protected synchronized long nextId() { long currentSecond = getCurrentSecond(); // Clock moved backwards, refuse to generate uid //todo 時鐘回撥問題待解決 if (currentSecond < lastSecond) { long refusedSeconds = lastSecond - currentSecond; throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds); } // At the same second, increase sequence //同一秒內的,seq加一 if (currentSecond == lastSecond) { //seq 加一,若是大於MaxSequence,就變成0 //若是大於MaxSequence 就是seq能取到的最大值,二進制(seqBits -1)位全是1 sequence = (sequence + 1) & bitsAllocator.getMaxSequence(); // Exceed the max sequence, we wait the next second to generate uid //號發完了,等到下一秒 if (sequence == 0) { currentSecond = getNextSecond(lastSecond); } // At the different second, sequence restart from zero } else { //新的一秒,從新開始發號 sequence = 0L; } lastSecond = currentSecond; // Allocate bits for UID return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence); }
注意這是個synchronized方法,這是關鍵。
getCurrentSecond就是獲取當前以秒爲單位的時間戳;
若是currentSecond和lastSecond同樣,那說明本次發號請求不是本秒的第一次,只要將sequence直接+1便可;若是+1後大於了MaxSequence(這裏會用& bitsAllocator.getMaxSequence()設置爲0),那說明本秒的sequence已經用完了,此時請求已經超出了本秒系統的最大吞吐量,這裏須要調用getNextSecond(詳見github),來等待到下一秒;
若是currentSecond和lastSecond不同,說名本次請求是全新的一秒,這時候sequence設置爲0便可。
就是currentSecond - epochSeconds,當前時間減去初始時間的秒數。
此時,workerId,deltaSecond,sequence都已經肯定了具體的值,而後調用bitsAllocator.allocate方法,就能夠生成一個全新的ID了,至此發號完成。