Go語言基礎之併發

Go語言中的併發編程——併發是編程裏面一個很是重要的概念,Go語言在語言層面天生支持併發,這也是Go語言流行的一個很重要的緣由。html

併發與並行

併發:同一時間段內執行多個任務(你在用微信和兩個女友聊天)。java

並行:同一時刻執行多個任務(你和你朋友都在用微信和女友聊天)。c++

Go語言的併發經過goroutine實現。goroutine相似於線程,屬於用戶態的線程,咱們能夠根據須要建立成千上萬個goroutine併發工做。goroutine是由Go語言的運行時(runtime)調度完成,而線程是由操做系統調度完成。git

Go語言還提供channel在多個goroutine間進行通訊。goroutinechannel是 Go 語言秉承的 CSP(Communicating Sequential Process)併發模式的重要實現基礎。程序員

goroutine

在java/c++中咱們要實現併發編程的時候,咱們一般須要本身維護一個線程池,而且須要本身去包裝一個又一個的任務,同時須要本身去調度線程執行任務並維護上下文切換,這一切一般會耗費程序員大量的心智。那麼能不能有一種機制,程序員只須要定義不少個任務,讓系統去幫助咱們把這些任務分配到CPU上實現併發執行呢?github

Go語言中的goroutine就是這樣一種機制,goroutine的概念相似於線程,但 goroutine是由Go的運行時(runtime)調度和管理的。Go程序會智能地將 goroutine 中的任務合理地分配給每一個CPU。Go語言之因此被稱爲現代化的編程語言,就是由於它在語言層面已經內置了調度和上下文切換的機制。算法

在Go語言編程中你不須要去本身寫進程、線程、協程,你的技能包裏只有一個技能–goroutine,當你須要讓某個任務併發執行的時候,你只須要把這個任務包裝成一個函數,開啓一個goroutine去執行這個函數就能夠了,就是這麼簡單粗暴。編程

使用goroutine

Go語言中使用goroutine很是簡單,只須要在調用函數的時候在前面加上go關鍵字,就能夠爲一個函數建立一個goroutine安全

一個goroutine一定對應一個函數,能夠建立多個goroutine去執行相同的函數。微信

啓動單個goroutine

啓動goroutine的方式很是簡單,只須要在調用的函數(普通函數和匿名函數)前面加上一個go關鍵字。

舉個例子以下:

func hello() {
	fmt.Println("Hello Goroutine!")
}
func main() {
	hello()
	fmt.Println("main goroutine done!")
}

這個示例中hello函數和下面的語句是串行的,執行的結果是打印完Hello Goroutine!後打印main goroutine done!

接下來咱們在調用hello函數前面加上關鍵字go,也就是啓動一個goroutine去執行hello這個函數。

func main() {
	go hello() // 啓動另一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
}

這一次的執行結果只打印了main goroutine done!,並無打印Hello Goroutine!。爲何呢?

在程序啓動時,Go程序就會爲main()函數建立一個默認的goroutine

當main()函數返回的時候該goroutine就結束了,全部在main()函數中啓動的goroutine會一同結束,main函數所在的goroutine就像是權利的遊戲中的夜王,其餘的goroutine都是異鬼,夜王一死它轉化的那些異鬼也就所有GG了。

因此咱們要想辦法讓main函數等一等hello函數,最簡單粗暴的方式就是time.Sleep了。

func main() {
	go hello() // 啓動另一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second)
}

執行上面的代碼你會發現,這一次先打印main goroutine done!,而後緊接着打印Hello Goroutine!

首先爲何會先打印main goroutine done!是由於咱們在建立新的goroutine的時候須要花費一些時間,而此時main函數所在的goroutine是繼續執行的。

啓動多個goroutine

在Go語言中實現併發就是這樣簡單,咱們還能夠啓動多個goroutine。讓咱們再來一個例子: (這裏使用了sync.WaitGroup來實現goroutine的同步)

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine結束就登記-1
	fmt.Println("Hello Goroutine!", i)
}
func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1) // 啓動一個goroutine就登記+1
		go hello(i)
	}
	wg.Wait() // 等待全部登記的goroutine都結束
}

