Golang 入門 : channel(通道)

筆者在《Golang 入門 : 競爭條件》一文中介紹了 Golang 併發編程中須要面對的競爭條件。本文咱們就介紹如何使用 Golang 提供的 channel(通道) 消除競爭條件。html

Channel 是 Golang 在語言級別提供的 goroutine 之間的通訊方式,可使用 channel 在兩個或多個 goroutine 之間傳遞消息。Channel 是進程內的通訊方式,所以經過 channel 傳遞對象的過程和調用函數時的參數傳遞行爲比較一致,好比也能夠傳遞指針等。使用通道發送和接收所需的共享資源,能夠在 goroutine 之間消除競爭條件。linux

當一個資源須要在 goroutine 之間共享時,channel 在 goroutine 之間架起了一個管道,並提供了確保同步交換數據的機制。Channel 是類型相關的,也就是說,一個 channel 只能傳遞一種類型的值,這個類型須要在聲明 channel 時指定。能夠經過 channel 共享內置類型、命名類型、結構類型和引用類型的值或者指針。編程

基本語法

聲明 channel 的語法格式爲:
var ChannelName chan ElementType服務器

與通常變量聲明的不一樣之處僅僅是在類型前面添加了一個 chan 關鍵字。ElementType 則指明這個 channel 可以傳遞的數據的類型。好比聲明一個傳遞 int 類型的 channel:併發

var ch chan int

或者是聲明一個 map,其元素是 bool 型的 channel:異步

var m map[string] chan bool

在 Golang 中須要使用內置的 make 函數類建立 channel 的實例:socket

ch := make(chan int)

這樣就聲明並初始化了一個名爲 ch 的 int 型 channel。使用 channel 發送和接收數據的語法也很直觀,好比下面的代碼把數據發送到 channel 中:函數

ch <- value

向 channel 中寫入數據一般會致使程序阻塞,直到有其它 goroutine 從這個 channel 中讀取數據。下面的代碼把數據從 channel 讀取到變量中:高併發

value := <-ch

注意,若是 channel 中沒有數據,那麼從 channel 中讀取數據也會致使程序阻塞,直到 channel 中被寫入數據爲止。post

根據 channel 是否有緩衝區能夠簡單地把 channel 分爲無緩衝區的 channel 和帶緩衝區的 channel,在本文接下來的篇幅中會詳細的介紹這兩類 channel 的用法。

select

Linux 系統中的 select 函數用來監控一系列的文件句柄,一旦其中一個文件句柄發生了 I/O 動做,select 函數就會返回。該函數主要被用來實現高併發的 socket 服務器程序。Golang 中的 select 關鍵字和 linux 中的 select 函數功能有點類似,它主要用於處理異步 I/O 問題。

select 的語法與 switch 的語法很是類似,由 select 開始一個新的選擇塊,每一個選擇條件有 case 語句來描述。與 switch 語句能夠選擇任何可以使用相等比較的條件相比,select 有比較多的限制,其中最大的一條限制就是每一個 case 語句裏必須是一個 I/O 操做。其大體的結構以下:

select {
    case <-chan1:       // 若是 chan1 成功讀取到數據,則執行該 case 語句
    case chan2 <- 1:    // 若是成功向 chan2 寫入數據,則執行該 case 語句
    default:            // 若是上面的條件都沒有成功,則執行 default 流程
}

能夠看出,select 不像 switch,後面並無條件判斷,而是直接去查看 case 語句。每一個 case 語句都必須是一個面向 channel 的操做。好比上面的例子中,第一個 case 試圖從 chan1 讀取一個數據並直接忽略讀取到的數據,而第二個 case 則試圖向 chan2 中寫入一個整數 1,若是這二者都沒有成功,則執行 default 語句。

無緩衝的 channel

無緩衝的 channel(unbuffered channel) 是指在接收前沒有能力保存任何值的 channel。這種類型的 channel 要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操做。若是兩個 goroutine 沒有同時準備好,channel 會致使先執行發送或接收操做的 goroutine 阻塞等待。這種對通道進行發送和接收的交互行爲自己就是同步的。其中任意一個操做都沒法離開另外一個操做單獨存在。咱們能夠經過下面的圖示形象地理解兩個 goroutine 如何利用無緩衝的 channel 來共享一個值(下圖來自互聯網):

下面詳細地解釋一下上圖:

  • 在第 1 步,兩個 goroutine 都到達通道,但兩個都沒有開始執行數據的發送或接收。
  • 在第 2 步,左側的 goroutine 將它的手伸進了通道,這模擬了向通道發送數據的行爲。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
  • 在第 3 步,右側的 goroutine 將它的手放入通道,這模擬了從通道里接收數據。這個 goroutine同樣也會在通道中被鎖住,直到交換完成。
  • 在第 4 步和第 5 步,進行數據交換。
  • 在第 6 步,兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 獲得釋放。兩個 goroutine 如今均可以去作別的事情了。

