Go語言提供了channel和sync包兩種併發控制的方法,每種方法都有他們適用的場景,並非全部併發場景都適合應用channel的,有的時候用sync包裏提供的同步原語更簡單。今天這個話題純屬是爲了經過用channel實現同步鎖的功能來學習掌握channel擁有的強大能力,並不適合在實際中使用。並且面試中有時候就是會出一些奇奇怪怪的題考應聘者對知識的理解以及靈活運用的應變能力。
你們仔細看看文章裏用channel實現幾種經常使用的同步鎖的思路,沒準兒哪次面試就碰上這樣的面試官了呢。
今天,我將深刻探討Go
語言channel
和select
語句的表達能力。爲了演示只用這兩個原語就能夠實現多少功能,我將從頭開始用它們重寫sync
包。面試
sync
包提供的同步原語的有哪些以及如何使用咱們已經在以前的文章裏介紹過了,因此這裏不會再去介紹用channel
實現的這些同步原語應該怎麼用。若是對用法有疑問請回看以前的文章: Go語言sync包的應用詳解。編程
once是一個簡單而強大的原語,可確保在並行程序中一個函數僅執行一次。併發
channel
版的Once
咱們使用帶有一個緩衝的通道來實現
第一次調用Do(func ())
的goroutine
從通道中接收到值後,後續的goroutine
將會被阻塞中,直到Do
的參數函數執行完成後關閉通道爲止。其餘goroutine
判斷通道已關閉後將不執行任何操做並當即返回。函數
type Once chan struct{} func NewOnce() Once { o := make(Once, 1) // 只容許一個goroutine接收,其餘goroutine會被阻塞住 o <- struct{}{} return o } func (o Once) Do(f func()) { _, ok := <-o if !ok { // Channel已經被關閉 // 證實f已經被執行過了,直接return. return } // 調用f, 由於channel中只有一個值 // 因此只有一個goroutine會到達這裏 f() // 關閉通道,這將釋放全部在等待的 // 以及將來會調用Do方法的goroutine close(o) }
大小爲N的信號量最多容許N個goroutine
在任何給定時間保持其鎖。互斥鎖是大小爲1的信號量的特例。學習
信號量(英語:semaphore)又稱爲信號標,是一個同步對象,用於保持在0至指定最大值之間的一個計數值。當線程完成一次對該semaphore對象的等待(wait)時,該計數值減一;當線程完成一次對semaphore對象的釋放(release)時,計數值加一。當計數值爲0,則線程直至該semaphore對象變成signaled狀態才能等待成功。semaphore對象的計數值大於0,爲signaled狀態;計數值等於0,爲nonsignaled狀態.
咱們先用channel
實現信號量的功能spa
type Semaphore chan struct{} func NewSemaphore(size int) Semaphore { return make(Semaphore, size) } func (s Semaphore) Lock() { // 只有在s還有空間的時候才能發送成功 s <- struct{}{} } func (s Semaphore) Unlock() { // 爲其餘信號量騰出空間 <-s }
上面也說了互斥鎖是大小爲1的信號量的特例。那麼在剛纔實現的信號量的基礎上實現互斥鎖只須要:線程
type Mutex Semaphore func NewMutex() Mutex { return Mutex(NewSemaphore(1)) }
RWMutex
是一個稍微複雜的原語:它容許任意數量的併發讀鎖,但在任何給定時間僅容許一個寫鎖。還能夠保證,若是有線程持有寫鎖,則任何線程都不能持有或得到讀鎖。code
sync
標準庫裏的RWMutex
還容許若是有線程嘗試獲取寫鎖,則其餘讀鎖將排隊等待,以免餓死嘗試獲取寫鎖的線程。爲了簡潔起見,在用channel
實現的RWMutex
裏咱們忽略了這部分邏輯。對象
RWMutex
具備三種狀態:空閒,存在寫鎖和存在讀鎖。這意味着咱們須要兩個通道分別標記RWMutex
上的讀鎖和寫鎖:空閒時,兩個通道都爲空;當獲取到寫鎖時,標記寫鎖的通道里將被寫入一下空結構體;當獲取到讀鎖時,咱們向兩個通道中都寫入一個值(避免寫鎖可以向標記寫鎖的通道發送值),其中標記讀鎖的通道里的值表明當前RWMutex
擁有的讀鎖的數量,讀鎖釋放的時候除了更新通道里存的讀鎖數量值,也會抽空寫鎖通道。開發
type RWMutex struct { write chan struct{} readers chan int } func NewLock() RWMutex { return RWMutex{ // 用來作一個普通的互斥鎖 write: make(chan struct{}, 1), // 用來保護讀鎖的數量,獲取讀鎖時經過接受通道里的值確保 // 其餘goroutine不會在同一時間更改讀鎖的數量。 readers: make(chan int, 1), } } func (l RWMutex) Lock() { l.write <- struct{}{} } func (l RWMutex) Unlock() { <-l.write } func (l RWMutex) RLock() { // 統計當前讀鎖的數量,默認爲0 var rs int select { case l.write <- struct{}{}: // 若是write通道能發送成功,證實如今沒有讀鎖 // 向write通道發送一個值,防止出現併發的讀-寫 case rs = <-l.readers: // 能從通道里接收到值,證實RWMutex上已經有讀鎖了,下面會更新讀鎖數量 } // 若是執行了l.write <- struct{}{}, rs的值會是0 rs++ // 更新RWMutex讀鎖數量 l.readers <- rs } func (l RWMutex) RUnlock() { // 讀出讀鎖數量而後減一 rs := <-l.readers rs-- // 若是釋放後讀鎖的數量變爲0了,抽空write通道,讓write通道變爲可用 if rs == 0 { <-l.write return } // 若是釋放後讀鎖的數量減一後不是0,把新的讀鎖數量發送給readers通道 l.readers <- rs }
WaitGroup
最多見的用途是建立一個組,向其計數器中設置一個計數,生成與該計數同樣多的goroutine
,而後等待它們完成。每次goroutine
運行完畢後,它將在組上調用Done
表示已完成工做。能夠經過調用WaitGroup
的Done
方法或以負數調用Add
方法減小計數器的計數。當計數器達到0時,被Wait
方法阻塞住的主線程會恢復執行。
WaitGroup
一個不爲人知的功能是在計數器達到0後,若是調用Add
方法讓計數器變爲正數,這將使WaitGroup
重回阻塞狀態。 這意味着對於每一個給定的WaitGroup
,都有一點"世代"的意味:
WaitGroup
的一個世代結束。下面是用channel
實現的WaitGroup
同步原語,真正起到阻塞goroutine
做用的是世代裏的wait
通道,而後經過用WaitGroup
通道包裝generation
結構體實現WaitGroup
的Wait
和Add
等功能。用文字很難描述清楚仍是直接看下面的代碼吧,代碼裏的註釋會幫助理解實現原理。
type generation struct { // 用於讓等待者阻塞住的通道 // 這個通道永遠不會用於發送,只用於接收和close。 wait chan struct{} // 計數器,標記須要等待執行完成的job數量 n int } func newGeneration() generation { return generation{ wait: make(chan struct{}) } } func (g generation) end() { // close通道將釋放由於接受通道而阻塞住的goroutine close(g.wait) } //這裏咱們使用一個通道來保護當前的generation。 //它基本上是WaitGroup狀態的互斥量。 type WaitGroup chan generation func NewWaitGroup() WaitGroup { wg := make(WaitGroup, 1) g := newGeneration() // 在一個新的WaitGroup上Wait, 由於計數器是0,會當即返回不會阻塞住線程 // 它表現跟當前世代已經結束了同樣, 因此這裏先把世代裏的wait通道close掉 // 防止剛建立WaitGroup時調用Wait函數會阻塞線程 g.end() wg <- g return wg } func (wg WaitGroup) Add(delta int) { // 獲取當前的世代 g := <-wg if g.n == 0 { // 計數器是0,建立一個新的世代 g = newGeneration() } g.n += delta if g.n < 0 { // 跟sync庫裏的WaitGroup同樣,不容許計數器爲負數 panic("negative WaitGroup count") } if g.n == 0 { // 計數器回到0了,關閉wait通道,被WaitGroup的Wait方法 // 阻塞住的線程會被釋放出來繼續往下執行 g.end() } // 將更新後的世代發送回WaitGroup通道 wg <- g } func (wg WaitGroup) Done() { wg.Add(-1) } func (wg WaitGroup) Wait() { // 獲取當前的世代 g := <-wg // 保存一個世代裏wait通道的引用 wait := g.wait // 將世代寫回WaitGroup通道 wg <- g // 接收世代裏的wait通道 // 由於wait通道里沒有值,會把調用Wait方法的goroutine阻塞住 // 直到WaitGroup的計數器回到0,wait通道被close後纔會解除阻塞 <-wait }
今天這篇文章用通道實現了Go語言sync
包裏經常使用的幾種同步鎖,主要的目的是演示通道和select
語句結合後強大的表達能力,並無什麼實際應用價值,你們也不要在實際開發中使用這裏實現的同步鎖。
有關通道和同步鎖都適合解決什麼種類的問題咱們後面的文章再細說,今天這篇文章,須要充分理解Go語言通道的行爲才能理解文章裏的代碼,若是有哪裏看不懂的能夠留言,只要時間容許我都會回答。
若是還不瞭解sync
包裏的同步鎖的使用方法,請先看這篇文章 Go語言sync包的應用詳解。後面的文章我會介紹併發編程裏的數據競爭問題以及解決方法,以及考慮給你們留一道思考題,請你們關注公衆號裏的動態。