Leaf—Segment分佈式ID生成系統(Golang實現版本)

Leaf-Segment

簡介:今天直接開門見山,先來介紹一下我今天所帶來的東西。沒錯,看標題想必你們已經想到了 —— Leaf-segment數據庫獲取 ID方案。這個方案已經喜聞樂見了,美團早就進行了開源,不過他是由 java來實現的,因此最近爲了學習這一方面知識,我用 go本身實現了一下,目前本身驗證是沒有發現什麼 bug,等待你們的檢驗,發現 bug可及時反饋(提mr或加我vx均可)。

代碼已收錄到個人我的倉庫——[go-算法系列(go-algorithm)](https://github.com/asong2020/...html

歡迎Star,感謝各位~~~。java

注:下文leaf-segment數據庫方案設計直接參考美團(摘取部分)。詳細請參閱:https://tech.meituan.com/2017...mysql

快速使用

建立數據庫

CREATE TABLE `leaf_alloc` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `biz_tag` varchar(128)  NOT NULL DEFAULT '',
  `max_id` bigint(20) NOT NULL DEFAULT '1',
  `step` int(11) NOT NULL,
  `description` varchar(256)  DEFAULT NULL,
  `update_time` bigint(20) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY (`biz_tag`)
) ENGINE=InnoDB;

也能夠直接使用我已經生成好的SQL文件(已在工程項目中)。各個字段的介紹我會在後文代碼實現部分進行解析,這裏就不一一解析了。git

獲取並運行項目

// 1 新建文件目錄
$ mdkir asong.cloud
$ cd asong.cloud
// 2 獲取項目
$ git clone git@github.com:asong2020/go-algorithm.git
// 3 進入項目目錄
$ cd go-go-algorithm/leaf
// 4 運行
$ go run main.go //運行

測試

建立業務號段

URI: POST http://localhost:8080/api/leaf
Param(json):
{
    "biz_tag": "test_create_one",
    "max_id":  1, // 能夠不傳 默認爲1
    "step": 2000, // 能夠不傳 默認爲200
    "descprition": "test api one"
}
  • 示例:
curl --location --request POST 'http://localhost:8080/api/leaf' \
--header 'Content-Type: application/json' \
--data-raw '{
    "biz_tag": "test_create_one",
    "descprition": "test api one"
}'

初始化DB中的號段到內存中

URI: PUT http://localhost:8080/api/leaf/init/cache
Param(json):
{
    "biz_tag": "test_create"
}
  • 示例
curl --location --request PUT 'http://localhost:8080/api/leaf/init/cache' \
--header 'Content-Type: application/json' \
--data-raw '{
    "biz_tag": "test_create"
}'

獲取ID

URI: GET http://localhost:8080/api/leaf
Param: 
?biz_tag=test_create
  • 示例
curl --location --request GET 'http://localhost:8080/api/leaf?biz_tag=test_create'

更新step

URI: PUT http://localhost:8080/api/leaf/step
Param(json):
{
    "step":   10000,
    "biz_tag": "test_create"
}
  • 示例
curl --location --request PUT 'http://localhost:8080/api/leaf/step' \
--header 'Content-Type: application/json' \
--data-raw '{
    "step": 10000,
    "biz_tag": "test_create"
}'

Leaf-Segment方案實現

背景

在複雜分佈式系統中,每每須要對大量的數據和消息進行惟一標識。一個可以生成全局惟一ID的系統是很是必要的。好比某寶,業務分佈普遍,這麼多業務對數據分庫分表後須要有一個惟一ID來標識一條數據或消息,數據庫的自增ID顯然不能知足需求;因此,咱們能夠總結一下業務系統對ID號的要求有哪些呢?github

  1. 全局惟一性:不能出現重複的ID號,既然是惟一標識,這是最基本的要求。
  2. 趨勢遞增:在MySQL InnoDB引擎中使用的是彙集索引,因爲多數RDBMS使用B-tree的數據結構來存儲索引數據,在主鍵的選擇上面咱們應該儘可能使用有序的主鍵保證寫入性能。
  3. 單調遞增:保證下一個ID必定大於上一個ID,例如事務版本號、IM增量消息、排序等特殊需求。
  4. 信息安全:若是ID是連續的,惡意用戶的扒取工做就很是容易作了,直接按照順序下載指定URL便可;若是是訂單號就更危險了,競對能夠直接知道咱們一天的單量。因此在一些應用場景下,會須要ID無規則、不規則。