屢次執行上面的代碼,會發現每次打印的數字的順序都不一致。這是由於10個goroutine是併發執行的,而goroutine的調度是隨機的。

goroutine與線程

可增加的棧

OS線程(操做系統線程)通常都有固定的棧內存(一般爲2MB),一個goroutine的棧在其生命週期開始時只有很小的棧(典型狀況下2KB),goroutine的棧不是固定的,他能夠按需增大和縮小,goroutine的棧大小限制能夠達到1GB,雖然極少會用到這個大。因此在Go語言中一次建立十萬左右的goroutine也是能夠的。

goroutine調度

GPM是Go語言運行時(runtime)層面的實現,是go語言本身實現的一套調度系統。區別於操做系統調度OS線程。

  • G很好理解,就是個goroutine的,裏面除了存放本goroutine信息外 還有與所在P的綁定等信息。
  • P管理着一組goroutine隊列,P裏面會存儲當前goroutine運行的上下文環境(函數指針,堆棧地址及地址邊界),P會對本身管理的goroutine隊列作一些調度(好比把佔用CPU時間較長的goroutine暫停、運行後續的goroutine等等)當本身的隊列消費完了就去全局隊列裏取,若是全局隊列裏也消費完了會去其餘P的隊列裏搶任務。
  • M(machine)是Go運行時(runtime)對操做系統內核線程的虛擬, M與內核線程通常是一一映射的關係, 一個groutine最終是要放到M上執行的;

P與M通常也是一一對應的。他們關係是: P管理着一組G掛載在M上運行。當一個G長久阻塞在一個M上時,runtime會新建一個M,阻塞G所在的P會把其餘的G 掛載在新建的M上。當舊的G阻塞完成或者認爲其已經死掉時 回收舊的M。

P的個數是經過runtime.GOMAXPROCS設定(最大256),Go1.5版本以後默認爲物理線程數。 在併發量大的時候會增長一些P和M,但不會太多,切換太頻繁的話得不償失。

單從線程調度講,Go語言相比起其餘語言的優點在於OS線程是由OS內核來調度的,goroutine則是由Go運行時(runtime)本身的調度器調度的,這個調度器使用一個稱爲m:n調度的技術(複用/調度m個goroutine到n個OS線程)。 其一大特色是goroutine的調度是在用戶態下完成的, 不涉及內核態與用戶態之間的頻繁切換,包括內存的分配與釋放,都是在用戶態維護着一塊大的內存池, 不直接調用系統的malloc函數(除非內存池須要改變),成本比調度OS線程低不少。 另外一方面充分利用了多核的硬件資源,近似的把若干goroutine均分在物理線程上, 再加上自己goroutine的超輕量,以上種種保證了go調度方面的性能。

點我瞭解更多

GOMAXPROCS

Go運行時的調度器使用GOMAXPROCS參數來肯定須要使用多少個OS線程來同時執行Go代碼。默認值是機器上的CPU核心數。例如在一個8核心的機器上,調度器會把Go代碼同時調度到8個OS線程上(GOMAXPROCS是m:n調度中的n)。

Go語言中能夠經過runtime.GOMAXPROCS()函數設置當前程序併發時佔用的CPU邏輯核心數。

Go1.5版本以前,默認使用的是單核心執行。Go1.5版本以後,默認使用所有的CPU邏輯核心數。

咱們能夠經過將任務分配到不一樣的CPU邏輯核心上實現並行的效果,這裏舉個例子:

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(1)
	go a()
	go b()
	time.Sleep(time.Second)
}

兩個任務只有一個邏輯核心,此時是作完一個任務再作另外一個任務。 將邏輯核心數設爲2,此時兩個任務並行執行,代碼以下。

func a() {
	for i := 1; i < 10; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	for i := 1; i < 10; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}

