面試官讓我用channel實現sync包裏的同步鎖,是否是故意爲難我?

Go語言提供了channel和sync包兩種併發控制的方法,每種方法都有他們適用的場景,並非全部併發場景都適合應用channel的,有的時候用sync包裏提供的同步原語更簡單。今天這個話題純屬是爲了經過用channel實現同步鎖的功能來學習掌握channel擁有的強大能力,並不適合在實際中使用。並且面試中有時候就是會出一些奇奇怪怪的題考應聘者對知識的理解以及靈活運用的應變能力。面試

你們仔細看看文章裏用channel實現幾種經常使用的同步鎖的思路,沒準兒哪次面試就碰上這樣的面試官了呢。編程

今天,我將深刻探討Go語言channelselect語句的表達能力。爲了演示只用這兩個原語就能夠實現多少功能,我將從頭開始用它們重寫sync包。併發

sync包提供的同步原語的有哪些以及如何使用咱們已經在以前的文章裏介紹過了,因此這裏不會再去介紹用channel實現的這些同步原語應該怎麼用。若是對用法有疑問請回看以前的文章: Go語言sync包的應用詳解函數

Once

once是一個簡單而強大的原語,可確保在並行程序中一個函數僅執行一次。學習

channel版的Once咱們使用帶有一個緩衝的通道來實現 第一次調用Do(func ())goroutine從通道中接收到值後,後續的goroutine將會被阻塞中,直到Do的參數函數執行完成後關閉通道爲止。其餘goroutine判斷通道已關閉後將不執行任何操做並當即返回。ui

type Once chan struct{}

func NewOnce() Once {
	o := make(Once, 1)
	// 只容許一個goroutine接收,其餘goroutine會被阻塞住
	o <- struct{}{}
	return o
}

func (o Once) Do(f func()) {

	_, ok := <-o
	if !ok {
		// Channel已經被關閉
		// 證實f已經被執行過了,直接return.
		return
	}
	// 調用f, 由於channel中只有一個值
	// 因此只有一個goroutine會到達這裏
	
	f()
	// 關閉通道,這將釋放全部在等待的
	// 以及將來會調用Do方法的goroutine
	close(o)
}
複製代碼

Mutex

大小爲N的信號量最多容許N個goroutine在任何給定時間保持其鎖。互斥鎖是大小爲1的信號量的特例。spa

信號量(英語:semaphore)又稱爲信號標,是一個同步對象,用於保持在0至指定最大值之間的一個計數值。當線程完成一次對該semaphore對象的等待(wait)時,該計數值減一;當線程完成一次對semaphore對象的釋放(release)時,計數值加一。當計數值爲0,則線程直至該semaphore對象變成signaled狀態才能等待成功。semaphore對象的計數值大於0,爲signaled狀態;計數值等於0,爲nonsignaled狀態.線程

咱們先用channel實現信號量的功能code

type Semaphore chan struct{}

func NewSemaphore(size int) Semaphore {
	return make(Semaphore, size)
}

func (s Semaphore) Lock() {
	// 只有在s還有空間的時候才能發送成功
	s <- struct{}{}
}

func (s Semaphore) Unlock() {
	// 爲其餘信號量騰出空間
	<-s
}
複製代碼

上面也說了互斥鎖是大小爲1的信號量的特例。那麼在剛纔實現的信號量的基礎上實現互斥鎖只須要:cdn

type Mutex Semaphore

func NewMutex() Mutex {
	return Mutex(NewSemaphore(1))
}
複製代碼

RWMutex

RWMutex是一個稍微複雜的原語:它容許任意數量的併發讀鎖,但在任何給定時間僅容許一個寫鎖。還能夠保證,若是有線程持有寫鎖,則任何線程都不能持有或得到讀鎖。

sync標準庫裏的RWMutex還容許若是有線程嘗試獲取寫鎖,則其餘讀鎖將排隊等待,以免餓死嘗試獲取寫鎖的線程。爲了簡潔起見,在用channel實現的RWMutex裏咱們忽略了這部分邏輯。

RWMutex具備三種狀態:空閒,存在寫鎖和存在讀鎖。這意味着咱們須要兩個通道分別標記RWMutex上的讀鎖和寫鎖:空閒時,兩個通道都爲空;當獲取到寫鎖時,標記寫鎖的通道里將被寫入一下空結構體;當獲取到讀鎖時,咱們向兩個通道中都寫入一個值(避免寫鎖可以向標記寫鎖的通道發送值),其中標記讀鎖的通道里的值表明當前RWMutex擁有的讀鎖的數量,讀鎖釋放的時候除了更新通道里存的讀鎖數量值,也會抽空寫鎖通道。

type RWMutex struct {
	write   chan struct{}
	readers chan int
}

func NewLock() RWMutex {
	return RWMutex{
		// 用來作一個普通的互斥鎖
		write:   make(chan struct{}, 1),
		// 用來保護讀鎖的數量,獲取讀鎖時經過接受通道里的值確保
		// 其餘goroutine不會在同一時間更改讀鎖的數量。
		readers: make(chan int, 1),
	}
}

func (l RWMutex) Lock() { l.write <- struct{}{} }
func (l RWMutex) Unlock() { <-l.write }