上述123對應三類不一樣的場景,3和4需求仍是互斥的,沒法使用同一個方案知足。golang

本文只講述場景3的方案,即leaf-segment。場景4能夠用雪花算法實現,這個我以前實現過了,有興趣的童鞋能夠參考一下。傳送門:https://github.com/asong2020/...面試

數據庫生成

leaf-sement是在使用數據庫生成方案上作的改進。這裏先拋磚引玉一下,看一下數據庫生成方案是怎樣實現的。算法

以MySQL舉例,利用給字段設置auto_increment_incrementauto_increment_offset來保證ID自增,每次業務使用下列SQL讀寫MySQL獲得ID號。sql

begin;
REPLACE INTO Tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();
commit;

這種方案的優缺點以下:shell

優勢:

  • 很是簡單,利用現有數據庫系統的功能實現,成本小,有DBA專業維護。
  • ID號單調自增,能夠實現一些對ID有特殊要求的業務。

缺點:

  • 強依賴DB,當DB異常時整個系統不可用,屬於致命問題。配置主從複製能夠儘量的增長可用性,可是數據一致性在特殊狀況下難以保證。主從切換時的不一致可能會致使重複發號。
  • ID發號性能瓶頸限制在單臺MySQL的讀寫性能。

對於MySQL性能問題,可用以下方案解決:在分佈式系統中咱們能夠多部署幾臺機器,每臺機器設置不一樣的初始值,且步長和機器數相等。好比有兩臺機器。設置步長step爲2,TicketServer1的初始值爲1(1,3,5,7,9,11…)、TicketServer2的初始值爲2(2,4,6,8,10…)。這是Flickr團隊在2010年撰文介紹的一種主鍵生成策略(Ticket Servers: Distributed Unique Primary Keys on the Cheap )。以下所示,爲了實現上述方案分別設置兩臺機器對應的參數,TicketServer1從1開始發號,TicketServer2從2開始發號,兩臺機器每次發號以後都遞增2。

TicketServer1:
auto-increment-increment = 2
auto-increment-offset = 1

TicketServer2:
auto-increment-increment = 2
auto-increment-offset = 2

假設咱們要部署N臺機器,步長需設置爲N,每臺的初始值依次爲0,1,2…N-1那麼整個架構就變成了以下圖所示:

這種架構貌似可以知足性能的需求,但有如下幾個缺點:

  • 系統水平擴展比較困難,好比定義好了步長和機器臺數以後,若是要添加機器該怎麼作?假設如今只有一臺機器發號是1,2,3,4,5(步長是1),這個時候須要擴容機器一臺。能夠這樣作:把第二臺機器的初始值設置得比第一臺超過不少,好比14(假設在擴容時間以內第一臺不可能發到14),同時設置步長爲2,那麼這臺機器下發的號碼都是14之後的偶數。而後摘掉第一臺,把ID值保留爲奇數,好比7,而後修改第一臺的步長爲2。讓它符合咱們定義的號段標準,對於這個例子來講就是讓第一臺之後只能產生奇數。擴容方案看起來複雜嗎?貌似還好,如今想象一下若是咱們線上有100臺機器,這個時候要擴容該怎麼作?簡直是噩夢。因此係統水平擴展方案複雜難以實現。
  • ID沒有了單調遞增的特性,只能趨勢遞增,這個缺點對於通常業務需求不是很重要,能夠容忍。
  • 數據庫壓力仍是很大,每次獲取ID都得讀寫一次數據庫,只能靠堆機器來提升性能

Leaf-Segment數據庫方案

Leaf-Segment數據庫方案是在上面的數據庫生成方案上作的改進。

作了以下改變: - 原方案每次獲取ID都得讀寫一次數據庫,形成數據庫壓力大。改成利用proxy server批量獲取,每次獲取一個segment(step決定大小)號段的值。用完以後再去數據庫獲取新的號段,能夠大大的減輕數據庫的壓力。 - 各個業務不一樣的發號需求用biz_tag字段來區分,每一個biz-tag的ID獲取相互隔離,互不影響。若是之後有性能需求須要對數據庫擴容,不須要上述描述的複雜的擴容操做,只須要對biz_tag分庫分表就行。

數據庫表設計以下:

CREATE TABLE `leaf_alloc` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `biz_tag` varchar(128)  NOT NULL DEFAULT '',
  `max_id` bigint(20) NOT NULL DEFAULT '1',
  `step` int(11) NOT NULL,
  `description` varchar(256)  DEFAULT NULL,
  `update_time` bigint(20) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY (`biz_tag`)
) ENGINE=InnoDB;

這裏我依舊使用了一個自增主鍵,不過沒什麼用,能夠忽略。biz_tag用來區分業務(因此我把它設置成了惟一索引),max_id表示該biz_tag目前所被分配的ID號段的最大值,step表示每次分配的號段長度。原來獲取ID每次都須要寫數據庫,如今只須要把step設置得足夠大,好比1000。那麼只有當1000個號被消耗完了以後纔會去從新讀寫一次數據庫。讀寫數據庫的頻率從1減少到了1/step,大體架構以下圖所示:

test_tag在第一臺Leaf機器上是1~1000的號段,當這個號段用完時,會去加載另外一個長度爲step=1000的號段,假設另外兩臺號段都沒有更新,這個時候第一臺機器新加載的號段就應該是3001~4000。同時數據庫對應的biz_tag這條數據的max_id會從3000被更新成4000,更新號段的SQL語句以下:

Begin
UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit

這種模式有如下優缺點:

優勢:

  • Leaf服務能夠很方便的線性擴展,性能徹底可以支撐大多數業務場景。
  • ID號碼是趨勢遞增的8byte的64位數字,知足上述數據庫存儲的主鍵要求。
  • 容災性高:Leaf服務內部有號段緩存,即便DB宕機,短期內Leaf仍能正常對外提供服務。
  • 能夠自定義max_id的大小,很是方便業務從原有的ID方式上遷移過來。

缺點:

  • ID號碼不夠隨機,可以泄露發號數量的信息,不太安全。
  • TP999數據波動大,當號段使用完以後仍是會hang在更新數據庫的I/O上,tg999數據會出現偶爾的尖刺。
  • DB宕機會形成整個系統不可用。

雙buffer優化

對於第二個缺點,Leaf-segment作了一些優化,簡單的說就是:

Leaf 取號段的時機是在號段消耗完的時候進行的,也就意味着號段臨界點的ID下發時間取決於下一次從DB取回號段的時間,而且在這期間進來的請求也會由於DB號段沒有取回來,致使線程阻塞。若是請求DB的網絡和DB的性能穩定,這種狀況對系統的影響是不大的,可是假如取DB的時候網絡發生抖動,或者DB發生慢查詢就會致使整個系統的響應時間變慢。

爲此,咱們但願DB取號段的過程可以作到無阻塞,不須要在DB取號段的時候阻塞請求線程,即當號段消費到某個點時就異步的把下一個號段加載到內存中。而不須要等到號段用盡的時候纔去更新號段。這樣作就能夠很大程度上的下降系統的TP999指標。詳細實現以下圖所示:

採用雙buffer的方式,Leaf服務內部有兩個號段緩存區segment。當前號段已下發10%時,若是下一個號段未更新,則另啓一個更新線程去更新下一個號段。當前號段所有下發完後,若是下個號段準備好了則切換到下個號段爲當前segment接着下發,循環往復。

  • 每一個biz-tag都有消費速度監控,一般推薦segment長度設置爲服務高峯期發號QPS的600倍(10分鐘),這樣即便DB宕機,Leaf仍能持續發號10-20分鐘不受影響。
  • 每次請求來臨時都會判斷下個號段的狀態,從而更新此號段,因此偶爾的網絡抖動不會影響下個號段的更新。

代碼實現

終於到本文的重點了,下面就給你們講解一下我是怎麼實現的。

代碼架構

這裏先貼一下個人代碼架構,具體以下:

leaf
├── common -- common包,放的是一些client初始的代碼
├── conf   -- conf包,配置文件
├── config -- config包,讀取配置文件代碼部分
├── dao    -- dao包,DB操做部分
├── handler -- hanler包,路由註冊即API代碼實現部分
├── images -- 本文的圖片文件
├── model -- model包,db模型或其餘模型
├── service -- service包,邏輯實現部分
├── wire    -- wire包,依賴綁定
├── leaf_svr.go -- main運行先置條件
└── main.go -- main函數

實現分析

在咱們實現以前,確定要分析一波,咱們要作什麼,怎麼作?我老大常常跟我說的一句話:"先把需求分析明白了,再動手,返工反而是浪費時間"。

  1. 首先咱們要把一段號段存從DB中拿到存到內存中,而且可能同時存在不少業務,這裏我就想到用map來存,使用biz_tag來做爲key,由於它具備惟一性,而且能很快的定位。因此咱們能夠定義一個這樣的結構體做爲全局ID分發器:
// 全局分配器
// key: biz_tag value: SegmentBuffer
type LeafSeq struct {
    cache sync.Map
}

這裏考慮到併發操做,因此使用sync.map,由於他是併發安全的。

  1. 肯定了ID怎麼存,接下來咱們就要思考,咱們存什麼樣的結構比較合適。這裏我決定直接實現"雙buffer優化"的方案。這裏我準備本身定義struct,而後用切片來模擬雙buffer。因此能夠設計以下結構:
// 號段
type LeafSegment struct {
    Cursor uint64 // 當前發放位置
    Max    uint64 // 最大值
    Min    uint64 // 開始值即最小值
    InitOk bool   // 是否初始化成功
}

字段說明:首先要有一個字段來記錄當前數據發放到什麼位置了,Cursor就是來作這個的。其次咱們還要把範圍固定住,也是這個號段的開始和結束,也就是minmax字段的做用。最後咱們還要考慮一個問題,既然咱們使用的雙buffer,也就是說咱們並不能肯定下一段buffer是否可用,因此加了一個initOK字段來進行代表。

  1. 上面也設計好了號段怎麼存,接下來咱們設計怎麼線程安全的把這些id有序的發放出去,因此能夠設計以下結構:
type LeafAlloc struct {
    Key        string                 // 也就是`biz_tag`用來區分業務
    Step       int32                  // 記錄步長
    CurrentPos int32                  // 當前使用的 segment buffer光標; 總共兩個buffer緩存區,循環使用
    Buffer     []*LeafSegment         // 雙buffer 一個做爲預緩存做用
    UpdateTime time.Time              // 記錄更新時間 方便長時間不用進行清理,防止佔用內存
    mutex      sync.Mutex             // 互斥鎖
    IsPreload  bool                   // 是否正在預加載
    Waiting    map[string][]chan byte // 掛起等待
}

字段介紹:key也就是咱們的biz_tag,能夠用它快速從map中定義數據。Step記錄當前號段的步長,由於步長是能夠動態改變的,因此這裏須要記錄一下。currentPos這個徹底是記錄當前使用buffer,由於是雙buffer,因此須要定位。buffer這個不用介紹你們也應該知道,就是緩存池。Update_time這個字段大多人可能想不到爲何會有這個,咱們的號段如今都存到內存當中了,那麼當咱們的業務變多了之後,那麼內存就會越佔越多,因此咱們須要記錄他的更新時間,這樣咱們可使用一個定時器按期去清除長時間不使用的號段,節省內存。IsPreload這個字段是給預加載使用,當前緩衝區的號段使用了90%後,咱們就會去預加載下一段緩存池,爲了防止屢次重複加載,因此使用該字段作標識。waiting這裏我使用的是map+chan的結合,這裏的做用就是當咱們當前緩存池的號段消耗的比較快或者預加載失敗了,就會致使如今沒有緩存池可用,因此咱們能夠去等待一會,由於如今有可能正在作預加載,這樣能夠保持系統的高可用,若是超時仍未等到預加載成功則返回失敗,下一次調用便可。

基本思想就是這樣啦,下面咱們就分塊看一下代碼。

先從DB層走起

我這我的寫代碼,愛從DB層開始,也就是把我須要的CRUD操做都提早寫好並測試,這裏就不貼每段代碼的實現了,要不代碼量有點大,而且也沒有必要,直接介紹一下有哪些方法就能夠了。

