[快速入門]Go語言的CSP併發模型

Go語言的併發模型

Go語言實現了一下兩種併發形式:編程

第一種是你們廣泛認知的:多線程共享內存。其實就許多主流編程語言中的多線程開發。bash

另一種是Go語言特有的,也是Go語言推薦的:CSP(communicating sequential processes)併發模型。該方式是Go語言最大的兩個亮點goroutine和chan,兩者合體的典型應用。多線程

CSP 是 Communicating Sequential Process 的簡稱,中文能夠叫作通訊順序進程,是一種併發編程模型,是一個很強大的併發數據模型,是上個世紀七十年代提出的,用於描述兩個獨立的併發實體經過共享的通信 channel(管道)進行通訊的併發模型。併發

Go語言其實只用到了 CSP 的很小一部分,即理論中的 Process/Channel(對應到語言中的 goroutine/channel):這兩個併發原語之間沒有從屬關係, Process 能夠訂閱任意個 Channel,Channel 也並不關心是哪一個 Process 在利用它進行通訊;Process 圍繞 Channel 進行讀寫,造成一套有序阻塞和可預測的併發模型。異步

相信你們必定見過一句話:編程語言

Do not communicate by sharing memory; instead, share memory by communicating.函數

不要經過共享內存來通訊,而要經過通訊來實現內存共享。單元測試

這就是 Go 的併發哲學,它依賴 CSP 模型,基於 channel 實現。測試

channel

channel的建立

channel 字面意義是 「通道」,相似於 Linux 中的管道。聲明 channel 的語法以下:ui

chan T          // 能夠接收和發送類型爲 T 的數據
chan<- float64  // 只能夠用來發送 float64 類型的數據
<-chan int      // 只能夠用來接收 int 類型的數據
複製代碼

使用make初始化Channel,而且能夠設置容量:

make(chan int, 100)
複製代碼

由於 channel 是一個引用類型,因此在它被初始化以前,它的值是 nil,channel 使用 make 函數進行初始化。能夠向它傳遞一個 int 值,表明 channel 緩衝區的大小(容量),構造出來的是一個緩衝型的 channel;不傳或傳 0 的,構造的就是一個非緩衝型的 channel。

Channel 分爲兩種:帶緩衝、不帶緩衝。對不帶緩衝的 channel 進行的操做實際上能夠看做 「同步模式」,帶緩衝的則稱爲 「異步模式」。

同步模式下,發送方和接收方要同步就緒,只有在二者都 ready 的狀況下,數據才能在二者間傳輸。不然,任意一方先行進行發送或接收操做,都會被掛起,等待另外一方的出現才能被喚醒。

異步模式下,在緩衝槽可用的狀況下(有剩餘容量),發送和接收操做均可以順利進行。不然,操做的一方(如寫入)一樣會被掛起,直到出現相反操做(如接收)纔會被喚醒。

代碼示例

//這裏定義兩個函數,下面分別驗證同步模式執行以及異步模式執行的效果
func service() {
	time.Sleep(time.Millisecond * 30)
	return "Done"
}
func otherTask() {
	fmt.Println("this is other task B")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("Task B is done")
}
複製代碼

同步模式執行

func AsyncService() chan string { 
	//阻塞模式,即A將信息放進channel直到有人讀取,不然將一直阻塞	
	retCh := make(chan string) 
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

//單元測試
func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
複製代碼

單測結果運行以下,能夠看出等到當othertask執行完開始從chan中取數據時協程才繼續向下執行,在這以前一直處於掛起狀態

this is other task B
service return result
Task B is done
Done
service exited
複製代碼

異步模式執行

func AsyncService() chan string { 
	retCh := make(chan string,1) //buffer模式,非阻塞 丟進channel就繼續向下執行
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
複製代碼

執行結果以下,能夠明顯的看到這種模式下並無等待從chan中獲取消息,直接向下繼續運行

this is other task
service return result
service exited
Task B is done
Done
複製代碼

channel的使用

1.send操做

c := make(chan int)
c <- 3
複製代碼

注意,往一個已經被close的channel中繼續發送數據會致使run-time panic

2.recive操做

c := make(chan int)
c <- 3
i := <-c
fmt.Println(i) //3
複製代碼

從一個nil channel中接收數據會一直被block,直到有數據能夠接收;從一個被close的channel中接收數據不會被阻塞,而是當即返回,會返回元素類型的零值(zero value)以及一個表明當前channel狀態的bool值。能夠經過這個特性判斷channel是否關閉

if x, ok := <-ch;ok {    //ok 爲bool值,true標識正常接收,false表示通道關閉
    ...
}else{
    ...
} 
複製代碼

3.close操做

c := make(chan int)
close(c)
複製代碼

全部的channel接受者都會在channel關閉時,馬上從阻塞等待中返回且上述ok值爲false(若是有值可取依舊會正常取值)。這個廣播機制常被利用,進行向多個訂閱者同時發送信號

代碼示例

//數據生產者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch)	//channel關閉

		wg.Done()
	}()

}

//數據接受者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok {	//channel關閉後,ok值將變爲false
				fmt.Println(data)
			} else {
				break
			}
		}
		wg.Done()
	}()

}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg)
	wg.Add(1)
	dataReceiver(ch, &wg)
	wg.Wait()
複製代碼

與switch-case搭配實現選路

select-case語句配合channel能夠實現多路選擇以及超時控制功能,每一個case後面跟一個阻塞事件,當有事件收到響應後則結束等待,若是均沒有響應則執行default

//多渠道選擇
//原理以下,採用select-case語句 每一個case後面跟一個阻塞事件,當有事件收到響應後則結束等待,若是均沒有響應則執行default
func TestSwitch(t *testing.T){
	select{
		case ret1 := <-retCH1:
			t.Logf("case 1 return")
		case ret2 := <-retCH2:
			t.Logf("case 2 return")
		default:
			t.Logf("no one return")
	}
}

//超時控制
func TestTimeOut(t *testing.T){
	select {
	case ret := <- retCH1:
		t.Logf("case 1 return")
	case <-time.After(time.Second*1):
		t.Logf("time out")
	}
}
複製代碼
相關文章
相關標籤/搜索