Go語言中的操做系統線程和goroutine的關係:

  1. 一個操做系統線程對應用戶態多個goroutine。
  2. go程序能夠同時使用多個操做系統線程。
  3. goroutine和OS線程是多對多的關係,即m:n。

channel

單純地將函數併發執行是沒有意義的。函數與函數間須要交換數據才能體現併發執行函數的意義。

雖然可使用共享內存進行數據交換,可是共享內存在不一樣的goroutine中容易發生競態問題。爲了保證數據交換的正確性,必須使用互斥量對內存進行加鎖,這種作法勢必形成性能問題。

Go語言的併發模型是CSP(Communicating Sequential Processes),提倡經過通訊共享內存而不是經過共享內存而實現通訊

若是說goroutine是Go程序併發的執行體,channel就是它們之間的鏈接。channel是可讓一個goroutine發送特定值到另外一個goroutine的通訊機制。

Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,老是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每個通道都是一個具體類型的導管,也就是聲明channel的時候須要爲其指定元素類型。

channel類型

channel是一種類型,一種引用類型。聲明通道類型的格式以下:

var 變量 chan 元素類型

舉幾個例子:

var ch1 chan int   // 聲明一個傳遞整型的通道
var ch2 chan bool  // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道

建立channel

通道是引用類型,通道類型的空值是nil

var ch chan int
fmt.Println(ch) // <nil>

聲明的通道後須要使用make函數初始化以後才能使用。

建立channel的格式以下:

make(chan 元素類型, [緩衝大小])

channel的緩衝大小是可選的。

舉幾個例子:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

channel操做

通道有發送(send)、接收(receive)和關閉(close)三種操做。

發送和接收都使用<-符號。

如今咱們先使用如下語句定義一個通道:

ch := make(chan int)

發送

將一個值發送到通道中。

ch <- 10 // 把10發送到ch中

接收

從一個通道中接收值。

x := <- ch // 從ch中接收值並賦值給變量x
<-ch       // 從ch中接收值,忽略結果

關閉

咱們經過調用內置的close函數來關閉通道。

close(ch) 

關於關閉通道須要注意的事情是,只有在通知接收方goroutine全部的數據都發送完畢的時候才須要關閉通道。通道是能夠被垃圾回收機制回收的,它和關閉文件是不同的,在結束操做以後關閉文件是必需要作的,但關閉通道不是必須的。

關閉後的通道有如下特色:

  1. 對一個關閉的通道再發送值就會致使panic。
  2. 對一個關閉的通道進行接收會一直獲取值直到通道爲空。
  3. 對一個關閉的而且沒有值的通道執行接收操做會獲得對應類型的零值。
  4. 關閉一個已經關閉的通道會致使panic。

無緩衝的通道

無緩衝的通道又稱爲阻塞的通道。咱們來看一下下面的代碼:

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("發送成功")
}

上面這段代碼可以經過編譯,可是執行的時候會出現如下錯誤:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

爲何會出現deadlock錯誤呢?

由於咱們使用ch := make(chan int)建立的是無緩衝的通道,無緩衝的通道只有在有人接收值的時候才能發送值。就像你住的小區沒有快遞櫃和代收點,快遞員給你打電話必需要把這個物品送到你的手中,簡單來講就是無緩衝的通道必須有接收才能發送。

上面的代碼會阻塞在ch <- 10這一行代碼造成死鎖,那如何解決這個問題呢?

一種方法是啓用一個goroutine去接收值,例如:

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}
func main() {
	ch := make(chan int)
	go recv(ch) // 啓用goroutine從通道接收值
	ch <- 10
	fmt.Println("發送成功")
}

無緩衝通道上的發送操做會阻塞,直到另外一個goroutine在該通道上執行接收操做,這時值才能發送成功,兩個goroutine將繼續執行。相反,若是接收操做先執行,接收方的goroutine將阻塞,直到另外一個goroutine在該通道上發送一個值。

