GoLang之協程

GoLang之協程

 

目前,WebServer幾種主流的併發模型:c++

  • 多線程,每一個線程一次處理一個請求,在當前請求處理完成以前不會接收其它請求;但在高併發環境下,多線程的開銷比較大;
  • 基於回調的異步IO,如Nginx服務器使用的epoll模型,這種模式經過事件驅動的方式使用異步IO,使服務器持續運轉,但人的思惟模式是串行的,大量回調函數會把流程分割,對於問題自己的反應不夠天然;
  • 協程,不須要搶佔式調度,能夠有效提升線程的任務併發性,而避免多線程的缺點;但原生支持協程的語言還不多。

 

協程(coroutine)是Go語言中的輕量級線程實現,由Go運行時(runtime)管理。程序員

在一個函數調用前加上go關鍵字,此次調用就會在一個新的goroutine中併發執行。當被調用的函數返回時,這個goroutine也自動結束。須要注意的是,若是這個函數有返回值,那麼這個返回值會被丟棄。數組

 

先看下面的例子:服務器

func Add(x, y int) {
    z := x + y
    fmt.Println(z)
}

func main() {
    for i:=0; i<10; i++ {
        go Add(i, i)
    }
}

執行上面的代碼,會發現屏幕什麼也沒打印出來,程序就退出了。
對於上面的例子,main()函數啓動了10個goroutine,而後返回,這時程序就退出了,而被啓動的執行Add()的goroutine沒來得及執行。咱們想要讓main()函數等待全部goroutine退出後再返回,但如何知道goroutine都退出了呢?這就引出了多個goroutine之間通訊的問題。多線程

 

在工程上,有兩種最多見的併發通訊模型:共享內存和消息。併發

來看下面的例子,10個goroutine共享了變量counter,每一個goroutine執行完成後,將counter值加1.由於10個goroutine是併發執行的,因此咱們還引入了鎖,也就是代碼中的lock變量。在main()函數中,使用for循環來不斷檢查counter值,當其值達到10時,說明全部goroutine都執行完畢了,這時main()返回,程序退出。異步

package main
import (
    "fmt"
    "sync"
    "runtime"
)

var counter int = 0

func Count(lock *sync.Mutex) {
    lock.Lock()
    counter++
    fmt.Println("counter =", counter)
    lock.Unlock()
}


func main() {

    lock := &sync.Mutex{}

    for i:=0; i<10; i++ {
        go Count(lock)
    }

    for {
        lock.Lock()

        c := counter

        lock.Unlock()

        runtime.Gosched()    // 出讓時間片

        if c >= 10 {
            break
        }
    }
}

上面的例子,使用了鎖變量(屬於一種共享內存)來同步協程,事實上Go語言主要使用消息機制(channel)來做爲通訊模型。socket

 

 


 

channel

消息機制認爲每一個併發單元是自包含的、獨立的個體,而且都有本身的變量,但在不一樣併發單元間這些變量不共享。每一個併發單元的輸入和輸出只有一種,那就是消息。函數

channel是Go語言在語言級別提供的goroutine間的通訊方式,咱們可使用channel在多個goroutine之間傳遞消息。channel是進程內的通訊方式,所以經過channel傳遞對象的過程和調用函數時的參數傳遞行爲比較一致,好比也能夠傳遞指針等。
channel是類型相關的,一個channel只能傳遞一種類型的值,這個類型須要在聲明channel時指定。高併發

 

channel的聲明形式爲:
var chanName chan ElementType

舉個例子,聲明一個傳遞int類型的channel:

var ch chan int

 

使用內置函數make()定義一個channel:

ch := make(chan int)

 

在channel的用法中,最多見的包括寫入和讀出:

// 將一個數據value寫入至channel,這會致使阻塞,直到有其餘goroutine從這個channel中讀取數據
ch <- value

// 從channel中讀取數據,若是channel以前沒有寫入數據,也會致使阻塞,直到channel中被寫入數據爲止
value := <-ch

默認狀況下,channel的接收和發送都是阻塞的,除非另外一端已準備好。 

 

 咱們還能夠建立一個帶緩衝的channel:

c := make(chan int, 1024)

// 從帶緩衝的channel中讀數據
for i:=range c {
  ...
}

此時,建立一個大小爲1024的int類型的channel,即便沒有讀取方,寫入方也能夠一直往channel裏寫入,在緩衝區被填完以前都不會阻塞。

 

能夠關閉再也不使用的channel:

close(ch)

應該在生產者的地方關閉channel,若是在消費者的地方關閉,容易引發panic; 

在一個已關閉 channel 上執行接收操做(<-ch)老是可以當即返回,返回值是對應類型的零值。

 

如今利用channel來重寫上面的例子:

func Count(ch chan int) {
    ch <- 1
    fmt.Println("Counting")
}

func main() {

    chs := make([] chan int, 10)

    for i:=0; i<10; i++ {
        chs[i] = make(chan int)
        go Count(chs[i])
    }

    for _, ch := range(chs) {
        <-ch
    }
}

