簡介:今天直接開門見山,先來介紹一下我今天所帶來的東西。沒錯,看標題想必你們已經想到了 —— Leaf-segment數據庫獲取ID
方案。這個方案已經喜聞樂見了,美團早就進行了開源,不過他是由java
來實現的,因此最近爲了學習這一方面知識,我用go
本身實現了一下,目前本身驗證是沒有發現什麼bug
,等待你們的檢驗,發現bug
可及時反饋(提mr或加我vx均可)。代碼已收錄到個人我的倉庫——[go-算法系列(go-algorithm)](https://github.com/asong2020/...。html
歡迎
Star
,感謝各位~~~。java注:下文
leaf-segment
數據庫方案設計直接參考美團(摘取部分)。詳細請參閱:https://tech.meituan.com/2017...mysql
CREATE TABLE `leaf_alloc` ( `id` int(11) NOT NULL AUTO_INCREMENT, `biz_tag` varchar(128) NOT NULL DEFAULT '', `max_id` bigint(20) NOT NULL DEFAULT '1', `step` int(11) NOT NULL, `description` varchar(256) DEFAULT NULL, `update_time` bigint(20) NOT NULL DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY (`biz_tag`) ) ENGINE=InnoDB;
也能夠直接使用我已經生成好的SQL
文件(已在工程項目中)。各個字段的介紹我會在後文代碼實現部分進行解析,這裏就不一一解析了。git
// 1 新建文件目錄 $ mdkir asong.cloud $ cd asong.cloud // 2 獲取項目 $ git clone git@github.com:asong2020/go-algorithm.git // 3 進入項目目錄 $ cd go-go-algorithm/leaf // 4 運行 $ go run main.go //運行
URI: POST http://localhost:8080/api/leaf Param(json): { "biz_tag": "test_create_one", "max_id": 1, // 能夠不傳 默認爲1 "step": 2000, // 能夠不傳 默認爲200 "descprition": "test api one" }
curl --location --request POST 'http://localhost:8080/api/leaf' \ --header 'Content-Type: application/json' \ --data-raw '{ "biz_tag": "test_create_one", "descprition": "test api one" }'
URI: PUT http://localhost:8080/api/leaf/init/cache Param(json): { "biz_tag": "test_create" }
curl --location --request PUT 'http://localhost:8080/api/leaf/init/cache' \ --header 'Content-Type: application/json' \ --data-raw '{ "biz_tag": "test_create" }'
URI: GET http://localhost:8080/api/leaf Param: ?biz_tag=test_create
curl --location --request GET 'http://localhost:8080/api/leaf?biz_tag=test_create'
step
URI: PUT http://localhost:8080/api/leaf/step Param(json): { "step": 10000, "biz_tag": "test_create" }
curl --location --request PUT 'http://localhost:8080/api/leaf/step' \ --header 'Content-Type: application/json' \ --data-raw '{ "step": 10000, "biz_tag": "test_create" }'
在複雜分佈式系統中,每每須要對大量的數據和消息進行惟一標識。一個可以生成全局惟一ID的系統是很是必要的。好比某寶,業務分佈普遍,這麼多業務對數據分庫分表後須要有一個惟一ID來標識一條數據或消息,數據庫的自增ID顯然不能知足需求;因此,咱們能夠總結一下業務系統對ID號的要求有哪些呢?github
上述123對應三類不一樣的場景,3和4需求仍是互斥的,沒法使用同一個方案知足。golang
本文只講述場景3的方案,即leaf-segment
。場景4能夠用雪花算法
實現,這個我以前實現過了,有興趣的童鞋能夠參考一下。傳送門:https://github.com/asong2020/...面試
leaf-sement
是在使用數據庫生成方案上作的改進。這裏先拋磚引玉一下,看一下數據庫生成方案是怎樣實現的。算法
以MySQL舉例,利用給字段設置auto_increment_increment
和auto_increment_offset
來保證ID自增,每次業務使用下列SQL讀寫MySQL獲得ID號。sql
begin; REPLACE INTO Tickets64 (stub) VALUES ('a'); SELECT LAST_INSERT_ID(); commit;
這種方案的優缺點以下:shell
優勢:
缺點:
對於MySQL性能問題,可用以下方案解決:在分佈式系統中咱們能夠多部署幾臺機器,每臺機器設置不一樣的初始值,且步長和機器數相等。好比有兩臺機器。設置步長step爲2,TicketServer1的初始值爲1(1,3,5,7,9,11…)、TicketServer2的初始值爲2(2,4,6,8,10…)。這是Flickr團隊在2010年撰文介紹的一種主鍵生成策略(Ticket Servers: Distributed Unique Primary Keys on the Cheap )。以下所示,爲了實現上述方案分別設置兩臺機器對應的參數,TicketServer1從1開始發號,TicketServer2從2開始發號,兩臺機器每次發號以後都遞增2。
TicketServer1: auto-increment-increment = 2 auto-increment-offset = 1 TicketServer2: auto-increment-increment = 2 auto-increment-offset = 2
假設咱們要部署N臺機器,步長需設置爲N,每臺的初始值依次爲0,1,2…N-1那麼整個架構就變成了以下圖所示:
這種架構貌似可以知足性能的需求,但有如下幾個缺點:
Leaf-Segment
數據庫方案Leaf-Segment
數據庫方案是在上面的數據庫生成方案上作的改進。
作了以下改變: - 原方案每次獲取ID都得讀寫一次數據庫,形成數據庫壓力大。改成利用proxy server批量獲取,每次獲取一個segment(step決定大小)號段的值。用完以後再去數據庫獲取新的號段,能夠大大的減輕數據庫的壓力。 - 各個業務不一樣的發號需求用biz_tag字段來區分,每一個biz-tag的ID獲取相互隔離,互不影響。若是之後有性能需求須要對數據庫擴容,不須要上述描述的複雜的擴容操做,只須要對biz_tag分庫分表就行。
數據庫表設計以下:
CREATE TABLE `leaf_alloc` ( `id` int(11) NOT NULL AUTO_INCREMENT, `biz_tag` varchar(128) NOT NULL DEFAULT '', `max_id` bigint(20) NOT NULL DEFAULT '1', `step` int(11) NOT NULL, `description` varchar(256) DEFAULT NULL, `update_time` bigint(20) NOT NULL DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY (`biz_tag`) ) ENGINE=InnoDB;
這裏我依舊使用了一個自增主鍵,不過沒什麼用,能夠忽略。biz_tag用來區分業務(因此我把它設置成了惟一索引),max_id表示該biz_tag目前所被分配的ID號段的最大值,step表示每次分配的號段長度。原來獲取ID每次都須要寫數據庫,如今只須要把step設置得足夠大,好比1000。那麼只有當1000個號被消耗完了以後纔會去從新讀寫一次數據庫。讀寫數據庫的頻率從1減少到了1/step,大體架構以下圖所示:
test_tag在第一臺Leaf機器上是1~1000的號段,當這個號段用完時,會去加載另外一個長度爲step=1000的號段,假設另外兩臺號段都沒有更新,這個時候第一臺機器新加載的號段就應該是3001~4000。同時數據庫對應的biz_tag這條數據的max_id會從3000被更新成4000,更新號段的SQL語句以下:
Begin UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx SELECT tag, max_id, step FROM table WHERE biz_tag=xxx Commit
這種模式有如下優缺點:
優勢:
缺點:
對於第二個缺點,Leaf-segment作了一些優化,簡單的說就是:
Leaf 取號段的時機是在號段消耗完的時候進行的,也就意味着號段臨界點的ID下發時間取決於下一次從DB取回號段的時間,而且在這期間進來的請求也會由於DB號段沒有取回來,致使線程阻塞。若是請求DB的網絡和DB的性能穩定,這種狀況對系統的影響是不大的,可是假如取DB的時候網絡發生抖動,或者DB發生慢查詢就會致使整個系統的響應時間變慢。
爲此,咱們但願DB取號段的過程可以作到無阻塞,不須要在DB取號段的時候阻塞請求線程,即當號段消費到某個點時就異步的把下一個號段加載到內存中。而不須要等到號段用盡的時候纔去更新號段。這樣作就能夠很大程度上的下降系統的TP999指標。詳細實現以下圖所示:
採用雙buffer的方式,Leaf服務內部有兩個號段緩存區segment。當前號段已下發10%時,若是下一個號段未更新,則另啓一個更新線程去更新下一個號段。當前號段所有下發完後,若是下個號段準備好了則切換到下個號段爲當前segment接着下發,循環往復。
終於到本文的重點了,下面就給你們講解一下我是怎麼實現的。
這裏先貼一下個人代碼架構,具體以下:
leaf ├── common -- common包,放的是一些client初始的代碼 ├── conf -- conf包,配置文件 ├── config -- config包,讀取配置文件代碼部分 ├── dao -- dao包,DB操做部分 ├── handler -- hanler包,路由註冊即API代碼實現部分 ├── images -- 本文的圖片文件 ├── model -- model包,db模型或其餘模型 ├── service -- service包,邏輯實現部分 ├── wire -- wire包,依賴綁定 ├── leaf_svr.go -- main運行先置條件 └── main.go -- main函數
在咱們實現以前,確定要分析一波,咱們要作什麼,怎麼作?我老大常常跟我說的一句話:"先把需求分析明白了,再動手,返工反而是浪費時間"。
map
來存,使用biz_tag
來做爲key
,由於它具備惟一性,而且能很快的定位。因此咱們能夠定義一個這樣的結構體做爲全局ID分發器:// 全局分配器 // key: biz_tag value: SegmentBuffer type LeafSeq struct { cache sync.Map }
這裏考慮到併發操做,因此使用sync.map
,由於他是併發安全的。
ID
怎麼存,接下來咱們就要思考,咱們存什麼樣的結構比較合適。這裏我決定直接實現"雙buffer優化"的方案。這裏我準備本身定義struct
,而後用切片來模擬雙buffer。因此能夠設計以下結構:// 號段 type LeafSegment struct { Cursor uint64 // 當前發放位置 Max uint64 // 最大值 Min uint64 // 開始值即最小值 InitOk bool // 是否初始化成功 }
字段說明:首先要有一個字段來記錄當前數據發放到什麼位置了,Cursor
就是來作這個的。其次咱們還要把範圍固定住,也是這個號段的開始和結束,也就是min
、max
字段的做用。最後咱們還要考慮一個問題,既然咱們使用的雙buffer
,也就是說咱們並不能肯定下一段buffer
是否可用,因此加了一個initOK
字段來進行代表。
id
有序的發放出去,因此能夠設計以下結構:type LeafAlloc struct { Key string // 也就是`biz_tag`用來區分業務 Step int32 // 記錄步長 CurrentPos int32 // 當前使用的 segment buffer光標; 總共兩個buffer緩存區,循環使用 Buffer []*LeafSegment // 雙buffer 一個做爲預緩存做用 UpdateTime time.Time // 記錄更新時間 方便長時間不用進行清理,防止佔用內存 mutex sync.Mutex // 互斥鎖 IsPreload bool // 是否正在預加載 Waiting map[string][]chan byte // 掛起等待 }
字段介紹:key
也就是咱們的biz_tag
,能夠用它快速從map
中定義數據。Step
記錄當前號段的步長,由於步長是能夠動態改變的,因此這裏須要記錄一下。currentPos
這個徹底是記錄當前使用buffer
,由於是雙buffer
,因此須要定位。buffer
這個不用介紹你們也應該知道,就是緩存池。Update_time
這個字段大多人可能想不到爲何會有這個,咱們的號段如今都存到內存當中了,那麼當咱們的業務變多了之後,那麼內存就會越佔越多,因此咱們須要記錄他的更新時間,這樣咱們可使用一個定時器按期去清除長時間不使用的號段,節省內存。IsPreload
這個字段是給預加載使用,當前緩衝區的號段使用了90%後,咱們就會去預加載下一段緩存池,爲了防止屢次重複加載,因此使用該字段作標識。waiting
這裏我使用的是map+chan
的結合,這裏的做用就是當咱們當前緩存池的號段消耗的比較快或者預加載失敗了,就會致使如今沒有緩存池可用,因此咱們能夠去等待一會,由於如今有可能正在作預加載,這樣能夠保持系統的高可用,若是超時仍未等到預加載成功則返回失敗,下一次調用便可。
基本思想就是這樣啦,下面咱們就分塊看一下代碼。
我這我的寫代碼,愛從DB
層開始,也就是把我須要的CRUD
操做都提早寫好並測試,這裏就不貼每段代碼的實現了,要不代碼量有點大,而且也沒有必要,直接介紹一下有哪些方法就能夠了。
func (l *LeafDB) Create(ctx context.Context, leaf *model.Leaf) error {} func (l *LeafDB) Get(ctx context.Context, bizTag string, tx *sql.Tx) (*model.Leaf, error) {} func (l *LeafDB) UpdateMaxID(ctx context.Context, bizTag string, tx *sql.Tx) error {} func (l *LeafDB) UpdateMaxIdByCustomStep(ctx context.Context, step int32, bizTag string, tx *sql.Tx) error {} func (l *LeafDB) GetAll(ctx context.Context) ([]*model.Leaf, error) {} func (l *LeafDB) UpdateStep(ctx context.Context, step int32, bizTag string) error {
leaf
方法。step
更新DB到下一個號段step
先貼出個人代碼:
func (l *LeafDao) NextSegment(ctx context.Context, bizTag string) (*model.Leaf, error) { // 開啓事務 tx, err := l.sql.Begin() defer func() { if err != nil { l.rollback(tx) } }() if err = l.checkError(err); err != nil { return nil, err } err = l.db.UpdateMaxID(ctx, bizTag, tx) if err = l.checkError(err); err != nil { return nil, err } leaf, err := l.db.Get(ctx, bizTag, tx) if err = l.checkError(err); err != nil { return nil, err } // 提交事務 err = tx.Commit() if err = l.checkError(err); err != nil { return nil, err } return leaf, nil } func (l *LeafDao) checkError(err error) error { if err == nil { return nil } if message, ok := err.(*mysql.MySQLError); ok { fmt.Printf("it's sql error; str:%v", message.Message) } return errors.New("db error") } func (l *LeafDao) rollback(tx *sql.Tx) { err := tx.Rollback() if err != sql.ErrTxDone && err != nil { fmt.Println("rollback error") } }
實現其實很簡單,也就是先更新一下數據庫中的號段,而後再取出來就能夠了,這裏爲了保證數據的一致性和防止屢次更新DB致使號段丟失,因此使用了事務,沒有什麼特別的點,看一下代碼就能懂了。
這裏我把DB中的號段初始化到內存這一步和獲取ID
合到一塊兒來講吧,由於在獲取ID時會有兜底策略進行初始化。先看初始化部分代碼:
// 第一次使用要初始化也就是把DB中的數據存到內存中,非必須操做,直接使用的話有兜底策略 func (l *LeafService) InitCache(ctx context.Context, bizTag string) (*model.LeafAlloc, error) { leaf, err := l.dao.NextSegment(ctx, bizTag) if err != nil { fmt.Printf("initCache failed; err:%v\n", err) return nil, err } alloc := model.NewLeafAlloc(leaf) alloc.Buffer = append(alloc.Buffer, model.NewLeafSegment(leaf)) _ = l.leafSeq.Add(alloc) return alloc, nil }
這裏步驟主要分兩步:
map
以後是咱們來看一下咱們是如何獲取id
的,這裏步驟比較多了,也是最核心的地方了。
先看主流程:
func (l *LeafService) GetID(ctx context.Context, bizTag string) (uint64, error) { // 先去內存中看一下是否已經初始了,未初始化則開啓兜底策略初始化一下。 l.mutex.Lock() var err error seqs := l.leafSeq.Get(bizTag) if seqs == nil { // 不存在初始化一下 seqs, err = l.InitCache(ctx, bizTag) if err != nil { return 0, err } } l.mutex.Unlock() var id uint64 id, err = l.NextID(seqs) if err != nil { return 0, err } l.leafSeq.Update(bizTag, seqs) return id, nil }
主要分爲三步:
id
這裏最終要的就是第二步,獲取id
,這裏我先把代碼貼出來,而後細細講解一下:
func (l *LeafService) NextID(current *model.LeafAlloc) (uint64, error) { current.Lock() defer current.Unlock() var id uint64 currentBuffer := current.Buffer[current.CurrentPos] // 判斷當前buffer是不是可用的 if current.HasSeq() { id = atomic.AddUint64(¤t.Buffer[current.CurrentPos].Cursor, 1) current.UpdateTime = time.Now() } // 當前號段已下發10%時,若是下一個號段未更新加載,則另啓一個更新線程去更新下一個號段 if currentBuffer.Max-id < uint64(0.9*float32(current.Step)) && len(current.Buffer) <= 1 && !current.IsPreload { current.IsPreload = true cancel, _ := context.WithTimeout(context.Background(), 3*time.Second) go l.PreloadBuffer(cancel, current.Key, current) } // 第一個buffer的segment使用完成 切換到下一個buffer 並移除如今的buffer if id == currentBuffer.Max { // 判斷第二個buffer是否準備好了(由於上面開啓協程去更新下一個號段會出現失敗),準備好了切換 currentPos 永遠是0 無論怎麼切換 if len(current.Buffer) > 1 && current.Buffer[current.CurrentPos+1].InitOk { current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...) } // 若是沒準備好,直接返回就行了,由於如今已經分配id了, 後面會進行補償 } // 有id直接返回就能夠了 if current.HasID(id) { return id, nil } // 當前buffer已經沒有id可用了,此時補償線程必定正在運行,咱們等待一會 waitChan := make(chan byte, 1) current.Waiting[current.Key] = append(current.Waiting[current.Key], waitChan) // 釋放鎖 等待讓其餘客戶端進行走前面的步驟 current.Unlock() timer := time.NewTimer(500 * time.Millisecond) // 等待500ms最多 select { case <-waitChan: case <-timer.C: } current.Lock() // 第二個緩衝區仍未初始化好 if len(current.Buffer) <= 1 { return 0, errors.New("get id failed") } // 切換buffer current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...) if current.HasSeq() { id = atomic.AddUint64(¤t.Buffer[current.CurrentPos].Cursor, 1) current.UpdateTime = time.Now() } return id, nil }
這裏我以爲用文字描述不清楚,因此我畫了個圖,不知道大家能不能看懂,能夠對照着代碼來看,這樣是最清晰的,有問題歡迎留言討論:
預加載的流程也被我畫上去了,預加載的步驟主要有三個:
buffer
中,留着備用.代碼實現以下:
func (l *LeafService) PreloadBuffer(ctx context.Context, bizTag string, current *model.LeafAlloc) error { for i := 0; i < MAXRETRY; i++ { leaf, err := l.dao.NextSegment(ctx, bizTag) if err != nil { fmt.Printf("preloadBuffer failed; bizTag:%s;err:%v", bizTag, err) continue } segment := model.NewLeafSegment(leaf) current.Buffer = append(current.Buffer, segment) // 追加 l.leafSeq.Update(bizTag, current) current.Wakeup() break } current.IsPreload = false return nil } func (l *LeafAlloc) Wakeup() { l.mutex.Lock() defer l.mutex.Unlock() for _, waitChan := range l.Waiting[l.Key] { close(waitChan) } l.Waiting[l.Key] = l.Waiting[l.Key][:0] }
到這裏,全部核心代碼都已經實現了,還差最後一步,就是清緩存,就像上面說到的,超長時間不用的號段咱們就要清除它,不能讓他堆積形成內存浪費。這裏我實現的是清理超過15min未使用的號段。實現也很簡單,就是使用timer
作一個定時器,每隔15min就去遍歷存儲號段的`map,把超過15min未更新的號段清除掉(雖然會形成號段浪費,但也要這要作)。
// 清理超過15min沒用過的內存 func (l *LeafSeq) clear() { for { now := time.Now() // 15分鐘後 mm, _ := time.ParseDuration("15m") next := now.Add(mm) next = time.Date(next.Year(), next.Month(), next.Day(), next.Hour(), next.Minute(), 0, 0, next.Location()) t := time.NewTimer(next.Sub(now)) <-t.C fmt.Println("start clear goroutine") l.cache.Range(func(key, value interface{}) bool { alloc := value.(*LeafAlloc) if next.Sub(alloc.UpdateTime) > ExpiredTime { fmt.Printf("clear biz_tag: %s cache", key) l.cache.Delete(key) } return true }) } }
好啦,到這裏就是接近尾聲了,上面就是我實現的整個過程,目前本身測試沒有什麼問題,後期還會在縫縫補補,你們也能夠幫我找找問題,歡迎提出大家寶貴的建議~~~。
代碼已收錄到個人我的倉庫——[go-算法系列(go-algorithm)](https://github.com/asong2020/...。
歡迎Star
,感謝各位~~~。
好啦,這一篇文章到這就結束了,咱們下期見~~。但願對大家有用,又不對的地方歡迎指出,可添加個人golang交流羣,咱們一塊兒學習交流。
結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。
我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。
翻譯了一份Machinery中文文檔,會按期進行維護,有須要的小夥伴們後臺回覆[machinery]便可獲取。
我是asong,一名普普統統的程序猿,讓gi我一塊兒慢慢變強吧。我本身建了一個golang
交流羣,有須要的小夥伴加我vx
,我拉你入羣。歡迎各位的關注,咱們下期見~~~
推薦往期文章: