更簡的併發代碼,更強的併發控制

有沒感受 Gosync 包不夠用?有沒遇到類型沒有 sync/atomic 支持?git

咱們一塊兒看看 go-zerosyncx 包對標準庫的一些增值補充。github

https://github.com/tal-tech/g...數據庫

name 做用
AtomicBool bool類型 原子類
AtomicDuration Duration有關 原子類
AtomicFloat64 float64類型 原子類
Barrier 欄柵【將加鎖解鎖包裝】
Cond 條件變量
DoneChan 優雅通知關閉
ImmutableResource 建立後不會修改的資源
Limit 控制請求數
LockedCalls 確保方法的串行調用
ManagedResource 資源管理
Once 提供 once func
OnceGuard 一次性使用的資源管理
Pool pool,簡單的池
RefResource 引用計數的資源
ResourceManager 資源管理器
SharedCalls 相似 singflight 的功能
SpinLock 自旋鎖:自旋+CAS
TimeoutLimit Limit + timeout 控制

下面開始對以上庫組件作分別介紹。緩存

atomic

由於沒有 泛型 支持,因此纔會出現多種類型的原子類支持。如下采用 float64 做爲例子:微信

func (f *AtomicFloat64) Add(val float64) float64 {
    for {
        old := f.Load()
        nv := old + val
        if f.CompareAndSwap(old, nv) {
            return nv
        }
    }
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
    return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
    return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
    atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val):若是 CAS 失敗,不斷for循環重試,獲取 old val,並set old+val;
  • CompareAndSwap(old, new):調用底層 atomicCAS
  • Load():調用 atomic.LoadUint64 ,而後轉換
  • Set(val):調用 atomic.StoreUint64

至於其餘類型,開發者想本身擴展本身想要的類型,能夠依照上述,基本上調用原始 atomic 操做,而後轉換爲須要的類型,好比:遇到 bool 能夠藉助 0, 1 來分辨對應的 false, true網絡

Barrier

這裏 Barrier 只是將業務函數操做封裝,做爲閉包傳入,內部將 lock 操做的加鎖解鎖自行解決了【防止開發者加鎖了忘記解鎖】session

func (b *Barrier) Guard(fn func()) {
    b.lock.Lock()
    defer b.lock.Unlock()
  // 本身的業務邏輯
    fn()
}

Cond/Limit/TimeoutLimit

這個數據結構和 Limit 一塊兒組成了 TimeoutLimit ,這裏將這3個一塊兒講:數據結構

func NewTimeoutLimit(n int) TimeoutLimit {
    return TimeoutLimit{
        limit: NewLimit(n),
        cond:  NewCond(),
    }
}

func NewLimit(n int) Limit {
    return Limit{
        pool: make(chan lang.PlaceholderType, n),
    }
}
  • limit 這裏是有緩衝的 channel
  • cond 是無緩衝的;

因此這裏結合名字來理解:由於 Limit 是限制某一種資源的使用,因此須要預先在資源池中放入預置數量的資源;Cond 相似閥門,須要兩邊都準備好,才能進行數據交換,因此使用無緩衝,同步控制。閉包

這裏咱們看看 stores/mongo 中關於 session 的管理,來理解 資源控制:併發

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  // 選項參數注入
    ...
  // 看 limit 中是否還能取出資源
    if err := cs.limit.Borrow(o.timeout); err != nil {
        return nil, err
    } else {
        return cs.Copy(), nil
    }
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
  // 1. 若是還有 limit 中還有資源,取出一個,返回
    if l.TryBorrow() {
        return nil
    }
    // 2. 若是 limit 中資源已經用完了
    var ok bool
    for {
    // 只有 cond 能夠取出一個【無緩存,也只有 cond <- 此條才能經過】
        timeout, ok = l.cond.WaitWithTimeout(timeout)
    // 嘗試取出一個【上面 cond 經過時,就有一個資源返回了】
    // 看 `Return()`
        if ok && l.TryBorrow() {
            return nil
        }
        // 超時控制
        if timeout <= 0 {
            return ErrTimeout
        }
    }
}

