PS:最近作了一個面試題解答的開源項目,你們能夠看一看,若是對你們有幫助,但願你們幫忙給一個star,謝謝你們了!html
《面試指北》項目地址:github.com/NotFound9/i…java
本文主要是對美團的分佈式ID框架Leaf的原理進行介紹,針對Leaf原項目中的一些issue,對Leaf項目進行功能加強,問題修復及優化改進,改進後的項目地址在這裏:node
Leaf項目改進計劃 github.com/NotFound9/L…mysql
7849276-4d1955394baa3c6d.png git
snowflake算法對於ID的位數是上圖這樣分配的:1位的符號位+41位時間戳+10位workID+12位序列號github
加起來一共是64個二進制位,正好與Java中的long類型的位數同樣。面試
美團的Leaf框架對於snowflake算法進行了一些位數調整,位數分配是這樣:算法
最大41位時間差+10位的workID+12位序列化sql
雖然看美團對Leaf的介紹文章裏面說數據庫
Leaf-snowflake方案徹底沿用snowflake方案的bit位設計,便是「1+41+10+12」的方式組裝ID號。
其實看代碼裏面是沒有專門設置符號位的,若是timestamp過大,致使時間差佔用42個二進制位,時間差的第一位爲1時,可能生成的id轉換爲十進制後會是負數:
//timestampLeftShift是22,workerIdShift是12
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
複製代碼
由於時間戳是以1970年01月01日00時00分00秒做爲起始點,其實咱們通常取的時間戳實際上是起始點到如今的時間差,若是咱們能肯定咱們取的時間都是某個時間點之後的時間,那麼能夠將時間戳的起始點改爲這個時間點,Leaf項目中,若是不設置起始時間,默認是2010年11月4日09:42:54,這樣可使得支持的最大時間增加,Leaf框架的支持最大時間是起始點以後的69年。
Leaf使用Zookeeper做爲註冊中心,每次機器啓動時去Zookeeper特定路徑/forever/下讀取子節點列表,每一個子節點存儲了IP:Port及對應的workId,遍歷子節點列表,若是存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就建立一個永久有序節點,將序號做爲workId,而且將workId信息寫入本地緩存文件workerID.properties,供啓動時鏈接Zookeeper失敗,讀取使用。由於workId只分配了10個二進制位,因此取值範圍是0-1023。
序列號是12個二進制位,取值範圍是0到4095,主要保證同一個leaf服務在同一毫秒內,生成的ID的惟一性。 序列號是生成流程以下: 1.當前時間戳與上一個ID的時間戳在同一毫秒內,那麼對sequence+1,若是sequence+1超過了4095,那麼進行等待,等到下一毫秒到了以後再生成ID。 2.當前時間戳與上一個ID的時間戳不在同一毫秒內,取一個100之內的隨機數做爲序列號。
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 爲0的時候表示是下一毫秒時間開始對seq作隨機
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//若是是新的ms開始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
複製代碼
5e4ff128.png
這種模式須要依賴MySQL,表字段biz_tag表明業務名,max_id表明該業務目前已分配的最大ID值,step表明每次Leaf往數據庫請求時,一次性分配的ID數量。大體流程就是每一個Leaf服務在內存中有兩個Segment實例,每一個Segement保存一個分段的ID,
一個Segment是當前用於分配ID,有一個value屬性保存這個分段已分配的最大ID,以及一個max屬性這個分段最大的ID。
另一個Segement是備用的,當一個Segement用完時,會進行切換,使用另外一個Segement進行使用。
當一個Segement的分段ID使用率達到10%時,就會觸發另外一個Segement去DB獲取分段ID,初始化好分段ID供以後使用。
Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
}
SegmentBuffer {
private String key;
private Segment[] segments; //雙buffer
private volatile int currentPos; //當前的使用的segment的index
private volatile boolean nextReady; //下一個segment是否處於可切換狀態
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //線程是否在運行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
}
複製代碼
目前Leaf項目存在的問題是
Snowflake生成ID相關:
而對於一些小公司或者項目組,其餘業務沒有使用到Zookeeper的話,爲了部署Leaf服務而維護一個Zookeeper集羣的代價太大。因此原項目中有issue在問」怎麼支持非Zookeeper的註冊中心「,因爲通常項目中使用MySQL的機率會大不少,因此增長了使用MySQL做爲註冊中心,本地配置做爲註冊中心的功能。
因爲啓動前,服務器時間調到了之前的時間或者進行了回撥,鏈接Zookeeper失敗時會使用本地緩存文件workerID.properties中的workerId,而沒有校驗該ID生成的最大時間戳,可能會形成ID重複,對這個問題進行了修復。
由於缺乏對時間差的校驗,當時間差過大,轉換爲二進制數後超過41位後,在生成ID時會形成溢出,使得符號位爲1,生成id爲負數。
Segement生成ID相關:
沒有太多問題,主要是根據一些issue對代碼進行了性能優化。
Leaf項目原來的註冊中心的模式(咱們暫時命令爲zk_normal模式) 使用Zookeeper做爲註冊中心,每次機器啓動時去Zookeeper特定路徑下讀取子節點列表,若是存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就建立一個永久有序節點,將序號做爲workId,而且將workId信息寫入本地緩存文件workerID.properties,供啓動時鏈接Zookeeper失敗,讀取使用。
issue#84:workid是否支持回收?
SnowflakeService模式中,workid是否支持回收?分佈式環境下,每次從新部署可能就換了一個ip,若是沒有回收的話1024個機器標識很快就會消耗完,爲何zk不用臨時節點去存儲呢,這樣能動態感知服務上下線,對workid進行管理回收?
開發了zk_recycle模式,針對使用snowflake生成分佈式ID的技術方案,本來是使用Zookeeper做爲註冊中心爲每一個服務根據IP:Port分配一個固定的workId,workId生成範圍爲0到1023,workId不支持回收,因此在Leaf的原項目中有人提出了一個issue#84 workid是否支持回收?,由於當部署Leaf的服務的IP和Port不固定時,若是workId不支持回收,當workId超過最大值時,會致使生成的分佈式ID的重複。因此增長了workId循環使用的模式zk_recycle。
在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置爲zk_recycle,此時註冊中心爲Zookeeper,而且workerId可複用
leaf.snowflake.mode=zk_recycle
//zookeeper的地址
leaf.snowflake.zk.address=localhost:2181
複製代碼
啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。
curl domain/api/snowflake/get/test
1256557484213448722
複製代碼
按照上面的配置在leaf.properties裏面進行配置後,
if(mode.equals(SnowflakeMode.ZK_RECYCLE)) {//註冊中心爲zk,對ip:port分配的workId是課循環利用的模式
String zkAddress = properties.getProperty(Constants.LEAF_SNOWFLAKE_ZK_ADDRESS);
RecyclableZookeeperHolder holder = new RecyclableZookeeperHolder(Utils.getIp(),port,zkAddress);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
複製代碼
此時SnowflakeIDGenImpl使用的holder是RecyclableZookeeperHolder的實例,workId是可循環利用的,RecyclableZookeeperHolder工做流程以下: 1.首先會在未使用的workId池(zookeeper路徑爲/snowflake/leaf.name/recycle/notuse/)中生成全部workId。 2.而後每次服務器啓動時都是去未使用的workId池取一個新的workId,而後放到正在使用的workId池(zookeeper路徑爲/snowflake/leaf.name/recycle/inuse/)下,將此workId用於Id生成,而且定時上報時間戳,更新zookeeper中的節點信息。 3.而且定時檢測正在使用的workId池,發現某個workId超過最大時間沒有更新時間戳的workId,會把它從正在使用的workId池移出,而後放到未使用的workId池中,以供workId循環使用。 4.而且正在使用這個很長時間沒有更新時間戳的workId的服務器,在發現本身超過最大時間,尚未上報時間戳成功後,會中止id生成服務,以防workId被其餘服務器循環使用,致使id重複。
issue#100:如何使用非zk的註冊中心?
開發了mysql模式,這種模式註冊中心爲MySQL,針對每一個ip:port的workid是固定的。
須要先在數據庫執行項目中的leaf_workerid_alloc.sql,完成建表,而後在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置爲mysql,此時註冊中心爲Zookeeper,workerId爲固定分配
leaf.snowflake.mode=mysql
//mysql數據庫地址
leaf.jdbc.url=
leaf.jdbc.username=
leaf.jdbc.password=
複製代碼
啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。
curl domain/api/snowflake/get/test
1256557484213448722
複製代碼
使用上面的配置後,此時SnowflakeIDGenImpl使用的holder是SnowflakeMySQLHolder的實例。實現原理與Leaf原項目默認的模式,使用Zookeeper做爲註冊中心,每一個ip:port的workid是固定的實現原理相似,只是註冊,獲取workid,及更新時間戳是與MySQL進行交互,而不是Zookeeper。
if (mode.equals(SnowflakeMode.MYSQL)) {//註冊中心爲mysql
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
dataSource.init();
// Config Dao
WorkerIdAllocDao dao = new WorkerIdAllocDaoImpl(dataSource);
SnowflakeMySQLHolder holder = new SnowflakeMySQLHolder(Utils.getIp(), port, dao);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
複製代碼
issue#100:如何使用非zk的註冊中心?
開發了local模式,這種模式就是適用於部署Leaf服務的IP和Port基本不會變化的狀況,就是在Leaf項目中的配置文件leaf.properties中顯式得配置某某IP:某某Port對應哪一個workId,每次部署新機器時,將IP:Port的時候在項目中添加這個配置,而後啓動時項目會去讀取leaf.properties中的配置,讀取完寫入本地緩存文件workId.json,下次啓動時直接讀取workId.json,最大時間戳也每次同步到機器上的緩存文件workId.json中。
在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
#註冊中心爲local的的模式
#leaf.snowflake.mode=local
#leaf.snowflake.local.workIdMap=
#workIdMap的格式是這樣的{"Leaf服務的ip:端口":"固定的workId"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
複製代碼
啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。
curl domain/api/snowflake/get/test
1256557484213448722
複製代碼
issue#84:由於當使用默認的模式(咱們暫時命令爲zk_normal模式),註冊中心爲Zookeeper,workId不可複用,上面介紹了這種模式的工做流程,當Leaf服務啓動時,鏈接Zookeeper失敗,那麼會去本機緩存中讀取workerID.properties文件,讀取workId進行使用,可是因爲workerID.properties中只存了workId信息,沒有存儲上次上報的最大時間戳,因此沒有進行時間戳判斷,因此若是機器的當前時間被修改到以前,就可能會致使生成的ID重複。
因此增長了更新時間戳到本地緩存的機制,每次在上報時間戳時將時間戳同時寫入本機緩存workerID.properties,而且當使用本地緩存workerID.properties中的workId時,對時間戳進行校驗,當前系統時間戳<緩存中的時間戳時,才使用這個workerId。
//鏈接失敗,使用本地workerID.properties中的workerID,而且對時間戳進行校驗。
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
Long maxTimestamp = Long.valueOf(properties.getProperty("maxTimestamp"));
if (maxTimestamp!=null && System.currentTimeMillis() <maxTimestamp) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
//定時任務每3s執行一次updateNewData()方法,調用更新updateLocalWorkerID()更新緩存文件workerID.properties
void updateNewData(CuratorFramework curator, String path) {
try {
if (System.currentTimeMillis() < lastUpdateTime) {
return;
}
curator.setData().forPath(path, buildData().getBytes());
updateLocalWorkerID(workerID);
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
複製代碼
由於Leaf框架是沿用snowflake的位數分配 最大41位時間差+10位的workID+12位序列化,可是因爲snowflake是強制要求第一位爲符號位0,不然生成的id轉換爲十進制後會是複試,可是Leaf項目中沒有對時間差進行校驗,當時間戳過大或者自定義的twepoch設置不當太小,會致使計算獲得的時間差過大,轉化爲2進制後超過41位,且第一位爲1,會致使生成的long類型的id爲負數,例如當timestamp = twepoch+2199023255552L時, 此時在生成id時,timestamp - twepoch會等於2199023255552,2199023255552轉換爲二進制後是1+41個0,此時生成的id因爲符號位是1,id會是負數-9223372036854775793
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
複製代碼
//一開始將最大的maxTimeStamp計算好
this.maxTimeStamp = ~(-1L << timeStampBits) + twepoch;
//而後生成ID時進行校驗
if (timestamp>maxTimeStamp) {
throw new OverMaxTimeStampException("current timestamp is over maxTimeStamp, the generate id will be negative");
}
複製代碼
針對issue#68裏面的優化方案,對Segement Buffer的緩存數據與DB數據同步的工做流程進行了進一步優化,主要是對 對SegmentIDGenImpl.updateCacheFromDb()方法進行了優化。
原方案工做流程: 1.遍歷cacheTags,將dbTags的副本insertTagsSet中存在的元素移除,使得insertTagsSet只有db新增的tag 2.遍歷insertTagsSet,將這些新增的元素添加到cache中 3.遍歷dbTags,將cacheTags的副本removeTagsSet中存在的元素移除,使得removeTagsSet只有cache中過時的tag 4.遍歷removeTagsSet,將過時的元素移除cache 這種方案須要經歷四次循環,使用兩個HashSet分別存儲db中新增的tag,cache中過時的tag, 而且爲了篩選出新增的tag,過時的tag,對每一個如今使用的tag有兩次刪除操做,
原有方案代碼以下:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌進cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags從cache刪除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
複製代碼
實際上咱們並不須要這些中間過程,現方案工做流程: 只須要遍歷dbTags,判斷cache中是否存在這個key,不存在就是新增元素,進行新增。 遍歷cacheTags,判斷dbSet中是否存在這個key,不存在就是過時元素,進行刪除。
現有方案代碼:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
//將dbTags中新加的tag添加cache,經過遍歷dbTags,判斷是否在cache中存在,不存在就添加到cache
for (String dbTag : dbTags) {
if (cache.containsKey(dbTag)==false) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(dbTag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(dbTag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", dbTag, buffer);
}
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> dbTagSet = new HashSet<>(dbTags);
//將cache中已失效的tag從cache刪除,經過遍歷cacheTags,判斷是否在dbTagSet中存在,不存在說明過時,直接刪除
for (String cacheTag : cacheTags) {
if (dbTagSet.contains(cacheTag) == false) {
cache.remove(cacheTag);
logger.info("Remove tag {} from IdCache", cacheTag);
}
}
複製代碼
兩個方案對比:
這個更新是針對這個issue#88 提出的問題,使用位運算&來代替取模運算%,執行效率更高。 原代碼:
public int nextPos() {
return (currentPos + 1) % 2;
}
複製代碼
現代碼:
public int nextPos() {
return (currentPos + 1) & 1;
}
複製代碼