func (l *LeafDB) Create(ctx context.Context, leaf *model.Leaf) error {}
func (l *LeafDB) Get(ctx context.Context, bizTag string, tx *sql.Tx) (*model.Leaf, error) {}
func (l *LeafDB) UpdateMaxID(ctx context.Context, bizTag string, tx *sql.Tx) error {}
func (l *LeafDB) UpdateMaxIdByCustomStep(ctx context.Context, step int32, bizTag string, tx *sql.Tx) error {}
func (l *LeafDB) GetAll(ctx context.Context) ([]*model.Leaf, error) {}
func (l *LeafDB) UpdateStep(ctx context.Context, step int32, bizTag string) error {
  1. 建立leaf方法。
  2. 獲取某個業務的號段
  3. 號段用完時是須要更新DB到下一個號段
  4. 根據自定義step更新DB到下一個號段
  5. 獲取DB中全部的業務號段
  6. 更新DB中step

實現獲取新號段部分的代碼

先貼出個人代碼:

func (l *LeafDao) NextSegment(ctx context.Context, bizTag string) (*model.Leaf, error) {
    // 開啓事務
    tx, err := l.sql.Begin()
    defer func() {
        if err != nil {
            l.rollback(tx)
        }
    }()
    if err = l.checkError(err); err != nil {
        return nil, err
    }
    err = l.db.UpdateMaxID(ctx, bizTag, tx)
    if err = l.checkError(err); err != nil {
        return nil, err
    }
    leaf, err := l.db.Get(ctx, bizTag, tx)
    if err = l.checkError(err); err != nil {
        return nil, err
    }
    // 提交事務
    err = tx.Commit()
    if err = l.checkError(err); err != nil {
        return nil, err
    }
    return leaf, nil
}

func (l *LeafDao) checkError(err error) error {
    if err == nil {
        return nil
    }
    if message, ok := err.(*mysql.MySQLError); ok {
        fmt.Printf("it's sql error; str:%v", message.Message)
    }
    return errors.New("db error")
}

func (l *LeafDao) rollback(tx *sql.Tx) {
    err := tx.Rollback()
    if err != sql.ErrTxDone && err != nil {
        fmt.Println("rollback error")
    }
}

實現其實很簡單,也就是先更新一下數據庫中的號段,而後再取出來就能夠了,這裏爲了保證數據的一致性和防止屢次更新DB致使號段丟失,因此使用了事務,沒有什麼特別的點,看一下代碼就能懂了。

初始化及獲取ID

這裏我把DB中的號段初始化到內存這一步和獲取ID合到一塊兒來講吧,由於在獲取ID時會有兜底策略進行初始化。先看初始化部分代碼:

// 第一次使用要初始化也就是把DB中的數據存到內存中,非必須操做,直接使用的話有兜底策略
func (l *LeafService) InitCache(ctx context.Context, bizTag string) (*model.LeafAlloc, error) {
    leaf, err := l.dao.NextSegment(ctx, bizTag)
    if err != nil {
        fmt.Printf("initCache failed; err:%v\n", err)
        return nil, err
    }
    alloc := model.NewLeafAlloc(leaf)
    alloc.Buffer = append(alloc.Buffer, model.NewLeafSegment(leaf))

    _ = l.leafSeq.Add(alloc)
    return alloc, nil
}

這裏步驟主要分兩步:

  1. 從DB中獲取當前業務的號段
  2. 保存到內存中,也就是存到map

以後是咱們來看一下咱們是如何獲取id的,這裏步驟比較多了,也是最核心的地方了。

先看主流程:

func (l *LeafService) GetID(ctx context.Context, bizTag string) (uint64, error) {
    // 先去內存中看一下是否已經初始了,未初始化則開啓兜底策略初始化一下。
    l.mutex.Lock()
    var err error
    seqs := l.leafSeq.Get(bizTag)
    if seqs == nil {
        // 不存在初始化一下
        seqs, err = l.InitCache(ctx, bizTag)
        if err != nil {
            return 0, err
        }
    }
    l.mutex.Unlock()

    var id uint64
    id, err = l.NextID(seqs)
    if err != nil {
        return 0, err
    }
    l.leafSeq.Update(bizTag, seqs)

    return id, nil
}

主要分爲三步:

  1. 先去內存中查看該業務是否已經初始化了,未初始化則開啓兜底策略進行初始化。
  2. 獲取id
  3. 更新內存中的數據。

這裏最終要的就是第二步,獲取id,這裏我先把代碼貼出來,而後細細講解一下:

func (l *LeafService) NextID(current *model.LeafAlloc) (uint64, error) {
    current.Lock()
    defer current.Unlock()
    var id uint64
    currentBuffer := current.Buffer[current.CurrentPos]
    // 判斷當前buffer是不是可用的
    if current.HasSeq() {
        id = atomic.AddUint64(&current.Buffer[current.CurrentPos].Cursor, 1)
        current.UpdateTime = time.Now()
    }

    // 當前號段已下發10%時,若是下一個號段未更新加載,則另啓一個更新線程去更新下一個號段
    if currentBuffer.Max-id < uint64(0.9*float32(current.Step)) && len(current.Buffer) <= 1 && !current.IsPreload {
        current.IsPreload = true
        cancel, _ := context.WithTimeout(context.Background(), 3*time.Second)
        go l.PreloadBuffer(cancel, current.Key, current)
    }

    // 第一個buffer的segment使用完成 切換到下一個buffer 並移除如今的buffer
    if id == currentBuffer.Max {
        // 判斷第二個buffer是否準備好了(由於上面開啓協程去更新下一個號段會出現失敗),準備好了切換  currentPos 永遠是0 無論怎麼切換
        if len(current.Buffer) > 1 && current.Buffer[current.CurrentPos+1].InitOk {
            current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...)
        }
        // 若是沒準備好,直接返回就行了,由於如今已經分配id了, 後面會進行補償
    }
    // 有id直接返回就能夠了
    if current.HasID(id) {
        return id, nil
    }

    // 當前buffer已經沒有id可用了,此時補償線程必定正在運行,咱們等待一會
    waitChan := make(chan byte, 1)
    current.Waiting[current.Key] = append(current.Waiting[current.Key], waitChan)
    // 釋放鎖 等待讓其餘客戶端進行走前面的步驟
    current.Unlock()

    timer := time.NewTimer(500 * time.Millisecond) // 等待500ms最多
    select {
    case <-waitChan:
    case <-timer.C:
    }

    current.Lock()
    // 第二個緩衝區仍未初始化好
    if len(current.Buffer) <= 1 {
        return 0, errors.New("get id failed")
    }
    // 切換buffer
    current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...)
    if current.HasSeq() {
        id = atomic.AddUint64(&current.Buffer[current.CurrentPos].Cursor, 1)
        current.UpdateTime = time.Now()
    }
    return id, nil

}

