golang channel初次接觸

goroutine之間的同步

goroutine是golang中在語言級別實現的輕量級線程,僅僅利用go就能馬上起一個新線程。多線程會引入線程之間的同步問題,經典的同步問題如生產者-消費者問題,在c,java級別須要使用鎖、信號量進行共享資源的互斥使用和多線程之間的時序控制,而在golang中有channel做爲同步的工具。css

1. channel實現兩個goroutine之間的通訊

package main

import "strconv"
import "fmt"

func main() {
    taskChan := make(chan string, 3)
    doneChan := make(chan int, 1)

    for i := 0; i < 3; i++ {
        taskChan <- strconv.Itoa(i)
        fmt.Println("send: ", i)
    }

    go func() {
        for i := 0; i < 3; i++ {
            task := <-taskChan
            fmt.Println("received: ", task)
        }
        doneChan <- 1
    }()

    <-doneChan
}
  • 建立一個channel,make(chan TYPE {, NUM}), TYPE指的是channel中傳輸的數據類型,第二個參數是可選的,指的是channel的容量大小。
  • 向channel傳入數據,CHAN <- DATA, CHAN指的是目的channel即收集數據的一方,DATA則是要傳的數據。
  • 啓動一個goroutine接收main routine向channel發送的數據,go func(){ BODY }()新建一個線程運行一個匿名函數。
  • 從channel讀取數據,DATA := <-CHAN,和向channel傳入數據相反,在數據輸送箭頭的右側的是channel,形象地展示了數據從‘隧道’流出到變量裏。
  • 通知主線程任務執行結束,doneChan的做用是爲了讓main routine等待這個剛起的goroutine結束,這裏顯示了channel的另外一個特性,若是從empty channel中讀取數據,則會阻塞當前routine,直到有數據能夠讀取。

上面這個程序就是main routine向另外一個routine發送了3條int類型的數據,當3條數據被接收到後,主線程也從阻塞狀態恢復運行,隨後結束。html

2. 不要陷入「死鎖」

我一開始用channel的時候有報過"fatal error: all goroutines are asleep - deadlock! "的錯誤,真實的代碼是下面這樣的:java

package main
import "fmt"

func main() {
    ch := make(chan int)
    ch <- 1   // I'm blocked because there is no channel read yet. 
    fmt.Println("send")
    go func() {
        <-ch  // I will never be called for the main routine is blocked!
        fmt.Println("received")
    }()
    fmt.Println("over")
}

個人本意是從main routine發送給另外一個routine一個int型的數據,可是運行出了上述的錯誤,緣由有2個:golang

  • 當前routine向channel發送/接收數據時,若是另外一端沒有相應地接收/發送,那麼當前routine則會進行休眠。
  • 這個程序的main routine先行在ch <- 1進入休眠狀態,程序的餘下部分根原本不及運行,那麼channel裏的數據永遠不會被讀出,也就不能喚醒main routine,進入「死鎖」。

解決這個「死鎖」的方法但是是設置channel的容量大小大於1,那麼channel就不會由於數據輸入而阻塞主程; 或者將數據輸入channel的語句置於啓動新的goroutine以後。服務器

3. channel做爲狀態轉移的信號源

我跟着MIT的分佈式計算課程作了原型爲Map-Reduce的課後練習,目的是實現一個Master向Worker分派任務的功能:Master服務器去等待Worker服務器鏈接註冊,Master先將Map任務和Reduce任務分派給這些註冊Worker,等待Map任務所有完成,而後將Reduce任務再分配出去,等待所有完成。多線程

// Initialize a channel which records the process of the map jobs.
mapJobChannel := make(chan string)

// Start a goroutine to send the nMap(the number of the Map jobs) tasks to the main routine.
go func() {
    for m := 0; m < nMap; m++ {
        // Send the "start a Map job <m>" to the receiver.
        mapJobChannel <- "start, " + strconv.Itoa(m)
    }
}()

// Main routine listens on this mapJobChannel for the Map job task information.
nMapJobs := nMap

// Process the nMap Map tasks util they're all done.
for nMapJobs > 0 {
    // Receive the Map tasks from the channel.
    taskInfo := strings.Split(<-mapJobChannel, ",")
    state, mapNo := taskInfo[0], taskInfo[1]
    
    if state == "start" {
        // Assign a Map task with number mapNo to a worker.
        go func() {
            // Wait for a worker to finish the Map task.
            ok, workNo := assignJobToWorker("Map", mapNo)
            if ok {
                // Send a task finish signal and set the worker's state to idle.
                mapJobChannel <- "end, " + mapNo
                setWorkerState(workNo, "idle")
            } else {
                // Restart this task and set the worker's state to finished.
                mapJobChannel <- "start, " + mapNo
                setWorkerState(workNo, "finished")
            }
        }()
    } else {
        nMapJobs--
    }
}

以上是我截取本身寫的代碼,關於用channel來傳遞當前Map任務的進度信息,用相似信號的方式標註當前的任務執行狀態。分佈式

  • 當從channel中讀取到"start, {NUM}"時找一個空閒的Worker去執行Map任務,而且等待它的完成,完成成功則向channel中發送"end, {NUM}"信號,代表任務完成,若是失敗,就重發"start, {NUM}"信號。
  • 從channel中讀取到"end, {NUM}"時,把剩餘任務數減1。
    這種信號觸發的方式,觸發Master的狀態轉移,而且能夠經過增長信號以及信號處理的方式,拓展業務處理的狀況,暫時還能處理這個需求情景。

MIT分佈式系統課程函數

相關文章
相關標籤/搜索