func (l RWMutex) RLock() {
	// 統計當前讀鎖的數量,默認爲0
	var rs int
	select {
	case l.write <- struct{}{}:
	// 若是write通道能發送成功,證實如今沒有讀鎖
	// 向write通道發送一個值,防止出現併發的讀-寫
	case rs = <-l.readers: 
	// 能從通道里接收到值,證實RWMutex上已經有讀鎖了,下面會更新讀鎖數量
	}
	// 若是執行了l.write <- struct{}{}, rs的值會是0
	rs++
	// 更新RWMutex讀鎖數量
	l.readers <- rs
}

func (l RWMutex) RUnlock() {
	// 讀出讀鎖數量而後減一
	rs := <-l.readers
	rs--
	// 若是釋放後讀鎖的數量變爲0了,抽空write通道,讓write通道變爲可用
	if rs == 0 {
		<-l.write
		return
	}
	// 若是釋放後讀鎖的數量減一後不是0,把新的讀鎖數量發送給readers通道
	l.readers <- rs
}
複製代碼

WaitGroup

WaitGroup最多見的用途是建立一個組,向其計數器中設置一個計數,生成與該計數同樣多的goroutine,而後等待它們完成。每次goroutine運行完畢後,它將在組上調用Done表示已完成工做。能夠經過調用WaitGroupDone方法或以負數調用Add方法減小計數器的計數。當計數器達到0時,被Wait方法阻塞住的主線程會恢復執行。

WaitGroup一個不爲人知的功能是在計數器達到0後,若是調用Add方法讓計數器變爲正數,這將使WaitGroup重回阻塞狀態。 這意味着對於每一個給定的WaitGroup,都有一點"世代"的意味:

  • 當計數器從0移到正數時開始"世代"。
  • 當計數器重回到0時,WaitGroup的一個世代結束。
  • 當一個世代結束時,被該世代的所阻塞住的線程將恢復執行。

下面是用channel實現的WaitGroup同步原語,真正起到阻塞goroutine做用的是世代裏的wait通道,而後經過用WaitGroup通道包裝generation結構體實現WaitGroupWaitAdd等功能。用文字很難描述清楚仍是直接看下面的代碼吧,代碼裏的註釋會幫助理解實現原理。

type generation struct {
	// 用於讓等待者阻塞住的通道
	// 這個通道永遠不會用於發送,只用於接收和close。
	wait chan struct{}
	// 計數器,標記須要等待執行完成的job數量
	n int
}

func newGeneration() generation {
	return generation{ wait: make(chan struct{}) }
}
func (g generation) end() {
	// close通道將釋放由於接受通道而阻塞住的goroutine
	close(g.wait)
}

//這裏咱們使用一個通道來保護當前的generation。
//它基本上是WaitGroup狀態的互斥量。
type WaitGroup chan generation

func NewWaitGroup() WaitGroup {
	wg := make(WaitGroup, 1)
	g := newGeneration()
	// 在一個新的WaitGroup上Wait, 由於計數器是0,會當即返回不會阻塞住線程
	// 它表現跟當前世代已經結束了同樣, 因此這裏先把世代裏的wait通道close掉
	// 防止剛建立WaitGroup時調用Wait函數會阻塞線程
	g.end()
	wg <- g
	return wg
}

func (wg WaitGroup) Add(delta int) {
	// 獲取當前的世代
	g := <-wg
	if g.n == 0 {
		// 計數器是0,建立一個新的世代
		g = newGeneration()
	}
	g.n += delta
	if g.n < 0 {
	    // 跟sync庫裏的WaitGroup同樣,不容許計數器爲負數
	    panic("negative WaitGroup count")
	}
	if g.n == 0 {
	// 計數器回到0了,關閉wait通道,被WaitGroup的Wait方法
	// 阻塞住的線程會被釋放出來繼續往下執行
		g.end()
	}
	// 將更新後的世代發送回WaitGroup通道
	wg <- g
}

func (wg WaitGroup) Done() { wg.Add(-1) }

func (wg WaitGroup) Wait() {
	// 獲取當前的世代
	g := <-wg
	// 保存一個世代裏wait通道的引用
	wait := g.wait
	// 將世代寫回WaitGroup通道
	wg <- g
	// 接收世代裏的wait通道
	// 由於wait通道里沒有值,會把調用Wait方法的goroutine阻塞住
	// 直到WaitGroup的計數器回到0,wait通道被close後纔會解除阻塞
	<-wait
}
複製代碼

總結

今天這篇文章用通道實現了Go語言sync包裏經常使用的幾種同步鎖,主要的目的是演示通道和select語句結合後強大的表達能力,並無什麼實際應用價值,你們也不要在實際開發中使用這裏實現的同步鎖。

有關通道和同步鎖都適合解決什麼種類的問題咱們後面的文章再細說,今天這篇文章,須要充分理解Go語言通道的行爲才能理解文章裏的代碼,若是有哪裏看不懂的能夠留言,只要時間容許我都會回答。

若是還不瞭解sync包裏的同步鎖的使用方法,請先看這篇文章 Go語言sync包的應用詳解。後面的文章我會介紹併發編程裏的數據競爭問題以及解決方法,以及考慮給你們留一道思考題,請你們關注公衆號裏的動態。

相關文章
相關標籤/搜索