https://github.com/baidu/uid-generatorhtml
uid-generator是基於Twitter開源的snowflake算法實現的。git
snowflake將long的64位分爲了3部分,時間戳、工做機器id和序列號,位數分配以下。github
其中,時間戳部分的時間單位通常爲毫秒。也就是說1臺工做機器1毫秒可產生4096個id(2的12次方)。算法
與原始的snowflake算法不一樣,uid-generator支持自定義時間戳、工做機器id和序列號等各部分的位數,以應用於不一樣場景。默認分配方式以下。數據庫
sign(1bit)
固定1bit符號標識,即生成的UID爲正數。編程
delta seconds (28 bits)
當前時間,相對於時間基點"2016-05-20"的增量值,單位:秒,最多可支持約8.7年(注意:1. 這裏的單位是秒,而不是毫秒! 2.注意這裏的用詞,是「最多」可支持8.7年,爲何是「最多」,後面會講)數組
worker id (22 bits)
機器id,最多可支持約420w次機器啓動。內置實現爲在啓動時由數據庫分配,默認分配策略爲用後即棄,後續可提供複用策略。緩存
sequence (13 bits)
每秒下的併發序列,13 bits可支持每秒8192個併發。(注意下這個地方,默認支持qps最大爲8192個)併發
DefaultUidGenerator的產生id的方法與基本上就是常見的snowflake算法實現,僅有一些不一樣,如以秒爲爲單位而不是毫秒。框架
DefaultUidGenerator的產生id的方法以下。
protected synchronized long nextId() { long currentSecond = getCurrentSecond(); // Clock moved backwards, refuse to generate uid if (currentSecond < lastSecond) { long refusedSeconds = lastSecond - currentSecond; throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds); } // At the same second, increase sequence if (currentSecond == lastSecond) { sequence = (sequence + 1) & bitsAllocator.getMaxSequence(); // Exceed the max sequence, we wait the next second to generate uid if (sequence == 0) { currentSecond = getNextSecond(lastSecond); } // At the different second, sequence restart from zero } else { sequence = 0L; } lastSecond = currentSecond; // Allocate bits for UID return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence); }
CachedUidGenerator支持緩存生成的id。
關於CachedUidGenerator,文檔上是這樣介紹的。
使用RingBuffer緩存生成的id。RingBuffer是個環形數組,默認大小爲8192個,裏面緩存着生成的id。
會從ringbuffer中拿一個id,支持併發獲取
RingBuffer填充時機
程序啓動時,將RingBuffer填充滿,緩存着8192個id
在調用getUID()獲取id時,檢測到RingBuffer中的剩餘id個數小於總個數的50%,將RingBuffer填充滿,使其緩存8192個id
定時填充(可配置是否使用以及定時任務的週期)
由於delta seconds部分是以秒爲單位的,因此1個worker 1秒內最多生成的id書爲8192個(2的13次方)。
從上可知,支持的最大qps爲8192,因此經過緩存id來提升吞吐量。
爲何叫藉助將來時間?
由於每秒最多生成8192個id,當1秒獲取id數多於8192時,RingBuffer中的id很快消耗完畢,在填充RingBuffer時,生成的id的delta seconds 部分只能使用將來的時間。
(由於使用了將來的時間來生成id,因此上面說的是,【最多】可支持約8.7年)
@Override public long getUID() { try { return ringBuffer.take(); } catch (Exception e) { LOGGER.error("Generate unique id exception. ", e); throw new UidGenerateException(e); } }
(注意:這裏的RingBuffer不是Disruptor框架中的RingBuffer,可是藉助了不少Disruptor中RingBuffer的設計思想,好比使用緩存行填充解決僞共享問題)
RingBuffer爲環形數組,默認容量爲sequence可容納的最大值(8192個),能夠經過boostPower參數設置大小。
tail指針、Cursor指針用於環形數組上讀寫slot:
Tail指針
表示Producer生產的最大序號(此序號從0開始,持續遞增)。Tail不能超過Cursor,即生產者不能覆蓋未消費的slot。當Tail已遇上curosr,此時可經過rejectedPutBufferHandler指定PutRejectPolicy
Cursor指針
表示Consumer消費到的最小序號(序號序列與Producer序列相同)。Cursor不能超過Tail,即不能消費未生產的slot。當Cursor已遇上tail,此時可經過rejectedTakeBufferHandler指定TakeRejectPolicy
CachedUidGenerator採用了雙RingBuffer,Uid-RingBuffer用於存儲Uid、Flag-RingBuffer用於存儲Uid狀態(是否可填充、是否可消費)
因爲數組元素在內存中是連續分配的,可最大程度利用CPU cache以提高性能。但同時會帶來「僞共享」FalseSharing問題,爲此在Tail、Cursor指針、Flag-RingBuffer中採用了CacheLine 補齊方式。
public class RingBuffer { private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class); /** Constants */ private static final int START_POINT = -1; private static final long CAN_PUT_FLAG = 0L; //用於標記當前slot的狀態,表示能夠put一個id進去 private static final long CAN_TAKE_FLAG = 1L; //用於標記當前slot的狀態,表示能夠take一個id public static final int DEFAULT_PADDING_PERCENT = 50; //用於控制什麼時候填充slots的默認閾值:當剩餘的可用的slot的個數,小於bufferSize的50%時,須要生成id將slots填滿 /** The size of RingBuffer's slots, each slot hold a UID */ private final int bufferSize; //slots的大小,默認爲sequence可容量的最大值,即8192個 private final long indexMask; private final long[] slots; //slots用於緩存已經生成的id private final PaddedAtomicLong[] flags; //flags用於存儲id的狀態(是否可填充、是否可消費) /** Tail: last position sequence to produce */ //Tail指針 //表示Producer生產的最大序號(此序號從0開始,持續遞增)。Tail不能超過Cursor,即生產者不能覆蓋未消費的slot。當Tail已遇上curosr,此時可經過rejectedPutBufferHandler指定PutRejectPolicy private final AtomicLong tail = new PaddedAtomicLong(START_POINT); // /** Cursor: current position sequence to consume */ //表示Consumer消費到的最小序號(序號序列與Producer序列相同)。Cursor不能超過Tail,即不能消費未生產的slot。當Cursor已遇上tail,此時可經過rejectedTakeBufferHandler指定TakeRejectPolicy private final AtomicLong cursor = new PaddedAtomicLong(START_POINT); /** Threshold for trigger padding buffer*/ private final int paddingThreshold; //用於控制什麼時候填充slots的閾值 /** Reject put/take buffer handle policy */ //當slots滿了,沒法繼續put時的處理策略。默認實現:沒法進行put,僅記錄日誌 private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer; //當slots空了,沒法繼續take時的處理策略。默認實現:僅拋出異常 private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer; /** Executor of padding buffer */ //用於運行【生成id將slots填滿】任務 private BufferPaddingExecutor bufferPaddingExecutor;
RingBuffer填充時機
程序啓動時,將RingBuffer填充滿,緩存着8192個id
在調用getUID()獲取id時,檢測到RingBuffer中的剩餘id個數小於總個數的50%,將RingBuffer填充滿,使其緩存8192個id
定時填充(可配置是否使用以及定時任務的週期)
填充RingBuffer
/** * Padding buffer fill the slots until to catch the cursor */ public void paddingBuffer() { LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); // is still running if (!running.compareAndSet(false, true)) { LOGGER.info("Padding buffer is still running. {}", ringBuffer); return; } // fill the rest slots until to catch the cursor boolean isFullRingBuffer = false; while (!isFullRingBuffer) { //獲取生成的id,放到RingBuffer中。 List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); for (Long uid : uidList) { isFullRingBuffer = !ringBuffer.put(uid); if (isFullRingBuffer) { break; } } } // not running now running.compareAndSet(true, false); LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); }
生成id(上面代碼中的uidProvider.provide調用的就是這個方法)
/** * Get the UIDs in the same specified second under the max sequence * * @param currentSecond * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1 */ protected List<Long> nextIdsForOneSecond(long currentSecond) { // Initialize result list size of (max sequence + 1) int listSize = (int) bitsAllocator.getMaxSequence() + 1; List<Long> uidList = new ArrayList<>(listSize); // Allocate the first sequence of the second, the others can be calculated with the offset //這裏的實現很取巧 //由於1秒內生成的id是連續的,因此利用第1個id來生成後面的id,而不用頻繁調用snowflake算法 long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L); for (int offset = 0; offset < listSize; offset++) { uidList.add(firstSeqUid + offset); } return uidList; }
填充緩存行解決「僞共享」
關於僞共享,能夠參考這篇文章《僞共享(false sharing),併發編程無聲的性能殺手》
//數組在物理上是連續存儲的,flags數組用來保存id的狀態(是否可消費、是否可填充),在填入id和消費id時,會被頻繁的修改。 //若是不進行緩存行填充,會致使頻繁的緩存行失效,直接從內存中讀數據。 private final PaddedAtomicLong[] flags; //tail和cursor都使用緩存行填充,是爲了不tail和cursor落到同一個緩存行上。 /** Tail: last position sequence to produce */ private final AtomicLong tail = new PaddedAtomicLong(START_POINT); /** Cursor: current position sequence to consume */ private final AtomicLong cursor = new PaddedAtomicLong(START_POINT)
/** * Represents a padded {@link AtomicLong} to prevent the FalseSharing problem<p> * * The CPU cache line commonly be 64 bytes, here is a sample of cache line after padding:<br> * 64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value) * @author yutianbao */ public class PaddedAtomicLong extends AtomicLong { private static final long serialVersionUID = -3415778863941386253L; /** Padded 6 long (48 bytes) */ public volatile long p1, p2, p3, p4, p5, p6 = 7L; /** * Constructors from {@link AtomicLong} */ public PaddedAtomicLong() { super(); } public PaddedAtomicLong(long initialValue) { super(initialValue); } /** * To prevent GC optimizations for cleaning unused padded references */ public long sumPaddingToPreventOptimization() { return p1 + p2 + p3 + p4 + p5 + p6; } }
能夠參考下面文章
一個Java對象到底佔用多大內存?http://www.javashuo.com/article/p-qarlzynv-en.html
寫Java也得了解CPU--僞共享 http://www.javashuo.com/article/p-wzchvjzp-dy.html