使用無緩衝通道進行通訊將致使發送和接收的goroutine同步化。所以,無緩衝通道也被稱爲同步通道

有緩衝的通道

解決上面問題的方法還有一種就是使用有緩衝區的通道。咱們能夠在使用make函數初始化通道的時候爲其指定通道的容量,例如:

func main() {
	ch := make(chan int, 1) // 建立一個容量爲1的有緩衝區通道
	ch <- 10
	fmt.Println("發送成功")
}

只要通道的容量大於零,那麼該通道就是有緩衝的通道,通道的容量表示通道中能存放元素的數量。就像你小區的快遞櫃只有那麼個多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個快遞員就能往裏面放一個。

咱們可使用內置的len函數獲取通道內元素的數量,使用cap函數獲取通道的容量,雖然咱們不多會這麼作。

for range從通道循環取值

當向通道中發送完數據時,咱們能夠經過close函數來關閉通道。

當通道被關閉時,再往該通道發送值會引起panic,從該通道里接收的值一直都是類型零值。那如何判斷一個通道是否被關閉了呢?

咱們來看下面這個例子:

// channel 練習
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 開啓goroutine將0~100的數發送到ch1中
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	// 開啓goroutine從ch1中接收值,並將該值的平方發送到ch2中
	go func() {
		for {
			i, ok := <-ch1 // 通道關閉後再取值ok=false
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()
	// 在主goroutine中從ch2中接收值打印
	for i := range ch2 { // 通道關閉後會退出for range循環
		fmt.Println(i)
	}
}

從上面的例子中咱們看到有兩種方式在接收值的時候判斷該通道是否被關閉,不過咱們一般使用的是for range的方式。使用for range遍歷通道,當通道被關閉的時候就會退出for range

單向通道

有的時候咱們會將通道做爲參數在多個任務函數間傳遞,不少時候咱們在不一樣的任務函數中使用通道都會對其進行限制,好比限制通道在函數中只能發送或只能接收。

Go語言中提供了單向通道來處理這種狀況。例如,咱們把上面的例子改造以下:

func counter(out chan<- int) {
	for i := 0; i < 100; i++ {
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for i := range in {
		out <- i * i
	}
	close(out)
}
func printer(in <-chan int) {
	for i := range in {
		fmt.Println(i)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2, ch1)
	printer(ch2)
}

其中,

  • chan<- int是一個只寫單向通道(只能對其寫入int類型值),能夠對其執行發送操做可是不能執行接收操做;
  • <-chan int是一個只讀單向通道(只能從其讀取int類型值),能夠對其執行接收操做可是不能執行發送操做。

在函數傳參及任何賦值操做中能夠將雙向通道轉換爲單向通道,但反過來是不能夠的。

通道總結

channel常見的異常總結,以下圖: 

關閉已經關閉的channel也會引起panic

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
	}
}


func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 開啓3個goroutine
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 5個任務
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// 輸出結果
	for a := 1; a <= 5; a++ {
		<-results
	}
}

worker pool(goroutine池)

在工做中咱們一般會使用能夠指定啓動的goroutine數量–worker pool模式,控制goroutine的數量,防止goroutine泄漏和暴漲。

一個簡易的work pool示例代碼以下:

 

select多路複用

在某些場景下咱們須要同時從多個通道接收數據。通道在接收數據時,若是沒有數據能夠接收將會發生阻塞。你也許會寫出以下代碼使用遍歷的方式來實現:

for{
    // 嘗試從ch1接收值
    data, ok := <-ch1
    // 嘗試從ch2接收值
    data, ok := <-ch2
    …
}

這種方式雖然能夠實現從多個通道接收值的需求,可是運行性能會差不少。爲了應對這種場景,Go內置了select關鍵字,能夠同時響應多個通道的操做。

select的使用相似於switch語句,它有一系列case分支和一個默認的分支。每一個case會對應一個通道的通訊(接收或發送)過程。select會一直等待,直到某個case的通訊操做完成時,就會執行case分支對應的語句。具體格式以下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默認操做
}

