美團分佈式ID生成框架Leaf源碼分析及優化改進

PS:最近作了一個面試題解答的開源項目,你們能夠看一看,若是對你們有幫助,但願你們幫忙給一個star,謝謝你們了!html

《面試指北》項目地址:github.com/NotFound9/i…java

本文主要是對美團的分佈式ID框架Leaf的原理進行介紹,針對Leaf原項目中的一些issue,對Leaf項目進行功能加強,問題修復及優化改進,改進後的項目地址在這裏:node

Leaf項目改進計劃 github.com/NotFound9/L…mysql

Leaf原理分析

Snowflake生成ID的模式

7849276-4d1955394baa3c6d.png git

snowflake算法對於ID的位數是上圖這樣分配的:

1位的符號位+41位時間戳+10位workID+12位序列號github

加起來一共是64個二進制位,正好與Java中的long類型的位數同樣。面試

美團的Leaf框架對於snowflake算法進行了一些位數調整,位數分配是這樣:算法

最大41位時間差+10位的workID+12位序列化sql

雖然看美團對Leaf的介紹文章裏面說數據庫

Leaf-snowflake方案徹底沿用snowflake方案的bit位設計,便是「1+41+10+12」的方式組裝ID號。

其實看代碼裏面是沒有專門設置符號位的,若是timestamp過大,致使時間差佔用42個二進制位,時間差的第一位爲1時,可能生成的id轉換爲十進制後會是負數:

//timestampLeftShift是22,workerIdShift是12
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
複製代碼

時間差是什麼?

由於時間戳是以1970年01月01日00時00分00秒做爲起始點,其實咱們通常取的時間戳實際上是起始點到如今的時間差,若是咱們能肯定咱們取的時間都是某個時間點之後的時間,那麼能夠將時間戳的起始點改爲這個時間點,Leaf項目中,若是不設置起始時間,默認是2010年11月4日09:42:54,這樣可使得支持的最大時間增加,Leaf框架的支持最大時間是起始點以後的69年。

workID怎麼分配?

Leaf使用Zookeeper做爲註冊中心,每次機器啓動時去Zookeeper特定路徑/forever/下讀取子節點列表,每一個子節點存儲了IP:Port及對應的workId,遍歷子節點列表,若是存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就建立一個永久有序節點,將序號做爲workId,而且將workId信息寫入本地緩存文件workerID.properties,供啓動時鏈接Zookeeper失敗,讀取使用。由於workId只分配了10個二進制位,因此取值範圍是0-1023。

序列號怎麼生成?

序列號是12個二進制位,取值範圍是0到4095,主要保證同一個leaf服務在同一毫秒內,生成的ID的惟一性。 序列號是生成流程以下: 1.當前時間戳與上一個ID的時間戳在同一毫秒內,那麼對sequence+1,若是sequence+1超過了4095,那麼進行等待,等到下一毫秒到了以後再生成ID。 2.當前時間戳與上一個ID的時間戳不在同一毫秒內,取一個100之內的隨機數做爲序列號。

