使用Golang的singleflight防止緩存擊穿

在使用緩存時,容易發生緩存擊穿。

緩存擊穿:一個存在的key,在緩存過時的瞬間,同時有大量的請求過來,形成全部請求都去讀dB,這些請求都會擊穿到DB,形成瞬時DB請求量大、壓力驟增。golang

singleflight

介紹

import "golang.org/x/sync/singleflight"

singleflight類的使用方法就新建一個singleflight.Group,使用其方法Do或者DoChan來包裝方法,被包裝的方法在對於同一個key,只會有一個協程執行,其餘協程等待那個協程執行結束後,拿到一樣的結果。緩存

  • Group結構體

表明一類工做,同一個group中,一樣的key同時只能被執行一次。安全

  • Do方法
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

key:同一個key,同時只有一個協程執行。併發

fn:被包裝的函數。app

v:返回值,即執行的結果。其餘等待的協程都會拿到。函數

shared:表示是否有其餘協程獲得了這個結果v。源碼分析

  • DoChan方法
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

與Do方法同樣,只是返回的是一個channel,執行結果會發送到channel中,其餘等待的協程均可以從channel中拿到結果。code

ref:https://godoc.org/golang.org/x/sync/singleflight協程

示例

  • 使用Do方法來模擬,解決緩存擊穿的問題
func  main() {

var singleSetCache singleflight.Group

   getAndSetCache:=func  (requestID int,cacheKey string)  (string, error) {

      log.Printf("request %v start to get and set cache...",requestID)

value,_, _ :=singleSetCache.Do(cacheKey, func()  (ret interface{}, err error) {//do的入參key,能夠直接使用緩存的key,這樣同一個緩存,只有一個協程會去讀DB

         log.Printf("request %v is setting cache...",requestID)

         time.Sleep(3*time._Second_)

         log.Printf("request %v set cache success!",requestID)

return  "VALUE",nil

      })

 return value.(string),nil

   }

   cacheKey:="cacheKey"

for i:=1;i<10;i++{//模擬多個協程同時請求

 go  func(requestID int) {

         value,_:=getAndSetCache(requestID,cacheKey)

         log.Printf("request %v get value: %v",requestID,value)

      }(i)

   }

   time.Sleep(20*time._Second_)

}

輸出:get

2020/04/12 18:18:40 request 4 start  to  get  and  set cache...

2020/04/12 18:18:40 request 4 is setting cache...

2020/04/12 18:18:40 request 2 start  to  get  and  set cache...

2020/04/12 18:18:40 request 7 start  to  get  and  set cache...

2020/04/12 18:18:40 request 5 start  to  get  and  set cache...

2020/04/12 18:18:40 request 1 start  to  get  and  set cache...

2020/04/12 18:18:40 request 6 start  to  get  and  set cache...

2020/04/12 18:18:40 request 3 start  to  get  and  set cache...

2020/04/12 18:18:40 request 8 start  to  get  and  set cache...

2020/04/12 18:18:40 request 9 start  to  get  and  set cache...

2020/04/12 18:18:43 request 4 set  cache  success!

2020/04/12 18:18:43 request 4 get value: VALUE

2020/04/12 18:18:43 request 9 get value: VALUE

2020/04/12 18:18:43 request 6 get value: VALUE

2020/04/12 18:18:43 request 3 get value: VALUE

2020/04/12 18:18:43 request 8 get value: VALUE

2020/04/12 18:18:43 request 1 get value: VALUE

2020/04/12 18:18:43 request 5 get value: VALUE

2020/04/12 18:18:43 request 2 get value: VALUE