下面的例子模擬一場網球比賽。在網球比賽中,兩位選手會把球在兩我的之間來回傳遞。選手老是處在如下兩種狀態之一:要麼在等待接球,要麼將球打向對方。可使用兩個goroutine來模擬網球比賽,並使用無緩衝的通道來模擬球的來回:

// 這個示例程序展現如何用無緩衝的通道來模擬
//2個goroutine間的網球比賽
package main

import(
    "math/rand"
    "sync"
    "time"
    "fmt"
)

// wg用來等待程序結束
var wg sync.WaitGroup

func init() {
    rand.Seed(time.Now().UnixNano())
}

// main是全部Go程序的入口
func main() {
    // 建立一個無緩衝的通道
    court := make(chan int)

    // 計數加2,表示要等待兩個goroutine
    wg.Add(2)

    // 啓動兩個選手
    go player("Nick", court)
    go player("Jack", court)

    // 發球
    court <- 1

    // 等待遊戲結束
    wg.Wait()
}

// player 模擬一個選手在打網球
func player(name string, court chan int) {
    // 在函數退出時調用Done來通知main函數工做已經完成
    defer wg.Done()

    for{
        // 等待球被擊打過來
        ball, ok := <-court
        if !ok {
            // 若是通道被關閉,咱們就贏了
            fmt.Printf("Player %s Won\n", name)
            return
        }

        // 選隨機數,而後用這個數來判斷咱們是否丟球
        n := rand.Intn(100)
        if n%5 == 0 {
            fmt.Printf("Player %s Missed\n", name)

            // 關閉通道,表示咱們輸了
            close(court)
            return
        }

        // 顯示擊球數,並將擊球數加1
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++

        // 將球打向對手
        court <- ball
    }
}

運行上面的代碼,會輸出相似下面的信息:

Player Jack Hit 1
Player Nick Hit 2
Player Jack Hit 3
Player Nick Hit 4
Player Jack Missed
Player Nick Won

簡單解釋一下上面的代碼:
在 main 函數中建立了一個 int 類型的無緩衝的通道,使用該通道讓兩個 goroutine 在擊球時可以互相同步。而後建立了參與比賽的兩個 goroutine。在這個時候,兩個 goroutine 都阻塞住等待擊球。court <- 1 模擬發球,將球發到通道里,程序開始執行這個比賽,直到某個 goroutine 輸掉比賽。
在 player 函數裏,主要是運行一個無限循環的 for 語句。在這個循環裏,是玩遊戲的過程。goroutine 從通道接收數據,用來表示等待接球。這個接收動做會鎖住 goroutine,直到有數據發送到通道里。通道的接收動做返回時,會檢測 ok 標誌是否爲 false。若是這個值是 false,表示通道已經被關閉,遊戲結束。在這個模擬程序中,使用隨機數來決定 goroutine 是否擊中了球。若是擊中了球,就把 ball 的值遞增 1,並將 ball 做爲球從新放入通道,發送給另外一位選手。在這個時刻,兩個 goroutine 都會被鎖住,直到交換完成。最終,引某個 goroutine 沒有打中球會把通道關閉。以後兩個 goroutine 都會返回,經過 defer 聲明的 Done 會被執行,程序終止。

帶緩衝的 channel

帶緩衝的 channel(buffered channel) 是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也會不一樣。只有在通道中沒有要接收的值時,接收動做纔會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。這致使有緩衝的通道和無緩衝的通道之間的一個很大的不一樣:無緩衝的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;有緩衝的通道沒有這種保證。能夠經過下面的圖示形象地理解兩個 goroutine 分別向帶緩衝的通道里增長一個值和從帶緩衝的通道里移除一個值(下圖來自互聯網):

下面詳細地解釋一下上圖:

  • 在第 1 步,右側的 goroutine 正在從通道接收一個值。
  • 在第 2 步,右側的這個 goroutine 獨立完成了接收值的動做,而左側的 goroutine 正在發送一個新值到通道里。
  • 在第 3 步,左側的 goroutine 還在向通道發送新值,而右側的 goroutine 正在從通道接收另一個值。這個步驟裏的兩個操做既不是同步的,也不會互相阻塞。
  • 最後,在第 4 步,全部的發送和接收都完成,而通道里還有幾個值,也有一些空間能夠存更多的值。

建立帶緩衝區的 channel 很是簡單,只須要再添加一個緩衝區的大小就能夠了,好比建立一個傳遞 int 類型數據,緩衝區爲 10 的 channel:

ch := make(chan int, 10)

下面的 demo 使用一組 goroutine 來接收並完成任務,帶緩衝區的通道提供了一種清晰而直觀的方式來實現這個功能:

// 這個示例程序展現如何使用
// 有緩衝的通道和固定數目的
// goroutine來處理一堆工做
package main

