在分佈式系統中,每每須要對大量的數據如訂單、帳戶進行標識,以一個有意義的有序的序列號來做爲全局惟一的ID。java
而分佈式系統中咱們對ID生成器要求又有哪些呢?node
全局惟一性:不能出現重複的ID號,既然是惟一標識,這是最基本的要求。redis
遞增:比較低要求的條件爲趨勢遞增,即保證下一個ID必定大於上一個ID,而比較苛刻的要求是連續遞增,如1,2,3等等。數據庫
高可用高性能:ID生成事關重大,一旦掛掉系統崩潰;高性能是指必需要在壓測下表現良好,若是達不到要求則在高併發環境下依然會致使系統癱瘓。apache
優勢:緩存
可以保證獨立性,程序能夠在不一樣的數據庫間遷移,效果不受影響。服務器
保證生成的ID不只是表獨立的,並且是庫獨立的,這點在你想切分數據庫的時候尤其重要。網絡
缺點:併發
性能問題:UUID太長,一般以36長度的字符串表示,對MySQL索引不利:若是做爲數據庫主鍵,在InnoDB引擎下,UUID的無序性可能會引發數據位置頻繁變更,嚴重影響性能。app
UUID無業務含義:不少須要ID能標識業務含義的地方不使用。
不知足遞增要求。
snowflake是twitter開源的分佈式ID生成系統。 Twitter每秒有數十萬條消息的請求,每條消息都必須分配一條惟一的id,這些id還須要一些大體的順序(方便客戶端排序),而且在分佈式系統中不一樣機器產生的id必須不一樣。
snowflake的結構以下(每部分用-分開):
0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 – 000000000000
第一位爲未使用,接下來的41位爲毫秒級時間(41位的長度可使用69年),而後是5位datacenterId和5位workerId(10位的長度最多支持部署1024個節點) ,最後12位是毫秒內的計數(12位的計數順序號支持每一個節點每毫秒產生4096個ID序號)
一共加起來恰好64位,爲一個Long型。
snowflake生成的ID總體上按照時間自增排序,而且整個分佈式系統內不會產生ID碰撞(由datacenter和workerId做區分),而且效率較高。snowflake的缺點是:
強依賴時鐘,若是主機時間回撥,則會形成重複ID
ID雖然有序,可是不連續
snowflake如今有較好的改良方案,好比美團點評開源的分佈式ID框架:leaf,經過使用ZooKeeper解決了時鐘依賴問題。
利用數據庫生成ID是最多見的方案。可以確保ID全數據庫惟一。其優缺點以下:
優勢:
很是簡單,利用現有數據庫系統的功能實現,成本小,有DBA專業維護。
ID單調自增。
缺點:
不一樣數據庫語法和實現不一樣,數據庫遷移的時候或多數據庫版本支持的時候須要處理。
在單個數據庫或讀寫分離或一主多從的狀況下,只有一個主庫能夠生成。有單點故障的風險。
在性能達不到要求的狀況下,比較難於擴展。
若是涉及多個系統須要合併或者數據遷移會比較麻煩。
分表分庫的時候會有麻煩。
經過Redis生成ID(主要經過redis的自增函數)、ZooKeeper生成ID、MongoDB的ObjectID等都可實現惟一性的要求。
實際業務中,除了分佈式ID全局惟一以外,還有是否趨勢/連續遞增的要求。根據具體業務需求的不一樣,有兩種可選方案。
一是隻保證全局惟一,不保證連續遞增。二是既保證全局惟一,又保證連續遞增。
2. 基於ZooKeeper和本地緩存的方案
基於zookeeper分佈式ID實現方案有不少種,本方案只使用ZooKeeper做爲分段節點協調工具。每臺服務器首先從zookeeper緩存一段,如1-1000的id。
此時zk上保存最大值1000,每次獲取的時候都會進行判斷,若是id小於本地最大值,即id<=1000,則更新本地的當前值,若是id大於本地當前值,好比說是1001,則會將從zk再獲取下一個id數據段並在本地緩存。獲取數據段的時候須要更新zk節點數據,更新的時候使用curator的分佈式鎖來實現。
因爲id是從本機獲取,所以本方案的優勢是性能很是好。缺點是若是多主機負載均衡,則會出現不連續的id,固然將遞增區段設置爲1也能保證連續的id,可是效率會受到很大影響。
實現關鍵源碼以下:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 根據開源項目mycat實現基於zookeeper的遞增序列號
* <p>
* 只要配置好ZK地址和表名的以下屬性
* MINID 某線程當前區間內最小值
* MAXID 某線程當前區間內最大值
* CURID 某線程當前區間內當前值
*
* @author wangwanbin
* @version 1.0
* @time 2017/9/1
*/
public class ZKCachedSequenceHandler extends SequenceHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static final String KEY_MIN_NAME = ".MINID";// 1
private static final String KEY_MAX_NAME = ".MAXID";// 10000
private static final String KEY_CUR_NAME = ".CURID";// 888
private final static long PERIOD = 1000;//每次緩存的ID段數量
private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();
/**
* 私有化構造方法,單例模式
*/
private ZKCachedSequenceHandler() {
}
/**
* 獲取sequence工具對象的惟一方法
*
* @return
*/
public static ZKCachedSequenceHandler getInstance() {
return instance;
}
private Map<String, Map<String, String>> tableParaValMap = null;
private CuratorFramework client;
private InterProcessSemaphoreMutex interProcessSemaphore = null;
public void loadZK() {
try {
this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
this.client.start();
} catch (Exception e) {
LOGGER.error("Error caught while initializing ZK:" + e.getCause());
}
}
public Map<String, String> getParaValMap(String prefixName) {
if (tableParaValMap == null) {
try {
loadZK();
fetchNextPeriod(prefixName);
} catch (Exception e) {
LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());
}
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
return paraValMap;
}
public Boolean fetchNextPeriod(String prefixName) {
try {
Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);
if (stat == null || (stat.getDataLength() == 0)) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());
} catch (Exception e) {
LOGGER.debug("Node exists! Maybe other instance is initializing!");
}
}
if (interProcessSemaphore == null) {
interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);
}
interProcessSemaphore.acquire();
if (tableParaValMap == null) {
tableParaValMap = new ConcurrentHashMap<>();
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
paraValMap = new ConcurrentHashMap<>();
tableParaValMap.put(prefixName, paraValMap);
}
long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));
client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());
if (now == 1) {
paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");
paraValMap.put(prefixName + KEY_MIN_NAME, "1");
paraValMap.put(prefixName + KEY_CUR_NAME, "0");
} else {
paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");
paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");
paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");
}
} catch (Exception e) {
LOGGER.error("Error caught while updating period from ZK:" + e.getCause());
} finally {
try {
interProcessSemaphore.release();
} catch (Exception e) {
LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());
}
}
return true;
}
public Boolean updateCURIDVal(String prefixName, Long val) {
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");
}
paraValMap.put(prefixName + KEY_CUR_NAME, val + "");
return true;
}
/**
* 獲取自增ID
*
* @param sequenceEnum
* @return
*/
@Override
public synchronized long nextId(SequenceEnum sequenceEnum) {
String prefixName = sequenceEnum.getCode();
Map<String, String> paraMap = this.getParaValMap(prefixName);
if (null == paraMap) {
throw new RuntimeException("fetch Param Values error.");
}
Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
if (nextId > maxId) {
fetchNextPeriod(prefixName);
return nextId(sequenceEnum);
}
updateCURIDVal(prefixName, nextId);
return nextId.longValue();
}
public static void main(String[] args) throws UnsupportedEncodingException {
long startTime = System.currentTimeMillis(); //獲取開始時間
final ZKCachedSequenceHandler sequenceHandler = getInstance();
sequenceHandler.loadZK();
new Thread() {
public void run() {
long startTime2 = System.currentTimeMillis(); //獲取開始時間
for (int i = 0; i < 5000; i++) {
System.out.println("線程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime2 = System.currentTimeMillis(); //獲取結束時間
System.out.println("程序運行時間1:" + (endTime2 - startTime2) + "ms");
}
}.start();
for (int i = 0; i < 5000; i++) {
System.out.println("線程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime = System.currentTimeMillis(); //獲取結束時間
System.out.println("程序運行時間2:" + (endTime - startTime) + "ms");
}
}
能夠看到,因爲不須要進行過多的網絡消耗,緩存式的zk協調方案性能至關了得,生成10000個id僅需553ms(兩個線程耗時較長者) , 平均每一個id消耗0.05ms。
使用zk的永久sequence策略建立節點,並獲取返回值,而後刪除前一個節點,這樣既防止zk服務器存在過多的節點,又提升了效率;節點刪除採用線程池來統一處理,提升響應速度。
優勢:能建立連續遞增的ID。
關鍵實現代碼以下:
package com.zb.p2p.utils;
import com.zb.p2p.enums.SequenceEnum;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基於zk的永久型自增節點PERSISTENT_SEQUENTIAL實現
* 每次生成節點後會使用線程池執行刪除節點任務
* Created by wangwanbin on 2017/9/5.
*/
public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
private GenericObjectPool genericObjectPool;
private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();
private static String ZK_ADDRESS = ""; //192.168.0.65
private static String PATH = "";// /sequence/p2p
private static String SEQ = "";//seq;
/**
* 私有化構造方法,單例模式
*/
private ZKIncreaseSequenceHandler() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(4);
genericObjectPool = new GenericObjectPool(this, config);
}
/**
* 獲取sequence工具對象的惟一方法
*
* @return
*/
public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {
ZK_ADDRESS = zkAddress;
PATH = path;
SEQ = seq;
return instance;
}
@Override
public long nextId(final SequenceEnum sequenceEnum) {
String result = createNode(sequenceEnum.getCode());
final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());
final long id = Long.parseLong(idstr);
preNodes.add(id);
//刪除上一個節點
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
Iterator<Long> iterator = preNodes.iterator();
if (iterator.hasNext()) {
long preNode = iterator.next();
if (preNode < id) {
final String format = "%0" + idstr.length() + "d";
String preIdstr = String.format(format, preNode);
final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
client.delete().forPath(prePath);
preNodes.remove(preNode);
} catch (Exception e) {
LOGGER.error("delete preNode error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}
}
}
});
return id;
}
private String createNode(String prefixName) {
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());
return result;
} catch (Exception e) {
throw new RuntimeException("create zookeeper node error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis(); //獲取開始時間
final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq");
int count = 10;
final CountDownLatch cd = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executorService.execute(new Runnable() {
public void run() {
System.out.printf("線程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));
cd.countDown();
}
});
}
try {
cd.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted thread",e);
Thread.currentThread().interrupt();
}
long endTime = System.currentTimeMillis(); //獲取結束時間
System.out.println("程序運行時間:" + (endTime - startTime) + "ms");
}
@Override
public PooledObject<CuratorFramework> makeObject() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
return new DefaultPooledObject<>(client);
}
@Override
public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {
}
@Override
public boolean validateObject(PooledObject<CuratorFramework> p) {
return false;
}
@Override
public void activateObject(PooledObject<CuratorFramework> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {
}
}
測試結果以下,生成10000個id消耗=9443ms(兩個線程耗時較長者), 平均每一個id消耗0.9ms。
這還只是單zk鏈接的狀況下,若是使用鏈接池來維護多個zk的鏈接,效率將成倍的提高。
分佈式ID生成器的實現有不少種。目前各方案也都各有特色。咱們能夠根據業務的具體要求,選擇實現合適的方案。
感謝你們看到這裏,文章有不足,歡迎你們指出;若是你以爲寫得不錯,那就給我一個贊吧。