2020/04/12 18:18:43 request 7 get value: VALUE`

能夠看到確實只有一個協程執行了被包裝的函數,而且其餘協程都拿到告終果。

源碼分析

看一下這個Do方法是怎麼實現的。

首先看一下Group的結構:

type Group struct {

   mu sync.Mutex      

   m  map[string]*call //保存key對應的函數執行過程和結果的變量。

}

Group的結構很是簡單,一個鎖來保證併發安全,另外一個map用來保存key對應的函數執行過程和結果的變量。

看下call的結構:

type call struct {

wg sync.WaitGroup //用WaitGroup實現只有一個協程執行函數

val interface{} //函數執行結果

   err error

forgotten bool

   dups  int  //含義是duplications,即同時執行同一個key的協程數量

   chans []chan<- Result

}

看下Do方法

func  (g *Group)  Do(key string, fn func()  (interface{}, error)) (v interface{}, err error, shared bool) {

   g.mu.Lock()//寫Group的m字段時,加鎖保證寫安全。

if g.m == nil {

g.m = make(map[string]*call)

   }

if c, ok := g.m[key]; ok {//若是key已經存在,說明已經有協程在執行,則dups++,並等待其執行完畢後,返回其執行結果,執行結果保存在對應的call的val字段裏

      c.dups++

      g.mu.Unlock()

      c.wg.Wait()

 return c.val, c.err, true

   }

//若是key不存在,則新建一個call,並使用WaitGroup來阻塞其餘協程,同時在m字段裏寫入key和對應的call

c := new(call)

   c.wg.Add(1)

   g.m[key] = c

   g.mu.Unlock()

   g.doCall(c, key, fn)//第一個進來的協程來執行這個函數

return c.val, c.err, c.dups > 0

}

繼續看下g.doCall裏具體幹了什麼

func  (g *Group)  doCall(c *call, key string, fn func()  (interface{}, error)) {

   c.val, c.err = fn()//執行被包裝的函數

   c.wg.Done()//執行完畢後,就能夠通知其餘協程能夠拿結果了

   g.mu.Lock()

if !c.forgotten {//其實這裏是爲了保證執行完畢以後,對應的key被刪除,Group有一個方法Forget(key string),能夠用來主動刪除key,這裏是判斷那個方法是否被調用過,被調用過則字段forgotten會置爲true,若是沒有被調用過,則在這裏把key刪除。

 delete(g.m, key)

   }

for _, ch := range c.chans {//將執行結果發送到channel裏,這裏是給DoChan方法使用的

ch <- Result{c.val, c.err, c.dups > 0}

   }

   g.mu.Unlock()

}

由此看來,其實現是很是簡單的。不得不讚嘆一百來行代碼就實現了功能。

其餘

順便附上DoChan方法的使用示例:

func  main() {

var singleSetCache singleflight.Group

   getAndSetCache:=func  (requestID int,cacheKey string)  (string, error) {

      log.Printf("request %v start to get and set cache...",requestID)

retChan:=singleSetCache.DoChan(cacheKey, func()  (ret interface{}, err error) {

         log.Printf("request %v is setting cache...",requestID)

         time.Sleep(3*time._Second_)

         log.Printf("request %v set cache success!",requestID)

return  "VALUE",nil

      })

 var ret singleflight.Result

      timeout := time.After(5 * time._Second_)

 select {//加入了超時機制

 case <-timeout:

         log.Printf("time out!")

return  "",errors.New("time out")

 case ret =<- retChan://從chan中取出結果

return ret.Val.(string),ret.Err

      }

 return  "",nil

   }

   cacheKey:="cacheKey"

for i:=1;i<10;i++{

 go  func(requestID int) {

         value,_:=getAndSetCache(requestID,cacheKey)

         log.Printf("request %v get value: %v",requestID,value)

      }(i)

   }

   time.Sleep(20*time._Second_)

}

看下DoChan的源碼

func  (g *Group)  DoChan(key string, fn func()  (interface{}, error)) <-chan  Result {

ch := make(chan Result, 1)

   g.mu.Lock()

if g.m == nil {

g.m = make(map[string]*call)

   }

if c, ok := g.m[key]; ok {

      c.dups++

c.chans = append(c.chans, ch)//能夠看到,每一個等待的協程,都有一個結果channel。從以前的g.doCall裏也能夠看到,每一個channel都給塞告終果。爲何不全部協程共用一個channel?由於那樣就得在channel裏塞至少與協程數量同樣的結果數量,可是你卻沒法保證用戶一個協程只讀取一次。

      g.mu.Unlock()

 return ch

   }

   c := &call{chans: []chan<- Result{ch}}

   c.wg.Add(1)

   g.m[key] = c

   g.mu.Unlock()

go g.doCall(c, key, fn)

return ch

}
相關文章
相關標籤/搜索