if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & sequenceMask;
        if (sequence == 0) {
            //seq 爲0的時候表示是下一毫秒時間開始對seq作隨機
            sequence = RANDOM.nextInt(100);
            timestamp = tilNextMillis(lastTimestamp);
        }
} else {
    //若是是新的ms開始
       sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
複製代碼

segment生成ID的模式

5e4ff128.png

這種模式須要依賴MySQL,表字段biz_tag表明業務名,max_id表明該業務目前已分配的最大ID值,step表明每次Leaf往數據庫請求時,一次性分配的ID數量。

大體流程就是每一個Leaf服務在內存中有兩個Segment實例,每一個Segement保存一個分段的ID,

一個Segment是當前用於分配ID,有一個value屬性保存這個分段已分配的最大ID,以及一個max屬性這個分段最大的ID。

另一個Segement是備用的,當一個Segement用完時,會進行切換,使用另外一個Segement進行使用。

當一個Segement的分段ID使用率達到10%時,就會觸發另外一個Segement去DB獲取分段ID,初始化好分段ID供以後使用。

Segment {
    private AtomicLong value = new AtomicLong(0);
    private volatile long max;
    private volatile int step;
}
SegmentBuffer {
    private String key;
    private Segment[] segments; //雙buffer
    private volatile int currentPos; //當前的使用的segment的index
    private volatile boolean nextReady; //下一個segment是否處於可切換狀態
    private volatile boolean initOk; //是否初始化完成
    private final AtomicBoolean threadRunning; //線程是否在運行中
    private final ReadWriteLock lock;

    private volatile int step;
    private volatile int minStep;
    private volatile long updateTimestamp;
}
複製代碼

Leaf項目改進

目前Leaf項目存在的問題是

Snowflake生成ID相關:

1.註冊中心只支持Zookeeper

而對於一些小公司或者項目組,其餘業務沒有使用到Zookeeper的話,爲了部署Leaf服務而維護一個Zookeeper集羣的代價太大。因此原項目中有issue在問」怎麼支持非Zookeeper的註冊中心「,因爲通常項目中使用MySQL的機率會大不少,因此增長了使用MySQL做爲註冊中心,本地配置做爲註冊中心的功能。

2.潛在的時鐘回撥問題

因爲啓動前,服務器時間調到了之前的時間或者進行了回撥,鏈接Zookeeper失敗時會使用本地緩存文件workerID.properties中的workerId,而沒有校驗該ID生成的最大時間戳,可能會形成ID重複,對這個問題進行了修復。

3.時間差過大時,生成id爲負數

由於缺乏對時間差的校驗,當時間差過大,轉換爲二進制數後超過41位後,在生成ID時會形成溢出,使得符號位爲1,生成id爲負數。

Segement生成ID相關:

沒有太多問題,主要是根據一些issue對代碼進行了性能優化。

具體改進以下:

Snowflake生成ID相關的改進:

1.針對Leaf原項目中的issue#84,增長zk_recycle模式(註冊中心爲zk,workId循環使用)

2.針對Leaf原項目中的issue#100,增長MySQL模式(註冊中心爲MySQL)

3.針對Leaf原項目中的issue#100,增長Local模式(註冊中心爲本地項目配置)

4.針對Leaf原項目中的issue#84,修復啓動時時鐘回撥的問題

5.針對Leaf原項目中的issue#106,修復時間差過大,超過41位溢出,致使生成的id負數的問題

Segement生成ID相關的改進:

1.針對Leaf原項目中的issue#68,優化SegmentIDGenImpl.updateCacheFromDb()方法。

2.針對Leaf原項目中的 issue#88,使用位運算&替換取模運算

snowflake算法生成ID的相關改進

Leaf項目原來的註冊中心的模式(咱們暫時命令爲zk_normal模式) 使用Zookeeper做爲註冊中心,每次機器啓動時去Zookeeper特定路徑下讀取子節點列表,若是存在當前IP:Port對應的workId,就使用節點信息中存儲的workId,不存在就建立一個永久有序節點,將序號做爲workId,而且將workId信息寫入本地緩存文件workerID.properties,供啓動時鏈接Zookeeper失敗,讀取使用。

1.針對Leaf原項目中的issue#84,增長zk_recycle模式(註冊中心爲zk,workId循環使用)

問題詳情:

issue#84:workid是否支持回收?

SnowflakeService模式中,workid是否支持回收?分佈式環境下,每次從新部署可能就換了一個ip,若是沒有回收的話1024個機器標識很快就會消耗完,爲何zk不用臨時節點去存儲呢,這樣能動態感知服務上下線,對workid進行管理回收?

解決方案:

開發了zk_recycle模式,針對使用snowflake生成分佈式ID的技術方案,本來是使用Zookeeper做爲註冊中心爲每一個服務根據IP:Port分配一個固定的workId,workId生成範圍爲0到1023,workId不支持回收,因此在Leaf的原項目中有人提出了一個issue#84 workid是否支持回收?,由於當部署Leaf的服務的IP和Port不固定時,若是workId不支持回收,當workId超過最大值時,會致使生成的分佈式ID的重複。因此增長了workId循環使用的模式zk_recycle。

如何使用zk_recycle模式?

在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置

//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置爲zk_recycle,此時註冊中心爲Zookeeper,而且workerId可複用
leaf.snowflake.mode=zk_recycle
//zookeeper的地址
leaf.snowflake.zk.address=localhost:2181
複製代碼

啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。

curl domain/api/snowflake/get/test
1256557484213448722
複製代碼

zk_recycle模式實現原理

按照上面的配置在leaf.properties裏面進行配置後,

if(mode.equals(SnowflakeMode.ZK_RECYCLE)) {//註冊中心爲zk,對ip:port分配的workId是課循環利用的模式
     String    zkAddress = properties.getProperty(Constants.LEAF_SNOWFLAKE_ZK_ADDRESS);
     RecyclableZookeeperHolder holder    = new RecyclableZookeeperHolder(Utils.getIp(),port,zkAddress);
     idGen = new SnowflakeIDGenImpl(holder);
     if (idGen.init()) {
     logger.info("Snowflake Service Init Successfully in mode " + mode);
     } else {
     throw new InitException("Snowflake Service Init Fail");
     }
}
複製代碼

此時SnowflakeIDGenImpl使用的holder是RecyclableZookeeperHolder的實例,workId是可循環利用的,RecyclableZookeeperHolder工做流程以下: 1.首先會在未使用的workId池(zookeeper路徑爲/snowflake/leaf.name/recycle/notuse/)中生成全部workId。 2.而後每次服務器啓動時都是去未使用的workId池取一個新的workId,而後放到正在使用的workId池(zookeeper路徑爲/snowflake/leaf.name/recycle/inuse/)下,將此workId用於Id生成,而且定時上報時間戳,更新zookeeper中的節點信息。 3.而且定時檢測正在使用的workId池,發現某個workId超過最大時間沒有更新時間戳的workId,會把它從正在使用的workId池移出,而後放到未使用的workId池中,以供workId循環使用。 4.而且正在使用這個很長時間沒有更新時間戳的workId的服務器,在發現本身超過最大時間,尚未上報時間戳成功後,會中止id生成服務,以防workId被其餘服務器循環使用,致使id重複。

2.針對Leaf原項目中的issue#100,增長MySQL模式(註冊中心爲MySQL)

問題詳情:

issue#100:如何使用非zk的註冊中心?

解決方案:

開發了mysql模式,這種模式註冊中心爲MySQL,針對每一個ip:port的workid是固定的。

如何使用這種mysql模式?

須要先在數據庫執行項目中的leaf_workerid_alloc.sql,完成建表,而後在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置

//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
//將snowflake模式設置爲mysql,此時註冊中心爲Zookeeper,workerId爲固定分配
leaf.snowflake.mode=mysql
//mysql數據庫地址
leaf.jdbc.url=
leaf.jdbc.username=
leaf.jdbc.password=
複製代碼

啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。

curl domain/api/snowflake/get/test
1256557484213448722
複製代碼

實現原理

使用上面的配置後,此時SnowflakeIDGenImpl使用的holder是SnowflakeMySQLHolder的實例。實現原理與Leaf原項目默認的模式,使用Zookeeper做爲註冊中心,每一個ip:port的workid是固定的實現原理相似,只是註冊,獲取workid,及更新時間戳是與MySQL進行交互,而不是Zookeeper。

if (mode.equals(SnowflakeMode.MYSQL)) {//註冊中心爲mysql
		DruidDataSource dataSource = new DruidDataSource();
		dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
		dataSource.init();
		// Config Dao
		WorkerIdAllocDao dao = new WorkerIdAllocDaoImpl(dataSource);
		SnowflakeMySQLHolder holder = new SnowflakeMySQLHolder(Utils.getIp(), port, dao);
		idGen = new SnowflakeIDGenImpl(holder);
		if (idGen.init()) {
				logger.info("Snowflake Service Init Successfully in mode " + mode);
		} else {
				throw new InitException("Snowflake Service Init Fail");
    }
}
複製代碼

3.針對Leaf原項目中的issue#100,增長Local模式(註冊中心爲本地項目配置)

問題詳情:

issue#100:如何使用非zk的註冊中心?

解決方案:

開發了local模式,這種模式就是適用於部署Leaf服務的IP和Port基本不會變化的狀況,就是在Leaf項目中的配置文件leaf.properties中顯式得配置某某IP:某某Port對應哪一個workId,每次部署新機器時,將IP:Port的時候在項目中添加這個配置,而後啓動時項目會去讀取leaf.properties中的配置,讀取完寫入本地緩存文件workId.json,下次啓動時直接讀取workId.json,最大時間戳也每次同步到機器上的緩存文件workId.json中。

如何使用這種local模式?

在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置

//開啓snowflake服務
leaf.snowflake.enable=true
//leaf服務的端口,用於生成workId
leaf.snowflake.port=
#註冊中心爲local的的模式
#leaf.snowflake.mode=local
#leaf.snowflake.local.workIdMap=
#workIdMap的格式是這樣的{"Leaf服務的ip:端口":"固定的workId"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
複製代碼

啓動LeafServerApplication,調用/api/snowflake/get/test就能夠得到此種模式下生成的分佈式ID。

curl domain/api/snowflake/get/test
1256557484213448722
複製代碼

4.針對Leaf原項目中的issue#84,修復啓動時時鐘回撥的問題

問題詳情:

issue#84:由於當使用默認的模式(咱們暫時命令爲zk_normal模式),註冊中心爲Zookeeper,workId不可複用,上面介紹了這種模式的工做流程,當Leaf服務啓動時,鏈接Zookeeper失敗,那麼會去本機緩存中讀取workerID.properties文件,讀取workId進行使用,可是因爲workerID.properties中只存了workId信息,沒有存儲上次上報的最大時間戳,因此沒有進行時間戳判斷,因此若是機器的當前時間被修改到以前,就可能會致使生成的ID重複。

解決方案:

因此增長了更新時間戳到本地緩存的機制,每次在上報時間戳時將時間戳同時寫入本機緩存workerID.properties,而且當使用本地緩存workerID.properties中的workId時,對時間戳進行校驗,當前系統時間戳<緩存中的時間戳時,才使用這個workerId。

//鏈接失敗,使用本地workerID.properties中的workerID,而且對時間戳進行校驗。
try {
		Properties properties = new Properties();
		properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
		Long maxTimestamp = 				 Long.valueOf(properties.getProperty("maxTimestamp"));
		if (maxTimestamp!=null && System.currentTimeMillis() <maxTimestamp) 		{
				throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
		}
		workerID = Integer.valueOf(properties.getProperty("workerID"));
		LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
		LOGGER.error("Read file error ", e1);
		return false;
}      

//定時任務每3s執行一次updateNewData()方法,調用更新updateLocalWorkerID()更新緩存文件workerID.properties
void updateNewData(CuratorFramework curator, String path) {
      try {
          if (System.currentTimeMillis() < lastUpdateTime) {
            	return;
          }
          curator.setData().forPath(path, buildData().getBytes());
          updateLocalWorkerID(workerID);
          lastUpdateTime = System.currentTimeMillis();
      } catch (Exception e) {
        	LOGGER.info("update init data error path is {} error is {}", path, e);
      }
}
複製代碼

5.針對Leaf原項目中的issue#106,修復時間差過大,超過41位溢出,致使生成的id負數的問題

問題詳情:

由於Leaf框架是沿用snowflake的位數分配 最大41位時間差+10位的workID+12位序列化,可是因爲snowflake是強制要求第一位爲符號位0,不然生成的id轉換爲十進制後會是複試,可是Leaf項目中沒有對時間差進行校驗,當時間戳過大或者自定義的twepoch設置不當太小,會致使計算獲得的時間差過大,轉化爲2進制後超過41位,且第一位爲1,會致使生成的long類型的id爲負數,例如當timestamp = twepoch+2199023255552L時, 此時在生成id時,timestamp - twepoch會等於2199023255552,2199023255552轉換爲二進制後是1+41個0,此時生成的id因爲符號位是1,id會是負數-9223372036854775793

long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
複製代碼

解決方案:

//一開始將最大的maxTimeStamp計算好
this.maxTimeStamp = ~(-1L << timeStampBits) + twepoch;
//而後生成ID時進行校驗
if (timestamp>maxTimeStamp) {
    throw new OverMaxTimeStampException("current timestamp is over maxTimeStamp, the generate id will be negative");
}
複製代碼

針對Segement生成分佈式ID相關的改進

1.針對Leaf原項目中的issue#68,優化SegmentIDGenImpl.updateCacheFromDb()方法

針對issue#68裏面的優化方案,對Segement Buffer的緩存數據與DB數據同步的工做流程進行了進一步優化,主要是對 對SegmentIDGenImpl.updateCacheFromDb()方法進行了優化。

原方案工做流程: 1.遍歷cacheTags,將dbTags的副本insertTagsSet中存在的元素移除,使得insertTagsSet只有db新增的tag 2.遍歷insertTagsSet,將這些新增的元素添加到cache中 3.遍歷dbTags,將cacheTags的副本removeTagsSet中存在的元素移除,使得removeTagsSet只有cache中過時的tag 4.遍歷removeTagsSet,將過時的元素移除cache 這種方案須要經歷四次循環,使用兩個HashSet分別存儲db中新增的tag,cache中過時的tag, 而且爲了篩選出新增的tag,過時的tag,對每一個如今使用的tag有兩次刪除操做,

原有方案代碼以下:

List<String> dbTags = dao.getAllTags();
    if (dbTags == null || dbTags.isEmpty()) {
        return;
    }
    List<String> cacheTags = new ArrayList<String>(cache.keySet());
    Set<String> insertTagsSet = new HashSet<>(dbTags);
    Set<String> removeTagsSet = new HashSet<>(cacheTags);
    //db中新加的tags灌進cache
    for(int i = 0; i < cacheTags.size(); i++){
        String tmp = cacheTags.get(i);
        if(insertTagsSet.contains(tmp)){
            insertTagsSet.remove(tmp);
        }
    }
    for (String tag : insertTagsSet) {
        SegmentBuffer buffer = new SegmentBuffer();
        buffer.setKey(tag);
        Segment segment = buffer.getCurrent();
        segment.setValue(new AtomicLong(0));
        segment.setMax(0);
        segment.setStep(0);
        cache.put(tag, buffer);
        logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
    }
    //cache中已失效的tags從cache刪除
    for(int i = 0; i < dbTags.size(); i++){
        String tmp = dbTags.get(i);
        if(removeTagsSet.contains(tmp)){
            removeTagsSet.remove(tmp);
        }
    }
    for (String tag : removeTagsSet) {
        cache.remove(tag);
        logger.info("Remove tag {} from IdCache", tag);
    }
複製代碼

實際上咱們並不須要這些中間過程,現方案工做流程: 只須要遍歷dbTags,判斷cache中是否存在這個key,不存在就是新增元素,進行新增。 遍歷cacheTags,判斷dbSet中是否存在這個key,不存在就是過時元素,進行刪除。

現有方案代碼:

List<String> dbTags = dao.getAllTags();
    if (dbTags == null || dbTags.isEmpty()) {
        return;
    }
    //將dbTags中新加的tag添加cache,經過遍歷dbTags,判斷是否在cache中存在,不存在就添加到cache
    for (String dbTag : dbTags) {
        if (cache.containsKey(dbTag)==false) {
            SegmentBuffer buffer = new SegmentBuffer();
            buffer.setKey(dbTag);
            Segment segment = buffer.getCurrent();
            segment.setValue(new AtomicLong(0));
            segment.setMax(0);
            segment.setStep(0);
            cache.put(dbTag, buffer);
            logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", dbTag, buffer);
        }
    }
    List<String> cacheTags = new ArrayList<String>(cache.keySet());
    Set<String>  dbTagSet     = new HashSet<>(dbTags);
    //將cache中已失效的tag從cache刪除,經過遍歷cacheTags,判斷是否在dbTagSet中存在,不存在說明過時,直接刪除
    for (String cacheTag : cacheTags) {
        if (dbTagSet.contains(cacheTag) == false) {
            cache.remove(cacheTag);
            logger.info("Remove tag {} from IdCache", cacheTag);
        }
    }
複製代碼

兩個方案對比:

  • 空間複雜度 相比原方案須要使用兩個HashSet,這種方案的只須要使用一個hashSet,空間複雜度會低一些。
  • 時間複雜度 總遍歷次數會比原來的少,時間複雜度更低,由於判斷是新增,過時的狀況就直接處理了,不須要後續再單獨遍歷, 並且不須要對cache和dbtag的交集進行刪除操做,由於原來方案爲了得到新增的元素,是將dbSet的副本中現有元素進行刪除獲得。
  • 代碼可讀性 原方案是4個for循環,總共35行代碼,現方案是2個for循環,總共25行代碼,更加簡潔易懂。

2.針對Leaf原項目中的issue#88,使用位運算&替換取模運算

這個更新是針對這個issue#88 提出的問題,使用位運算&來代替取模運算%,執行效率更高。 原代碼:

public int nextPos() {
        return (currentPos + 1) % 2;
}
複製代碼

現代碼:

public int nextPos() {
        return (currentPos + 1) & 1;
}
複製代碼

參考文章

tech.meituan.com/2017/04/21/…

相關文章
相關標籤/搜索