有沒感受
Go
的sync
包不夠用?有沒遇到類型沒有sync/atomic
支持?git咱們一塊兒看看
go-zero
的syncx
包對標準庫的一些增值補充。github
name | 做用 |
---|---|
AtomicBool | bool類型 原子類 |
AtomicDuration | Duration有關 原子類 |
AtomicFloat64 | float64類型 原子類 |
Barrier | 欄柵【將加鎖解鎖包裝】 |
Cond | 條件變量 |
DoneChan | 優雅通知關閉 |
ImmutableResource | 建立後不會修改的資源 |
Limit | 控制請求數 |
LockedCalls | 確保方法的串行調用 |
ManagedResource | 資源管理 |
Once | 提供 once func |
OnceGuard | 一次性使用的資源管理 |
Pool | pool,簡單的池 |
RefResource | 引用計數的資源 |
ResourceManager | 資源管理器 |
SharedCalls | 相似 singflight 的功能 |
SpinLock | 自旋鎖:自旋+CAS |
TimeoutLimit | Limit + timeout 控制 |
下面開始對以上庫組件作分別介紹。緩存
由於沒有 泛型 支持,因此纔會出現多種類型的原子類支持。如下采用 float64
做爲例子:微信
func (f *AtomicFloat64) Add(val float64) float64 { for { old := f.Load() nv := old + val if f.CompareAndSwap(old, nv) { return nv } } } func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool { return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val)) } func (f *AtomicFloat64) Load() float64 { return math.Float64frombits(atomic.LoadUint64((*uint64)(f))) } func (f *AtomicFloat64) Set(val float64) { atomic.StoreUint64((*uint64)(f), math.Float64bits(val)) }
Add(val)
:若是 CAS
失敗,不斷for循環重試,獲取 old val,並set old+val;CompareAndSwap(old, new)
:調用底層 atomic
的 CAS
;Load()
:調用 atomic.LoadUint64
,而後轉換Set(val)
:調用 atomic.StoreUint64
至於其餘類型,開發者想本身擴展本身想要的類型,能夠依照上述,基本上調用原始 atomic
操做,而後轉換爲須要的類型,好比:遇到 bool
能夠藉助 0, 1
來分辨對應的 false, true
。網絡
這裏 Barrier
只是將業務函數操做封裝,做爲閉包傳入,內部將 lock
操做的加鎖解鎖自行解決了【防止開發者加鎖了忘記解鎖】session
func (b *Barrier) Guard(fn func()) { b.lock.Lock() defer b.lock.Unlock() // 本身的業務邏輯 fn() }
這個數據結構和 Limit
一塊兒組成了 TimeoutLimit
,這裏將這3個一塊兒講:數據結構
func NewTimeoutLimit(n int) TimeoutLimit { return TimeoutLimit{ limit: NewLimit(n), cond: NewCond(), } } func NewLimit(n int) Limit { return Limit{ pool: make(chan lang.PlaceholderType, n), } }
limit
這裏是有緩衝的 channel
;cond
是無緩衝的;因此這裏結合名字來理解:由於 Limit
是限制某一種資源的使用,因此須要預先在資源池中放入預置數量的資源;Cond
相似閥門,須要兩邊都準備好,才能進行數據交換,因此使用無緩衝,同步控制。閉包
這裏咱們看看 stores/mongo
中關於 session
的管理,來理解 資源控制:併發
func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) { // 選項參數注入 ... // 看 limit 中是否還能取出資源 if err := cs.limit.Borrow(o.timeout); err != nil { return nil, err } else { return cs.Copy(), nil } } func (l TimeoutLimit) Borrow(timeout time.Duration) error { // 1. 若是還有 limit 中還有資源,取出一個,返回 if l.TryBorrow() { return nil } // 2. 若是 limit 中資源已經用完了 var ok bool for { // 只有 cond 能夠取出一個【無緩存,也只有 cond <- 此條才能經過】 timeout, ok = l.cond.WaitWithTimeout(timeout) // 嘗試取出一個【上面 cond 經過時,就有一個資源返回了】 // 看 `Return()` if ok && l.TryBorrow() { return nil } // 超時控制 if timeout <= 0 { return ErrTimeout } } } func (l TimeoutLimit) Return() error { // 返回去一個資源 if err := l.limit.Return(); err != nil { return err } // 同步通知另外一個須要資源的協程【實現了閥門,兩方交換】 l.cond.Signal() return nil }
同文件夾中還有 ResourceManager
,從名字上相似,這裏將兩個組件放在一塊兒講解。
先從結構上:
type ManagedResource struct { // 資源 resource interface{} lock sync.RWMutex // 生成資源的邏輯,由開發者本身控制 generate func() interface{} // 對比資源 equals func(a, b interface{}) bool } type ResourceManager struct { // 資源:這裏看得出來是 I/O, resources map[string]io.Closer sharedCalls SharedCalls // 對資源map互斥訪問 lock sync.RWMutex }
而後來看獲取資源的方法簽名:
func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error) // 獲取一個資源(有就直接獲取,沒有生成一個) func (mr *ManagedResource) Take() interface{} // 判斷這個資源是否不符合傳入的判斷要求,不符合則重置 func (mr *ManagedResource) MarkBroken(resource interface{})
ResourceManager
使用 SharedCalls
作防重複請求,並將資源緩存在內部的 sourMap
;另外傳入的 create func
和 IO
操做有關,常見用在網絡資源的緩存;ManagedResource
緩存資源沒有 map
而是單一的 interface
,說明只有一份,可是它提供了 Take()
和傳入 generate()
說明可讓開發者自行更新 resource
;因此在用途上:
ResourceManager
:用在網絡資源的管理。如:數據庫鏈接管理;ManagedResource
:用在一些變化資源,能夠作資源先後對比,達到更新資源。如:token
管理和驗證這個就和 GC
中引用計數相似:
Use() -> ref++
Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error { // 互斥訪問 r.lock.Lock() defer r.lock.Unlock() // 清除標記 if r.cleaned { return ErrUseOfCleaned } // 引用 +1 r.ref++ return nil }
一句話形容:使用SharedCalls能夠使得同時多個請求只須要發起一次拿結果的調用,其餘請求"不勞而獲",這種設計有效減小了資源服務的併發壓力,能夠有效防止緩存擊穿。
這個組件被反覆應用在其餘組件中,上面說的 ResourceManager
。
相似當須要高頻併發訪問一個資源時,就能夠使用 SharedCalls
緩存。
// 當多個請求同時使用Do方法請求資源時 func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // 先申請加鎖 g.lock.Lock() // 根據key,獲取對應的call結果,並用變量c保存 if c, ok := g.calls[key]; ok { // 拿到call之後,釋放鎖,此處call可能尚未實際數據,只是一個空的內存佔位 g.lock.Unlock() // 調用wg.Wait,判斷是否有其餘goroutine正在申請資源,若是阻塞,說明有其餘goroutine正在獲取資源 c.wg.Wait() // 當wg.Wait再也不阻塞,表示資源獲取已經結束,能夠直接返回結果 return c.val, c.err } // 沒有拿到結果,則調用makeCall方法去獲取資源,注意此處仍然是鎖住的,能夠保證只有一個goroutine能夠調用makecall c := g.makeCall(key, fn) // 返回調用結果 return c.val, c.err }
不重複造輪子,一直是 go-zero
設計主旨之一;也同時將平時業務沉澱到組件中,這纔是框架和組件的意義。
關於 go-zero
更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 並 star 支持咱們!
關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。
go-zero 系列文章見『微服務實踐』公衆號