舉個小例子來演示下select的使用:

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

使用select語句能提升代碼的可讀性。

  • 可處理一個或多個channel的發送/接收操做。
  • 若是多個case同時知足,select會隨機選擇一個。
  • 對於沒有caseselect{}會一直等待,可用於阻塞main函數。

併發安全和鎖

有時候在Go代碼中可能會存在多個goroutine同時操做一個資源(臨界區),這種狀況會發生競態問題(數據競態)。類比現實生活中的例子有十字路口被各個方向的的汽車競爭;還有火車上的衛生間被車箱裏的人競爭。

舉個例子:

var x int64
var wg sync.WaitGroup

func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

上面的代碼中咱們開啓了兩個goroutine去累加變量x的值,這兩個goroutine在訪問和修改x變量的時候就會存在數據競爭,致使最後的結果與期待的不符。

互斥鎖

互斥鎖是一種經常使用的控制共享資源訪問的方法,它可以保證同時只有一個goroutine能夠訪問共享資源。Go語言中使用sync包的Mutex類型來實現互斥鎖。 使用互斥鎖來修復上面代碼的問題:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock() // 加鎖
		x = x + 1
		lock.Unlock() // 解鎖
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

使用互斥鎖可以保證同一時間有且只有一個goroutine進入臨界區,其餘的goroutine則在等待鎖;當互斥鎖釋放後,等待的goroutine才能夠獲取鎖進入臨界區,多個goroutine同時等待一個鎖時,喚醒的策略是隨機的。

讀寫互斥鎖

互斥鎖是徹底互斥的,可是有不少實際的場景下是讀多寫少的,當咱們併發的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包中的RWMutex類型。

讀寫鎖分爲兩種:讀鎖和寫鎖。當一個goroutine獲取讀鎖以後,其餘的goroutine若是是獲取讀鎖會繼續得到鎖,若是是獲取寫鎖就會等待;當一個goroutine獲取寫鎖以後,其餘的goroutine不管是獲取讀鎖仍是寫鎖都會等待。

讀寫鎖示例:

var (
	x      int64
	wg     sync.WaitGroup
	lock   sync.Mutex
	rwlock sync.RWMutex
)

func write() {
	// lock.Lock()   // 加互斥鎖
	rwlock.Lock() // 加寫鎖
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假設讀操做耗時10毫秒
	rwlock.Unlock()                   // 解寫鎖
	// lock.Unlock()                     // 解互斥鎖
	wg.Done()
}

func read() {
	// lock.Lock()                  // 加互斥鎖
	rwlock.RLock()               // 加讀鎖
	time.Sleep(time.Millisecond) // 假設讀操做耗時1毫秒
	rwlock.RUnlock()             // 解讀鎖
	// lock.Unlock()                // 解互斥鎖
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}

	wg.Wait()
	end := time.Now()
	fmt.Println(end.Sub(start))
}

須要注意的是讀寫鎖很是適合讀多寫少的場景,若是讀和寫的操做差異不大,讀寫鎖的優點就發揮不出來。

sync.WaitGroup

在代碼中生硬的使用time.Sleep確定是不合適的,Go語言中可使用sync.WaitGroup來實現併發任務的同步。 sync.WaitGroup有如下幾個方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup) Wait() 阻塞直到計數器變爲0

sync.WaitGroup內部維護着一個計數器,計數器的值能夠增長和減小。例如當咱們啓動了N 個併發任務時,就將計數器值增長N。每一個任務完成時經過調用Done()方法將計數器減1。經過調用Wait()來等待併發任務執行完,當計數器值爲0時,表示全部併發任務已經完成。

咱們利用sync.WaitGroup將上面的代碼優化一下:

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 啓動另一個goroutine去執行hello函數
	fmt.Println("main goroutine done!")
	wg.Wait()
}

須要注意sync.WaitGroup是一個結構體,傳遞的時候要傳遞指針。

sync.Once