這裏我以爲用文字描述不清楚,因此我畫了個圖,不知道大家能不能看懂,能夠對照着代碼來看,這樣是最清晰的,有問題歡迎留言討論:

預加載的流程也被我畫上去了,預加載的步驟主要有三個:

  1. 獲取某業務下一個階段的號段
  2. 存儲到緩存buffer中,留着備用.
  3. 喚醒當前正在掛起等待的客戶端

代碼實現以下:

func (l *LeafService) PreloadBuffer(ctx context.Context, bizTag string, current *model.LeafAlloc) error {
    for i := 0; i < MAXRETRY; i++ {
        leaf, err := l.dao.NextSegment(ctx, bizTag)
        if err != nil {
            fmt.Printf("preloadBuffer failed; bizTag:%s;err:%v", bizTag, err)
            continue
        }
        segment := model.NewLeafSegment(leaf)
        current.Buffer = append(current.Buffer, segment) // 追加
        l.leafSeq.Update(bizTag, current)
        current.Wakeup()
        break
    }
    current.IsPreload = false
    return nil
}
func (l *LeafAlloc) Wakeup() {
    l.mutex.Lock()
    defer l.mutex.Unlock()
    for _, waitChan := range l.Waiting[l.Key] {
        close(waitChan)
    }
    l.Waiting[l.Key] = l.Waiting[l.Key][:0]
}

緩存清理

到這裏,全部核心代碼都已經實現了,還差最後一步,就是清緩存,就像上面說到的,超長時間不用的號段咱們就要清除它,不能讓他堆積形成內存浪費。這裏我實現的是清理超過15min未使用的號段。實現也很簡單,就是使用timer作一個定時器,每隔15min就去遍歷存儲號段的`map,把超過15min未更新的號段清除掉(雖然會形成號段浪費,但也要這要作)。

// 清理超過15min沒用過的內存
func (l *LeafSeq) clear() {
    for {
        now := time.Now()
        // 15分鐘後
        mm, _ := time.ParseDuration("15m")
        next := now.Add(mm)
        next = time.Date(next.Year(), next.Month(), next.Day(), next.Hour(), next.Minute(), 0, 0, next.Location())
        t := time.NewTimer(next.Sub(now))
        <-t.C
        fmt.Println("start clear goroutine")
        l.cache.Range(func(key, value interface{}) bool {
            alloc := value.(*LeafAlloc)
            if next.Sub(alloc.UpdateTime) > ExpiredTime {
                fmt.Printf("clear biz_tag: %s cache", key)
                l.cache.Delete(key)
            }
            return true
        })
    }
}

總結

好啦,到這裏就是接近尾聲了,上面就是我實現的整個過程,目前本身測試沒有什麼問題,後期還會在縫縫補補,你們也能夠幫我找找問題,歡迎提出大家寶貴的建議~~~。

代碼已收錄到個人我的倉庫——[go-算法系列(go-algorithm)](https://github.com/asong2020/...

歡迎Star,感謝各位~~~。

好啦,這一篇文章到這就結束了,咱們下期見~~。但願對大家有用,又不對的地方歡迎指出,可添加個人golang交流羣,咱們一塊兒學習交流。

結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。

我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。

翻譯了一份Machinery中文文檔,會按期進行維護,有須要的小夥伴們後臺回覆[machinery]便可獲取。

我是asong,一名普普統統的程序猿,讓gi我一塊兒慢慢變強吧。我本身建了一個golang交流羣,有須要的小夥伴加我vx,我拉你入羣。歡迎各位的關注,咱們下期見~~~

推薦往期文章:

相關文章
相關標籤/搜索