func (l TimeoutLimit) Return() error {
  // 返回去一個資源
    if err := l.limit.Return(); err != nil {
        return err
    }
    // 同步通知另外一個須要資源的協程【實現了閥門,兩方交換】
    l.cond.Signal()
    return nil
}

資源管理

同文件夾中還有 ResourceManager,從名字上相似,這裏將兩個組件放在一塊兒講解。

先從結構上:

type ManagedResource struct {
  // 資源
    resource interface{}
    lock     sync.RWMutex
  // 生成資源的邏輯,由開發者本身控制
    generate func() interface{}
  // 對比資源
    equals   func(a, b interface{}) bool
}

type ResourceManager struct {
  // 資源:這裏看得出來是 I/O,
    resources   map[string]io.Closer
    sharedCalls SharedCalls
  // 對資源map互斥訪問
    lock        sync.RWMutex
}

而後來看獲取資源的方法簽名:

func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 獲取一個資源(有就直接獲取,沒有生成一個)
func (mr *ManagedResource) Take() interface{}
// 判斷這個資源是否不符合傳入的判斷要求,不符合則重置
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager 使用 SharedCalls 作防重複請求,並將資源緩存在內部的 sourMap;另外傳入的 create funcIO 操做有關,常見用在網絡資源的緩存;
  2. ManagedResource 緩存資源沒有 map 而是單一的 interface ,說明只有一份,可是它提供了 Take() 和傳入 generate()說明可讓開發者自行更新 resource

因此在用途上:

  • ResourceManager:用在網絡資源的管理。如:數據庫鏈接管理;
  • ManagedResource:用在一些變化資源,能夠作資源先後對比,達到更新資源。如:token 管理和驗證

RefResource

這個就和 GC 中引用計數相似:

  • Use() -> ref++
  • Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error {
  // 互斥訪問
    r.lock.Lock()
    defer r.lock.Unlock()
    // 清除標記
    if r.cleaned {
        return ErrUseOfCleaned
    }
    // 引用 +1
    r.ref++
    return nil
}

SharedCalls

一句話形容:使用SharedCalls能夠使得同時多個請求只須要發起一次拿結果的調用,其餘請求"不勞而獲",這種設計有效減小了資源服務的併發壓力,能夠有效防止緩存擊穿

這個組件被反覆應用在其餘組件中,上面說的 ResourceManager

相似當須要高頻併發訪問一個資源時,就能夠使用 SharedCalls 緩存。

// 當多個請求同時使用Do方法請求資源時
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  // 先申請加鎖
  g.lock.Lock()

  // 根據key,獲取對應的call結果,並用變量c保存
  if c, ok := g.calls[key]; ok {
    // 拿到call之後,釋放鎖,此處call可能尚未實際數據,只是一個空的內存佔位
    g.lock.Unlock()
    // 調用wg.Wait,判斷是否有其餘goroutine正在申請資源,若是阻塞,說明有其餘goroutine正在獲取資源
    c.wg.Wait()
    // 當wg.Wait再也不阻塞,表示資源獲取已經結束,能夠直接返回結果
    return c.val, c.err
  }

  // 沒有拿到結果,則調用makeCall方法去獲取資源,注意此處仍然是鎖住的,能夠保證只有一個goroutine能夠調用makecall
  c := g.makeCall(key, fn)
  // 返回調用結果
  return c.val, c.err
}

總結

不重複造輪子,一直是 go-zero 設計主旨之一;也同時將平時業務沉澱到組件中,這纔是框架和組件的意義。

關於 go-zero 更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 並 star 支持咱們!

微信交流羣

關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。

go-zero 系列文章見『微服務實踐』公衆號
相關文章
相關標籤/搜索