import(
    "math/rand"
    "sync"
    "time"
    "fmt"
)

const(
    numberGoroutines = 2 // 要使用的goroutine的數量
    taskLoad = 5 // 要處理的工做的數量
)

// wg用來等待程序結束
var wg sync.WaitGroup

func init()  {
    rand.Seed(time.Now().UnixNano())
}

// main是全部Go程序的入口
func main()  {
    // 建立一個有緩衝的通道來管理工做
    tasks := make(chan string, taskLoad)
    
    // 啓動goroutine來處理工做
    wg.Add(numberGoroutines)
    for gr := 1; gr <= numberGoroutines; gr++ {
        go worker(tasks, gr)
    }
    
    // 增長一組要完成的工做
    for post := 1; post <= taskLoad; post++ {
        tasks <- fmt.Sprintf("Task: %d", post)
    }
    
    // 當全部工做都處理完時關閉通道
    // 以便全部goroutine退出
    close(tasks)
    
    // 等待全部工做完成
    wg.Wait()
}

// worker做爲goroutine啓動來處理
// 從有緩衝的通道傳入的工做
func worker(tasks chan string, worker int) {
    // 通知函數已經返回
    defer wg.Done()

    for{
        // 等待分配工做
        task, ok := <-tasks
        if !ok{
            // 這意味着通道已經空了,而且已被關閉
            fmt.Printf("Worker: %d: Shutting Down\n", worker)
            return
        }

        // 顯示咱們開始工做了
        fmt.Printf("Worker: %d: Started %s\n", worker, task)

        // 隨機等一段時間來模擬工做
        sleep := rand.Int63n(100)
        time.Sleep(time.Duration(sleep)* time.Millisecond)

        // 顯示咱們完成了工做
        fmt.Printf("Worker: %d: Completed %s\n", worker, task)
    }
}

運行上面的程序,輸出結果大體以下:

Worker: 2: Started Task: 1
Worker: 1: Started Task: 2
Worker: 1: Completed Task: 2
Worker: 1: Started Task: 3
Worker: 1: Completed Task: 3
Worker: 1: Started Task: 4
Worker: 2: Completed Task: 1
Worker: 2: Started Task: 5
Worker: 1: Completed Task: 4
Worker: 1: Shutting Down
Worker: 2: Completed Task: 5
Worker: 2: Shutting Down

代碼裏有很詳細的註釋,所以再也不贅言,只解釋一下通道的關閉:
關閉通道的代碼很是重要。當通道關閉後,goroutine 依舊能夠從通道接收數據,可是不能再向通道里發送數據。可以從已經關閉的通道接收數據這一點很是重要,由於這容許通道關閉後依舊能取出其中緩衝的所有值,而不會有數據丟失。從一個已經關閉且沒有數據的通道里獲取數據,總會馬上返回,並返回一個通道類型的零值。若是在獲取通道時還加入了可選的標誌,就能獲得通道的狀態信息。

處理超時

使用 channel 時須要當心,好比對於下面的簡單用法:

i := <-ch

碰到永遠沒有往 ch 中寫入數據的狀況,那麼這個讀取動做將永遠也沒法從 ch 中讀取到數據,致使的結果就是整個 goroutine 永遠阻塞而且沒有挽回的機會。若是 channel 只是被同一個開發者使用,那樣出問題的可能性還低一些。但若是一旦對外公開,就必須考慮到最差狀況並對程序進行維護。

Golang 沒有提供直接的超時處理機制,但能夠利用 select 機制變通地解決。由於 select 的特色是隻要其中一個 case 已經完成,程序就會繼續往下執行,而不會考慮其它的 case。基於此特性咱們來實現一個 channel 的超時機制:

ch := make(chan int)
// 首先實現並執行一個匿名的超時等待函數
timeout := make(chan bool, 1)
go func() {
    time.Sleep(1e9) // 等待 1 秒
    timeout <- true
}()
// 而後把 timeout 這個 channel 利用起來
select {
case <-ch:
    // 從 ch 中讀取到數據
case <- timeout:
    // 一直沒有從 ch 中讀取到數據,但從 timeout 中讀取到了數據
    fmt.Println("Timeout occurred.")
}

執行上面的代碼,輸出的結果爲:

Timeout occurred.

關閉 channel

關閉 channel 很是簡單,直接調用 Golang 內置的 close() 函數就能夠了:

close(ch)

在關閉了 channel 以後咱們要面對的問題是:如何判斷一個 channel 是否已關閉?
其實在從 channel 中讀取數據的同時,還能夠得到一個布爾類型的值,該值表示 channel 是否已關閉:

x, ok := <-ch

若是 ok 的值爲 false,則表示 ch 已經被關閉。

參考:
《Go語言實戰》
《Go語言編程入門與實戰技巧》

相關文章
相關標籤/搜索