多核處理器日益普及的如今不少代碼都得和併發/並行打交道,對於內置了併發支持(goroutine)的golang來講併發編程是必不可少的一環。node
鏈表是咱們再熟悉不過的數據結構,在併發編程中咱們也時長鬚要用到,今天咱們就來看兩種帶鎖的併發安全的單項鍊表。golang
方案一:粗粒度鎖,徹底鎖住鏈表編程
方案一的作法是將全部操做用鎖——Mutex串行化處理。串行化處理是指鎖和鏈表相關聯,當須要修改或讀取鏈表時就獲取鎖,只要該goroutine持有鎖,那麼其餘goroutine就沒法修改或讀取鏈表,直到鎖的持有者將其釋放。安全
這樣能夠保證任什麼時候間都只有一個goroutine能處理鏈表,如此一來也就避免了數據競爭。下面是鏈表結構的定義:數據結構
type MutexList struct { locker *sync.Mutex head, tail *Node size int64 }
size表示當前list的長度,head是一個哨兵節點,不存儲實際的數值。併發
下面是節點的定義:ide
type Node struct { Value interface{} Next *Node } func NewNode(v interface{}) *Node { n := &Node{Value: v} n.Next = nil return n }
節點和它的初始化不用多說,由於數據訪問經過list來控制,因此節點裏不須要再有mutex的存在。函數
好了咱們進入正題,在粗粒度的解決方案裏Enq方法負責將數據插入list的末尾,這是O(1)時間的操做,將list鎖住而後更新tail便可,注意咱們不容許插入nil:性能
func (l *MutexList) Enq(v interface{}) bool { if v == nil { return false } l.locker.Lock() node := NewNode(v) l.tail.Next = node l.tail = node l.size++ l.locker.Unlock() return true }
而後是insert,它將數據插入在給出的index處,index從0開始,一樣咱們不容許插入nil,同時會檢查index,index不能超過size,當list只有size-1個節點時,新數據會插入在list的末尾:測試
func (l *MutexList) Insert(index int64, v interface{}) bool { l.locker.Lock() defer l.locker.Unlock() if index > l.size || index < 0 || v == nil { return false } current := l.head for i := int64(0); i <= index-1; i++ { // index - 1是最後一個節點時 if current.Next == nil { break } current = current.Next } node := NewNode(v) node.Next = current.Next current.Next = node l.size++ return true }
這裏咱們使用了defer,那是由於只有一個mutex,並且函數有多個出口,容易在編碼過程當中漏掉對鎖的釋放。
節點的刪除和查找也是相似的步驟,先給列表上鎖,而後修改/讀取,最後解鎖,這裏就很少講解了。
而後是獲取size的函數,後面的測試中要用,雖然咱們以原子操做來獲取了長度,可是仍然可能存在得到size以後其餘goroutine進行了remove致使size改變進而引起Insert返回false,所幸的是咱們的測試裏並不會讓remove和Insert同時出現,所以不會出現insert返回失敗的問題,在實際使用時須要注意Insert的返回值,這一點在第二種方案中也是同樣的:
func (l *MutexList) Length() int64 { return atomic.LoadInt64(&l.size) }
由於方案二的該函數並沒有什麼變化,所以就省略了。
如你所見,方案一的優勢在於實現起來簡單,確點在於一次只有一個goroutine能處理list,幾乎全部對list的操做都被串行化了。
方案一無疑能很好地工做,可是它的性能十分有限,因此咱們來看看方案二。
方案二:細粒度鎖,鎖住須要修改的節點
方案二的作法是將鎖放到node裏,每次須要修改的僅僅是部分節點,而不用把整個list鎖住,這樣能保證互不干擾的goroutine們能夠同時處理list,而會互相干擾的goroutine則會被節點的mutex阻塞,以保證不存在竟態數據。
固然,爲了保證不會有多個goroutine同時處理一個節點,咱們須要在取得要修改節點的鎖以前先取得前項節點的鎖,而後才能取得修改節點的鎖。這個步驟很像交叉手,它被稱爲鎖耦合。
另一個須要注意的地方是加鎖的順序,全部操做的加鎖順序/方向必須相同,好比從head開始鎖定到tail,若是不按統一的順序加鎖將會出現死鎖。考慮以下狀況,goroutine A鎖住了節點1,正準備鎖定節點2,這時goroutine B沿反方向加鎖,它要鎖住節點2而後再鎖住節點1,若是B運氣很好先於A取得了節點2的鎖,那麼它將一直等待鎖住節點1,而A則會始終等待鎖住節點2,出現了A等B,B等A的死鎖。可是隻要統一了加鎖的順序/方向,那麼這種問題就不復存在了。
這是list和node的定義,能夠看見鎖已經移動到node結構裏了:
type List struct { head, tail *MutexNode size int64 } type MutexNode struct { Locker *sync.Mutex Value interface{} Next *MutexNode } func NewMutexNode(v interface{}) *MutexNode { n := &MutexNode{Value: v} n.Locker = &sync.Mutex{} n.Next = nil return n } func NewList() *List { l := &List{} l.head = NewMutexNode(nil) l.tail = l.head return l }
下面咱們來看Enq,功能與方案一一致,只是在處理鎖的地方有所不一樣,由於tail節點老是在list末尾的元素,符合咱們從head開始的加鎖順序,又由於l.tail的位置始終是肯定的,因此能夠省略鎖住前項節點的步驟;然而l.tail會在咱們等待鎖的時間段裏被更新,因此咱們須要處理l.tail被更新的狀況:
func (l *List) Enq(v interface{}) bool { if v == nil { return false } tail := l.tail tail.Locker.Lock() for tail.Next != nil { next := tail.Next next.Locker.Lock() tail.Locker.Unlock() tail = next } node := NewMutexNode(v) tail.Next = node l.tail = node l.size++ tail.Locker.Unlock() return true }
若是tail的next在取得鎖時不爲nil,說明tail被更新,在tail被更新以後咱們須要找到當前的末尾節點,這時不能直接使用l.tail,有兩點緣由,一是由於這時的l.tail可能也已經被更新,二是在臨時變量tail多是非前驅節點時給l.tail加鎖不能保證其一致性,並且如此一來會破壞加鎖的順序,會形成意想不到的問題。因此咱們遵循加鎖的順序原則不斷後推,直到找到真正的末尾節點。因而可知方案二的Enq操做最壞狀況下是O(n)最好狀況下是O(1),而只要仔細想想,在併發壓力較大時這個操做幾乎老是O(n)的時間開銷(不過實際狀況是方案二花費的時間與方案一差很少,緣由在於方案一要鎖住整個list開銷實在太大了)
Insert的功能也與方案一同樣,由於不是鎖住整個list,因此光判斷size是無心義的,須要處理list被中途修改的狀況;並且由於是從head開始加鎖,而後鎖住節點1再解鎖head,以此類推,因此不會有競爭,但一樣存在remove和insert一塊兒使用時insert會失敗的狀況,須要注意其返回值:
func (l *List) Insert(index int64, v interface{}) bool { if index < 0 || v == nil { return false } current := l.head current.Locker.Lock() for i := int64(0); i <= index-1; i++ { next := current.Next if next == nil { // 若是index前的某個節點爲nil,那麼說明鏈表可能被修改了,沒有index個節點,insert失敗 if index < index - 1 { current.Locker.Unlock() return false } break } next.Locker.Lock() current.Locker.Unlock() current = next } node := NewMutexNode(v) node.Next = current.Next current.Next = node l.size++ current.Locker.Unlock() return true }
remove的作法和insert相似,再也不贅述。
咱們能夠看到方案二鎖的粒度確實變小了,可是實現變得十分複雜,並且須要同時考慮多個邊界狀況,對開發增長了很大的難度,並且分散的鎖也會對調試帶來必定的負面影響。
方案二的另外一個缺點是每一個節點都帶有本身的mutex,當節點增多時內存的開銷也會增大。
性能對比
說了這麼多,方案一粗粒度鎖和方案二細粒度鎖在性能上孰優孰劣呢?畢竟方案二須要屢次獲取和釋放鎖並且須要額外處理不少邊界狀況,仔細想一下的話可能也是一筆不菲的開銷,感謝golang自帶的測試套件,咱們能夠方便的測試。
測試咱們採用一組單goroutine+一組多goroutine測試一個功能的作法,測試機器是intel i5 6500 4核,爲了模擬通常的工做負載,在多goroutine組我統一使用6個goroutine來併發操做list。
測試代碼:
import ( "math/rand" "sync" "testing" "time" ) func init() { rand.Seed(time.Now().Unix()) } func BenchmarkEnq(b *testing.B) { list := NewMutexList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq failed") } } } func BenchmarkGoroutineEnq(b *testing.B) { list := NewMutexList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("Insert failed") } } } func BenchmarkGoroutineInsert(b *testing.B) { list := NewMutexList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }
import ( "math/rand" "sync" "testing" ) func BenchmarkMutexNodeEnq(b *testing.B) { list := NewList() b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq failed") } } } func BenchmarkGoroutineMutexNodeEnq(b *testing.B) { list := NewList() wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Enq(rand.Int()); !done { b.Error("MutexNode Enq by goroutines failed") } } wg.Done() }(b.N) } wg.Wait() } func BenchmarkMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } b.ResetTimer() for i := 0; i < b.N; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert failed") } } } func BenchmarkGoroutineMutexNodeInsert(b *testing.B) { list := NewList() for i := 0; i < 5; i++ { list.Enq(i) } wg := &sync.WaitGroup{} b.ResetTimer() for i := 0; i < 6; i++ { wg.Add(1) go func(n int) { for i := 0; i < n; i++ { if done := list.Insert(rand.Int63n(list.Length()), rand.Int()); !done { b.Error("MutexNode Insert by goroutine failed") } } wg.Done() }(b.N) } wg.Wait() }
測試內容是將隨機數插入兩種不一樣的鏈表,而後對比插入性能。
這是測試結果,由於testing不能跟蹤goroutine內部的操做,因此多goroutine組的單個op看上去比較嚇人,其實這是一個goroutine運行完全部插入調用的時間:
go test -bench=. -benchmem
能夠看到,在Enq也就是在末尾插入上二者相差很少,方案二在全部多goroutine測試用例的表現都優於方案一;
對於有大量隨機訪問發生的Inser操做,方案二在性能上能夠說是碾壓的存在,這多是方案二能夠運行多個goroutine同時修改list而方案一隻能同時有一個goroutine修改的緣由;
而在併發的狀況下方案二仍然比方案一快很多,可是差距縮小了,緣由極可能是頻繁的加鎖加上goroutine之間互相干擾增多致使了性能的部分降低。
總結
若是追求性能,能夠考慮方案二,或者使用第三方的無鎖隊列,不建議本身去實現無鎖數據結構,由於 太 復 雜 !若是你以爲方案二已經想不明白了,那麼無鎖編程將會是天書通常的存在,不如複用大神們的勞動成果吧。若是鏈表並非你程序的性能熱點,那麼就能夠考慮方案一,穩定且易於開發和維護的代碼永遠都是好東西。
最後若是有疑問和建議或者勘誤,歡迎評論指出,祝玩得愉快!