說在前面的話:這是一個進階知識點。

在編程的不少場景下咱們須要確保某些操做在高併發的場景下只執行一次,例如只加載一次配置文件、只關閉一次通道等。

Go語言中的sync包中提供了一個針對只執行一次場景的解決方案–sync.Once

sync.Once只有一個Do方法,其簽名以下:

func (o *Once) Do(f func()) {}

備註:若是要執行的函數f須要傳遞參數就須要搭配閉包來使用。

加載配置文件示例

延遲一個開銷很大的初始化操做到真正用到它的時候再執行是一個很好的實踐。由於預先初始化一個變量(好比在init函數中完成初始化)會增長程序的啓動耗時,並且有可能實際執行過程當中這個變量沒有用上,那麼這個初始化操做就不是必需要作的。咱們來看一個例子:

var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 被多個goroutine調用時不是併發安全的
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}

多個goroutine併發調用Icon函數時不是併發安全的,現代的編譯器和CPU可能會在保證每一個goroutine都知足串行一致的基礎上自由地重排訪問內存的順序。loadIcons函數可能會被重排爲如下結果:

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

在這種狀況下就會出現即便判斷了icons不是nil也不意味着變量初始化完成了。考慮到這種狀況,咱們能想到的辦法就是添加互斥鎖,保證初始化icons的時候不會被其餘的goroutine操做,可是這樣作又會引起性能問題。

使用sync.Once改造的示例代碼以下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是併發安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

併發安全的單例模式

下面是藉助sync.Once實現的併發安全的單例模式:

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Once其實內部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數據的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操做的時候是併發安全的而且初始化操做也不會被執行屢次。

sync.Map

Go語言中內置的map不是併發安全的。請看下面的示例:

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

上面的代碼開啓少許幾個goroutine的時候可能沒什麼問題,當併發多了以後執行上面的代碼就會報fatal error: concurrent map writes錯誤。

像這種場景下就須要爲map加鎖來保證併發的安全性了,Go語言的sync包中提供了一個開箱即用的併發安全版map–sync.Map。開箱即用表示不用像內置的map同樣使用make函數初始化就能直接使用。同時sync.Map內置了諸如StoreLoadLoadOrStoreDeleteRange等操做方法。

var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)
			value, _ := m.Load(key)
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

原子操做

代碼中的加鎖操做由於涉及內核態的上下文切換會比較耗時、代價比較高。針對基本數據類型咱們還可使用原子操做來保證併發安全,由於原子操做是Go語言提供的方法它在用戶態就能夠完成,所以性能比加鎖操做更好。Go語言中原子操做由內置的標準庫sync/atomic提供。

atomic包

方法 解釋
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
讀取操做
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
寫入操做
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操做
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交換操做
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比較並交換操做

示例

咱們填寫一個示例來比較下互斥鎖和原子操做的性能。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c CommonCounter) Inc() {
	c.counter++
}

func (c CommonCounter) Load() int64 {
	return c.counter
}

// 互斥鎖版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操做版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非併發安全
	test(c1)
	c2 := MutexCounter{} // 使用互斥鎖實現併發安全
	test(&c2)
	c3 := AtomicCounter{} // 併發安全且比互斥鎖效率更高
	test(&c3)
}

atomic包提供了底層的原子級內存操做,對於同步算法的實現頗有用。這些函數必須謹慎地保證正確使用。除了某些特殊的底層應用,使用通道或者sync包的函數/類型實現同步更好。

練習題

  1. 使用goroutinechannel實現一個計算int64隨機數各位數和的程序。
    1. 開啓一個goroutine循環生成int64類型的隨機數,發送到jobChan
    2. 開啓24個goroutinejobChan中取出隨機數計算各位數的和,將結果發送到resultChan
    3. goroutineresultChan取出結果並打印到終端輸出
  2. 爲了保證業務代碼的執行性能將以前寫的日誌庫改寫爲異步記錄日誌方式。

轉載自李文周博客

相關文章
相關標籤/搜索