Snowflake: a distributed unique ID generator

This article comes from my GitHub Pages blog, http://galengao.github.io/ (also www.gaohuirong.cn).

Summary:

  • Compiled from the original article on 運維生存 and code posted on OSChina (開源中國)
  • My environment is Python 3.5 with pip 8.2

1. Python version

Preface

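Since the data will later be sharded dynamically, rows from different tables may be moved into the same table, and their primary keys could then collide. To prevent this we use a global ID generator. The important point is that the IDs it produces keep increasing.
Here I use the Python implementation of Snowflake (pysnowflake). You can of course use a Java implementation instead.
For details, see the reference site.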
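To make the collision concrete, here is a minimal sketch. The shard contents and the helper name `find_key_collisions` are made up for illustration. Two shards that each number their rows with a local auto-increment key produce overlapping primary keys as soon as their rows are merged.

```python
def find_key_collisions(*tables):
    """Return the sorted primary keys that appear in more than one table."""
    seen = set()
    collisions = set()
    for table in tables:
        for key in table:
            if key in seen:
                collisions.add(key)
            seen.add(key)
    return sorted(collisions)


# Hypothetical shards, each using its own auto-increment counter.
goods_shard_a = {1: "goods1", 2: "goods2", 3: "goods3"}
goods_shard_b = {1: "goods4", 2: "goods5"}

# Keys 1 and 2 exist in both shards, so merging them would conflict.
print(find_key_collisions(goods_shard_a, goods_shard_b))  # [1, 2]
```

With globally unique IDs the same check returns an empty list, which is the property the rest of this article relies on.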

Using Snowflake

  • Install requests
pip install requests

  • Install pysnowflake
pip install pysnowflake

  • Start the pysnowflake service

snowflake_start_server \
  --address=192.168.10.145 \
  --port=30001 \
  --dc=1 \
  --worker=1 \
  --log_file_prefix=/tmp/pysnowflask.log


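Parameter meanings (also available via --help):
# --address: IP address of this machine; defaults to localhost
# --dc: unique data center identifier; defaults to 0
# --worker: unique worker identifier; defaults to 0
# --log_file_prefix: location of the log file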

  • Usage example (taken from the official site)

# Import the pysnowflake client
>>> import snowflake.client

# Connect to the server and initialize a pysnowflake client
>>> host = '192.168.10.145'
>>> port = 30001
>>> snowflake.client.setup(host, port)
# Generate a globally unique ID (can be stored as BIGINT UNSIGNED in MySQL)
>>> snowflake.client.get_guid()
3631957913783762945
# Check the current status
>>> snowflake.client.get_stats()
{
  'dc': 1,
  'worker': 1,
  'timestamp': 1454126885629, # current timestamp for this worker
  'last_timestamp': 1454126890928, # the last timestamp that generated ID on
  'sequence': 1, # the sequence number for last timestamp
  'sequence_overload': 1, # the number of times that the sequence is overflow
  'errors': 1, # the number of times that clock went backward
}
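The session above notes that a generated ID can be stored as BIGINT UNSIGNED. As a quick sanity check, the sketch below tests whether a value fits MySQL's 8-byte integer ranges, using the sample ID from that session. The helper name `fits_bigint` is hypothetical and not part of pysnowflake.

```python
BIGINT_UNSIGNED_MAX = 2**64 - 1  # upper bound of MySQL BIGINT UNSIGNED
BIGINT_SIGNED_MIN = -(2**63)     # lower bound of MySQL signed BIGINT
BIGINT_SIGNED_MAX = 2**63 - 1    # upper bound of MySQL signed BIGINT


def fits_bigint(value, unsigned=True):
    """Return True if value fits in a MySQL BIGINT (UNSIGNED by default)."""
    if unsigned:
        return 0 <= value <= BIGINT_UNSIGNED_MAX
    return BIGINT_SIGNED_MIN <= value <= BIGINT_SIGNED_MAX


sample_id = 3631957913783762945  # ID returned in the session above

print(fits_bigint(sample_id))                  # True
print(fits_bigint(sample_id, unsigned=False))  # True
print(sample_id.bit_length())                  # 62
```

The sample ID needs 62 bits, so it fits either column type. BIGINT UNSIGNED simply leaves more headroom.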
 
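  • Data cleanup: rebuilding IDs

Rebuilding IDs is a large undertaking, and you first need to understand the table structures thoroughly. Otherwise, missing even one column in one table leaves the data inconsistent.
Of course, if your tables have strict foreign keys with cascading configured, updating a primary key also updates the related foreign keys. I still advise against relying on cascading foreign key updates as a shortcut: if the database design has gone through n changes over the project's milestones, you cannot be sure that every foreign key is actually set to cascade on update.
I strongly recommend setting MySQL's foreign key check parameter to 0 while rebuilding IDs.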

SET FOREIGN_KEY_CHECKS=0;
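When the rebuild is driven from Python, it helps to make sure the checks are switched back on even if an update fails halfway. The following is a minimal sketch under stated assumptions, not the author's code. `foreign_key_checks_disabled` is a hypothetical helper that works with any DB-API style cursor, and `RecordingCursor` is a stand-in that only records SQL so the example runs without a MySQL server.

```python
from contextlib import contextmanager


@contextmanager
def foreign_key_checks_disabled(cursor):
    """Disable MySQL foreign key checks inside the block, then re-enable them."""
    cursor.execute("SET FOREIGN_KEY_CHECKS=0")
    try:
        yield cursor
    finally:
        # Assumes checks were enabled (MySQL's default) before the block ran.
        cursor.execute("SET FOREIGN_KEY_CHECKS=1")


class RecordingCursor:
    """Stand-in for a real cursor: it only records the SQL it is given."""

    def __init__(self):
        self.statements = []

    def execute(self, sql, params=None):
        self.statements.append(sql)


cursor = RecordingCursor()
with foreign_key_checks_disabled(cursor) as cur:
    cur.execute("UPDATE goods SET goods_id = %s WHERE goods_id = %s", (2, 1))

# The update is bracketed by the two SET statements.
print(cursor.statements)
```

With a real connection you would pass in the cursor from your MySQL driver instead of `RecordingCursor`.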
 

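Tip: in theory we do not need to rebuild the IDs at all, because the original IDs are already unique integers and are compatible with BIGINT. I rebuilt them anyway, mainly so that the data stays consistent in the future. And if your IDs are not integers but values with some embedded meaning, you will definitely need to rebuild them.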

  • Change the data type of the related tables' ID columns to BIGINT

-- Modify the goods_id column of the goods table
ALTER TABLE goods_1
  MODIFY COLUMN goods_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Goods ID';

-- Modify the sell_order_id column of the sell order table
ALTER TABLE sell_order_1
  MODIFY COLUMN sell_order_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Sell order ID';

-- Modify the buy_order_id column of the buy order table
ALTER TABLE buy_order_1
  MODIFY COLUMN buy_order_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Buy order ID, equal to the sell order ID';

-- Modify the order_goods_id, sell_order_id and goods_id columns of the order goods table
ALTER TABLE order_goods_1
  MODIFY COLUMN order_goods_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Order goods ID';
ALTER TABLE order_goods_1
  MODIFY COLUMN sell_order_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Order ID';
ALTER TABLE order_goods_1
  MODIFY COLUMN goods_id BIGINT UNSIGNED NOT NULL
  COMMENT 'Goods ID';
 
  • Rebuild the IDs with Python

Python modules used:

Module                  Version  Notes
pysnowflake             0.1.3    Global ID generator
mysql_connector_python  2.1.3    MySQL Python API

Only the main program is shown here; the complete program is in the attachment.

 
if __name__=='__main__':
  # Default database connection parameters
  db_config = {
    'user'    : 'root',
    'password': 'root',
    'host'    : '127.0.0.1',
    'port'    : 3306,
    'database': 'test'
  }
  # Default snowflake connection parameters
  snowflake_config = {
    'host': '192.168.137.11',
    'port': 30001
  }
 
  rebuild = Rebuild()
  # Set the database configuration
  rebuild.set_db_config(db_config)
  # Set the snowflake configuration
  rebuild.set_snowflake_config(snowflake_config)
  # Connect to and set up snowflake
  rebuild.setup_snowflake()

  # Create the database connection and cursors
  rebuild.get_conn_cursor()
 
  ##########################################################################
  ## Update goods IDs
  ##########################################################################
  # Get the goods cursor
  goods_sql = '''
    SELECT goods_id FROM goods
  '''
  goods_iter = rebuild.execute_select_sql([goods_sql])
  # Using each goods ID fetched, update the goods ID in the goods table (goods) and the order goods table (order_goods)
  for goods in goods_iter:
    for (goods_id, ) in goods:
      rebuild.update_table_id('goods', 'goods_id', goods_id)
      rebuild.update_table_id('order_goods', 'goods_id', goods_id, rebuild.get_current_guid())
    rebuild.commit()
 
  ##########################################################################
  ## Update order IDs; here the sell order ID and the buy order ID are required to be equal
  ##########################################################################
  # Get the order cursor
  orders_sql = '''
    SELECT sell_order_id FROM sell_order_1
  '''
  sell_order_iter = rebuild.execute_select_sql([orders_sql])
  # Using each sell order, update the sell order ID in the sell order (sell_order_1), buy order (buy_order_1) and order goods (order_goods) tables
  for sell_order_1 in sell_order_iter:
    for (sell_order_id, ) in sell_order_1:
      rebuild.update_table_id('sell_order_1', 'sell_order_id', sell_order_id)
      rebuild.update_table_id('buy_order_1', 'buy_order_id', sell_order_id, rebuild.get_current_guid())
      rebuild.update_table_id('order_goods', 'sell_order_id', sell_order_id, rebuild.get_current_guid())
    rebuild.commit()
 
  ##########################################################################
  ## Update the order goods table IDs
  ##########################################################################
  # Get the order goods cursor
  order_goods_sql = '''
    SELECT order_goods_id FROM order_goods
  '''
  order_goods_iter = rebuild.execute_select_sql([order_goods_sql])
  for order_goods in order_goods_iter:
    for (order_goods_id, ) in order_goods:
      rebuild.update_table_id('order_goods', 'order_goods_id', order_goods_id)
    rebuild.commit()
  # Close the cursors
  rebuild.close_cursor('select')
  rebuild.close_cursor('dml')
  # Close the connection
  rebuild.close_conn()
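The Rebuild class itself is not shown above, so the following is only a sketch of the idea behind the loop, not the author's implementation. It uses an in-memory SQLite database in place of MySQL and a plain counter in place of pysnowflake, and the names `rebuild_goods_ids` and `next_id` are made up. The point it illustrates is that a parent row and every child row referencing it must receive the same new ID. It also assumes the new IDs never overlap with old IDs that are still waiting to be replaced.

```python
import itertools
import sqlite3


def rebuild_goods_ids(conn, next_id):
    """Give every goods row a new ID and apply the same ID to order_goods."""
    cur = conn.cursor()
    # Read all old IDs first so the updates do not disturb the iteration.
    cur.execute("SELECT goods_id FROM goods ORDER BY goods_id")
    old_ids = [row[0] for row in cur.fetchall()]
    for old_id in old_ids:
        new_id = next_id()
        cur.execute("UPDATE goods SET goods_id = ? WHERE goods_id = ?",
                    (new_id, old_id))
        cur.execute("UPDATE order_goods SET goods_id = ? WHERE goods_id = ?",
                    (new_id, old_id))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE goods (goods_id INTEGER PRIMARY KEY, goods_name TEXT);
    CREATE TABLE order_goods (order_goods_id INTEGER PRIMARY KEY, goods_id INTEGER);
    INSERT INTO goods VALUES (1, 'goods1');
    INSERT INTO goods VALUES (2, 'goods2');
    INSERT INTO order_goods VALUES (1, 1);
    INSERT INTO order_goods VALUES (2, 2);
    INSERT INTO order_goods VALUES (3, 1);
""")

# Stand-in ID source: a counter starting at a large, snowflake-sized number.
counter = itertools.count(3791337987775664129)
rebuild_goods_ids(conn, lambda: next(counter))

# Every order_goods row should still point at an existing goods row.
orphans = conn.execute("""
    SELECT COUNT(*) FROM order_goods
    WHERE goods_id NOT IN (SELECT goods_id FROM goods)
""").fetchone()[0]
print(orphans)  # 0
```

In the real program the ID source would be the pysnowflake client and the connection would come from mysql_connector_python.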
 

Complete Python program: rebuild_id.py
Run the program:

 
 
python rebuild_id.py
 
  • Finally, check the resulting tables
 
SELECT * FROM goods LIMIT 0, 1;
+---------------------+------------+---------+----------+
| goods_id            | goods_name | price   | store_id |
+---------------------+------------+---------+----------+
| 3791337987775664129 | goods1     | 9369.00 |        1 |
+---------------------+------------+---------+----------+
SELECT * FROM sell_order_1 LIMIT 0, 1;
+---------------------+---------------+---------+---------+--------+
| sell_order_id       | user_guide_id | user_id | price   | status |
+---------------------+---------------+---------+---------+--------+
| 3791337998693437441 |             1 |      10 | 5320.00 |      1 |
+---------------------+---------------+---------+---------+--------+
SELECT * FROM buy_order_1 LIMIT 0, 1;
+---------------------+---------+---------------+
| buy_order_id        | user_id | user_guide_id |
+---------------------+---------+---------------+
| 3791337998693437441 |      10 |             1 |
+---------------------+---------+---------------+
SELECT * FROM order_goods LIMIT 0, 1;
+---------------------+---------------------+---------------------+---------------+---------+------+
| order_goods_id      | sell_order_id       | goods_id            | user_guide_id | price   | num  |
+---------------------+---------------------+---------------------+---------------+---------+------+
| 3792076554839789569 | 3792076377064214529 | 3792076372429508609 |             1 | 9744.00 |    2 |
+---------------------+---------------------+---------------------+---------------+---------+------+

 

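Recommendation: if you use snowflake in production, be sure to set up high availability to avoid a single point of failure. The specific strategy is up to you.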
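One possible strategy, sketched here purely as an illustration, is client-side failover: keep a list of ID servers and fall back to the next one when a request fails. The function `get_guid_with_failover` and the two fake servers below are hypothetical. In a real setup each callable would wrap a request to a separate snowflake server, and each server would need its own --dc/--worker pair so that two servers can never hand out the same ID.

```python
def get_guid_with_failover(generators):
    """Try each (name, callable) ID source in order; return the first ID."""
    errors = []
    for name, generate in generators:
        try:
            return generate()
        except Exception as exc:  # a real client would catch narrower errors
            errors.append("%s: %s" % (name, exc))
    raise RuntimeError("all ID servers failed: " + "; ".join(errors))


def unreachable_server():
    """Fake ID source that simulates a server that is down."""
    raise ConnectionError("connection refused")


def healthy_server():
    """Fake ID source that simulates a working server."""
    return 3791337987775664129


guid = get_guid_with_failover([
    ("primary", unreachable_server),
    ("standby", healthy_server),
])
print(guid)  # 3791337987775664129
```

Note that IDs coming from different servers are unique but not strictly ordered relative to each other, since each server uses its own clock.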

2. Java version

Code 1

 
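// Assumes SLF4J is the logging library behind Logger and LoggerFactory
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**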
 * @author zhujuan
 * From: https://github.com/twitter/snowflake
 * An object that generates IDs.
 * This is broken into a separate class in case
 * we ever want to support multiple worker threads
 * per process
 */
public class IdWorker {
     
    protected static final Logger LOG = LoggerFactory.getLogger(IdWorker.class);
     
    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
 
    private long twepoch = 1288834974657L;
 
    private long workerIdBits = 5L;
    private long datacenterIdBits = 5L;
    private long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private long sequenceBits = 12L;
 
    private long workerIdShift = sequenceBits;
    private long datacenterIdShift = sequenceBits + workerIdBits;
    private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private long sequenceMask = -1L ^ (-1L << sequenceBits);
 
    private long lastTimestamp = -1L;
 
    public IdWorker(long workerId, long datacenterId) {
        // sanity check for workerId
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
        LOG.info(String.format("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d", timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId));
    }
 
    public synchronized long nextId() {
        long timestamp = timeGen();
 
        if (timestamp < lastTimestamp) {
            LOG.error(String.format("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp));
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
 
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
 
        lastTimestamp = timestamp;
 
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }
 
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
 
    protected long timeGen() {
        return System.currentTimeMillis();
    }
}
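To see how the return statement of nextId() packs its fields, here is a small Python sketch that uses the same constants as the IdWorker class above (twepoch, 5 worker bits, 5 datacenter bits, 12 sequence bits). The function names `compose_id` and `decompose_id` are my own. The layout is the one from this Java class and is not claimed to match what pysnowflake uses internally.

```python
TWEPOCH = 1288834974657  # custom epoch in milliseconds, as in IdWorker
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12

WORKER_ID_SHIFT = SEQUENCE_BITS                                             # 12
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS                        # 17
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS  # 22


def _mask(bits):
    """Return an integer with the lowest `bits` bits set."""
    return (1 << bits) - 1


def compose_id(timestamp_ms, datacenter_id, worker_id, sequence):
    """Pack the four fields into one integer, mirroring nextId()."""
    return (((timestamp_ms - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
            | (datacenter_id << DATACENTER_ID_SHIFT)
            | (worker_id << WORKER_ID_SHIFT)
            | sequence)


def decompose_id(snowflake_id):
    """Split an ID produced by compose_id back into its fields."""
    return {
        "timestamp_ms": (snowflake_id >> TIMESTAMP_LEFT_SHIFT) + TWEPOCH,
        "datacenter_id": (snowflake_id >> DATACENTER_ID_SHIFT) & _mask(DATACENTER_ID_BITS),
        "worker_id": (snowflake_id >> WORKER_ID_SHIFT) & _mask(WORKER_ID_BITS),
        "sequence": snowflake_id & _mask(SEQUENCE_BITS),
    }


example = compose_id(1454126885629, datacenter_id=1, worker_id=1, sequence=1)
print(example)
print(decompose_id(example))
```

Because the timestamp occupies the highest bits, an ID generated at a later millisecond is always larger than one generated earlier, regardless of worker or sequence values.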
 
 

Code 2

 
import java.util.HashSet;
import java.util.Set;

public class IdWorkerTest {
 
    static class IdWorkThread implements Runnable {
        private Set<Long> set;
        private IdWorker idWorker;
 
        public IdWorkThread(Set<Long> set, IdWorker idWorker) {
            this.set = set;
            this.idWorker = idWorker;
        }
 
        @Override
        public void run() {
            while (true) {
                long id = idWorker.nextId();
                if (!set.add(id)) {
                    System.out.println("duplicate:" + id);
                }
            }
        }
    }
 
    public static void main(String[] args) {
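        // A plain HashSet is not thread-safe, so wrap it before sharing it between the two threads
        Set<Long> set = java.util.Collections.synchronizedSet(new HashSet<Long>());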
        final IdWorker idWorker1 = new IdWorker(0, 0);
        final IdWorker idWorker2 = new IdWorker(1, 0);
        Thread t1 = new Thread(new IdWorkThread(set, idWorker1));
        Thread t2 = new Thread(new IdWorkThread(set, idWorker2));
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}