在這個例子中,定義了一個包含10個channel的數組,並把數組中的每一個channel分配給10個不一樣的goroutine。在每一個goroutine完成後,向goroutine寫入一個數據,在這個channel被讀取前,這個操做是阻塞的。在全部的goroutine啓動完成後,依次從10個channel中讀取數據,在對應的channel寫入數據前,這個操做也是阻塞的。這樣,就用channel實現了相似鎖的功能,並保證了全部goroutine完成後main()才返回。

另外,咱們在將一個channel變量傳遞到一個函數時,能夠經過將其指定爲單向channel變量,從而限制該函數中能夠對此channel的操做。

單向channel變量的聲明:

var ch1 chan int      // 普通channel
var ch2 chan <- int    // 只用於寫int數據
var ch3 <-chan int    // 只用於讀int數據

 

能夠經過類型轉換,將一個channel轉換爲單向的:

ch4 := make(chan int)
ch5 := <-chan int(ch4)   // 單向讀
ch6 := chan<- int(ch4)  //單向寫

 

單向channel的做用有點相似於c++中的const關鍵字,用於遵循代碼「最小權限原則」。

例如在一個函數中使用單向讀channel:

func Parse(ch <-chan int) {
    for value := range ch {
        fmt.Println("Parsing value", value) 
    }
}

 

channel做爲一種原生類型,自己也能夠經過channel進行傳遞,例以下面這個流式處理結構:

type PipeData struct {
    value int
    handler func(int) int
    next chan int
}

func handle(queue chan *PipeData) {
    for data := range queue {
        data.next <- data.handler(data.value)
    }
}

 

 

 

select

在UNIX中,select()函數用來監控一組描述符,該機制常被用於實現高併發的socket服務器程序。Go語言直接在語言級別支持select關鍵字,用於處理異步IO問題,大體結構以下:

select {
    case <- chan1:
    // 若是chan1成功讀到數據
    
    case chan2 <- 1:
    // 若是成功向chan2寫入數據

    default:
    // 默認分支
}

 select默認是阻塞的,只有當監聽的channel中有發送或接收能夠進行時纔會運行,當多個channel都準備好的時候,select是隨機的選擇一個執行的。

Go語言沒有對channel提供直接的超時處理機制,但咱們能夠利用select來間接實現,例如:

timeout := make(chan bool, 1)

go func() {
    time.Sleep(1e9)
    timeout <- true
}()

switch {
    case <- ch:
    // 從ch中讀取到數據

    case <- timeout:
    // 沒有從ch中讀取到數據,但從timeout中讀取到了數據
}

這樣使用select就能夠避免永久等待的問題,由於程序會在timeout中獲取到一個數據後繼續執行,而不管對ch的讀取是否還處於等待狀態。

 

 

 


 

併發

早期版本的Go編譯器並不能很智能的發現和利用多核的優點,即便在咱們的代碼中建立了多個goroutine,但實際上全部這些goroutine都容許在同一個CPU上,在一個goroutine獲得時間片執行的時候其它goroutine都會處於等待狀態。

實現下面的代碼能夠顯式指定編譯器將goroutine調度到多個CPU上運行。

import "runtime"
...
runtime.GOMAXPROCS(4)

 

 

PS:runtime包中有幾個處理goroutine的函數,

函數

說明

Goexit

退出當前執行的goroutine,可是defer函數還會繼續調用

Gosched

讓出當前goroutine的執行權限,調度器安排其餘等待的任務運行,並在下次某個時候從該位置恢復執行

NumCPU

返回 CPU 核數量

NumGoroutine

返回正在執行和排隊的任務總數

GOMAXPROCS

用來設置能夠並行計算的CPU核數的最大值,並返回以前的值

 

 

 

調度

Go調度的幾個概念:

M:內核線程;
G:go routine,併發的最小邏輯單元,由程序員建立;
P:處理器,執行G的上下文環境,每一個P會維護一個本地的go routine隊列;

 

 除了每一個P擁有一個本地的go routine隊列外,還存在一個全局的go routine隊列。

具體調度原理:

  1. P的數量在初始化由GOMAXPROCS決定;
  2. 咱們要作的就是添加G;
  3. G的數量超出了M的處理能力,且還有空餘P的話,runtime就會自動建立新的M;
  4. M拿到P後才能幹活,取G的順序:本地隊列>全局隊列>其餘P的隊列,若是全部隊列都沒有可用的G,M會歸還P並進入休眠;

 

 

一個G若是發生阻塞等事件會進行阻塞,以下圖:

G發生上下文切換條件:

  • 系統調用;
  • 讀寫channel;
  • gosched主動放棄,會將G扔進全局隊列;

如上圖,一個G發生阻塞時,M0讓出P,由M1接管其任務隊列;當M0執行的阻塞調用返回後,再將G0扔到全局隊列,本身則進入睡眠(沒有P了沒法幹活);

相關文章
相關標籤/搜索