【go併發編程】

GO 併發編程

協程(進程 線程)

  1. 進程是「程序執行的一個實例」 ,擔當分配系統資源的實體。進程建立必須分配一個完整的獨立地址空間。進程切換隻發生在內核態。
  2. 線程:線程是進程的一個執行流,獨立執行它本身的程序代碼。
  3. 協程:協程不是進程或線程,其執行過程更相似於子例程,或者說不帶返回值的函數調用。在語言級別能夠建立併發協程,而後編寫代碼去進行管理。go將這一步承包下來,使協程併發運行成本更低。
func main() {
    http.HandleFunc("/next", handler)
    // func這個函數會是以協程的方式運行。這樣就能夠提供程序的併發處理能力
    go func() {
        for i := 0; ; i++ {
            nextID <- i
        }
    }()
    http.ListenAndServe("localhost:8080", nil)
}

goruntime

參考goruntime詳解,操做系統對cpu有本身的scheduler方案,如任務A在執行完後,選擇哪一個任務來執行,使得某個因素(如進程總執行時間,或者磁盤尋道時間等)最小,達到最優的服務。
Go有本身的scheduler,語言級別實現了併發。linux

每個Go程序都附帶一個runtime,runtime負責與底層操做系統交互,也都會有scheduler對goruntines進行調度。在scheduler中有三個很是重要的概念:P,M,G。
詳情後續再寫。golang

# Goroutine scheduler
# The scheduler's job is to distribute ready-to-run goroutines over worker threads.
#
# The main concepts are:
# G - goroutine.
# M - worker thread, or machine.
# P - processor, a resource that is required to execute Go code.
#     M must have an associated P to execute Go code, however it can be
#     blocked or in a syscall w/o an associated P.
#
# Design doc at https://golang.org/s/go11sched.

runtime包與goroutime

儘管 Go 編譯器產生的是本地可執行代碼,這些代碼仍舊運行在 Go 的 runtime(這部分的代碼能夠在 runtime 包中找到)當中。這個 runtime 相似 Java 和 .NET 語言所用到的虛擬機,它負責管理包括內存分配、垃圾回收(第 10.8 節)、棧處理、goroutine、channel、切片(slice)、map 和反射(reflection)等等。編程

  • Gosched:讓當前線程讓出 cpu 以讓其它線程運行,它不會掛起當前線程,所以當前線程將來會繼續執行
  • NumCPU:返回當前系統的 CPU 核數量
  • GOMAXPROCS:設置最大的可同時使用的 CPU 核數
  • Goexit:退出當前 goroutine(可是defer語句會照常執行)
  • NumGoroutine:返回正在執行和排隊的任務總數
  • GOOS:目標操做系統

NumCPU

package main
import (
    "fmt"
    "runtime"
)
func main() {
    fmt.Println("cpus:", runtime.NumCPU())
    fmt.Println("goroot:", runtime.GOROOT())
    fmt.Println("archive:", runtime.GOOS)
    // 4
    // /usr/local/golang
    // linux
}

GOMAXPROCS

package main

import (
    "fmt"
    "runtime"
)

func init() {
    runtime.GOMAXPROCS(1)
}

func main() {
    // 任務邏輯...
}

Golang 默認全部任務都運行在一個 cpu 核裏,若是要在 goroutine 中使用多核,可使用 runtime.GOMAXPROCS 函數修改,當參數小於 1 時使用默認值。緩存

Gosched

這個函數的做用是讓當前 goroutine 讓出 CPU,當一個 goroutine 發生阻塞,Go 會自動地把與該 goroutine 處於同一系統線程的其餘 goroutine 轉移到另外一個系統線程上去,以使這些 goroutine 不阻塞。併發

package main

import (
    "fmt"
    "runtime"
)

func init() {
    runtime.GOMAXPROCS(1)  # 使用單核
}

func main() {
    exit := make(chan int)
    go func() {
        defer close(exit)
        go func() {
            fmt.Println("b")
        }()
    }()

    for i := 0; i < 4; i++ {
        fmt.Println("a:", i)

        if i == 1 {
            runtime.Gosched()  #切換任務
        }
    }
    <-exit
}
# 運行結果
# a: 0
# a: 1
# b: 
# a:2
# a: 3

channel

channel是Go語言在語言級別提供的goroutine間的通訊方式。咱們可使用channel在兩個或 多個goroutine之間傳遞消息。
channel 會某種狀況下出現阻塞,經過控制channel的阻塞來管理協程的併發與流程控制。異步

channel類型

chan T          // 能夠接收和發送類型爲 T 的數據
chan<- float64  // 只能夠用來發送 float64 類型的數據(能夠關閉)
<-chan int      // 只能夠用來接收 int 類型的數據(也不能關閉)
func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}
func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}
func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

這裏使用了單向channel。很明顯數據的流向是單向的。獲取的地方不該該對channel賦值。這樣把一個雙向的channel轉爲一個單向的channel可以防止channel被濫用。下降了風險。函數

channel初始化

make(chan int, 100)
make(chan int)

非緩衝的Channel

ch1 := make(chan int, 1) //緩衝通道
ch2 := make(chan int, 0) //非緩衝通道
ch3 := make(chan int) //非緩衝通道

非緩衝通道特性:ui

  • 向此類通道發送元素值的操做會被阻塞,直到至少有一個針對該通道的接收操做開始進行爲止。
  • 今後類通道接收元素值的操做會被阻塞,直到至少有一個針對該通道的發送操做開始進行爲止。
  • 針對非緩衝通道的接收操做會在與之相應的發送操做完成以前完成。

對於第三條要特別注意,發送操做在向非緩衝通道發送元素值的時候,會等待可以接收該元素值的那個接收操做。而且確保該元素值被成功接收,它纔會真正的完成執行。而緩衝通道中,恰好相反,因爲元素值的傳遞是異步的,因此發送操做在成功向通道發送元素值以後就會當即結束(它不會關心是否有接收操做)操作系統

make(chan int) 和 make(chan int, 1)線程

package main
import "fmt"
func main() {
    var c = make(chan int)
    var a string

    go func() {
        a = "hello world"
        <-c
    }()

    c <- 0
    fmt.Println(a)
}

上面的例子會打印 "hello world"。若是改爲 var c = make(chan int, 1) a 多是 "hello world" 也多是空,make(chan int) 是 unbuffered channel, send 以後 send 語句會阻塞執行,直到有人 receive 以後 send 解除阻塞,後面的語句接着執行。
因此執行 c <- 0 時會阻塞,直到 <-c, 這時 a 已賦值。

make(chan int, 1) 是 buffered channel, 容量爲 1。在 buffer 未滿時往裏面 send 值並不會阻塞, 只有 buffer 滿時再 send 纔會阻塞,因此執行到 c <- 0 時並不會阻塞

send語句

c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.Println(i)

send被執行前(proceed)通信(communication)一直被阻塞着。如前所言,無緩存的channel只有在receiver準備好後send才被執行。若是有緩存,而且緩存未滿,則send會被執行。

往一個已經被close的channel中繼續發送數據會致使run-time panic。

往nil channel中發送數據會一致被阻塞着。

receive語句

<-ch用來從channel ch中接收數據,這個表達式會一直被block,直到有數據能夠接收。 從一個nil channel中接收數據會一直被block。從一個被close的channel中接收數據不會被阻塞,而是當即返回,接收完已發送的數據後會返回元素類型的零值(zero value)。
如前所述,你可使用一個額外的返回參數來檢查channel是否關閉。

x, ok := <-ch
x, ok = <-ch
var x, ok = <-ch

若是OK 是false,代表接收的x是產生的零值,這個channel被關閉了或者爲空。

Range

func main() {
    go func() {
        time.Sleep(1 * time.Hour)
    }()
    c := make(chan int)
    go func() {
        for i := 0; i < 10; i = i + 1 {
            c <- i
        }
        close(c)
    }()
    for i := range c {
        fmt.Println(i)
    }
    fmt.Println("Finished")
}

range c產生的迭代值爲Channel中發送的值,它會一直迭代知道channel被關閉。上面的例子中若是把close(c)註釋掉,程序會一直阻塞在for …… range那一行。

select

  • 每一個case語句裏必須是一個IO操做
  • 若是有多個case均可以運行,Select會隨機公平地選出一個執行(其餘不會執行)。
  • 全部跟在case關鍵字右邊的發送語句或接收語句中的通道表達式和元素表達式都會先被求值。不管它們所在的case是否有可能被選擇都會這樣。
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

咱們不想等到通道被關閉後再退出循環,利用一個輔助通道模擬出操做超時。

package main

import (
    "fmt"
    "time"
)
func main(){
    //初始化通道
    ch11 := make(chan int, 1000)
    sign := make(chan int, 1)

    //給ch11通道寫入數據
    for i := 0; i < 1000; i++ {
        ch11 <- i
    }
    //關閉ch11通道
    close(ch11)

    //咱們不想等到通道被關閉以後再推出循環,咱們建立並初始化一個輔助的通道,利用它模擬出操做超時行爲
    timeout := make(chan bool,1)
    go func(){
        time.Sleep(time.Millisecond) //休息1ms
        timeout <- false
    }()

    //單獨起一個Goroutine執行select
    go func(){
        var e int
        ok := true

        for{
            select {
                case e,ok = <- ch11:
                    if !ok {
                        fmt.Println("End.")
                        break
                    }
                    fmt.Printf("ch11 -> %d\n",e)
                case ok = <- timeout:
                //向timeout通道發送元素false後,該case幾乎立刻就會被執行, ok = false
                    fmt.Println("Timeout.")
                    break
            }
            //終止for循環
            if !ok {
                sign <- 0
                break
            }
        }

    }()

    //慣用手法,讀取sign通道數據,爲了等待select的Goroutine執行。
    <- sign
}

上面實現了單個操做的超時,可是那個超時觸發器開始計時有點早。

package main

import (
    "fmt"
    "time"
)
func main(){
    //初始化通道
    ch11 := make(chan int, 1000)
    sign := make(chan int, 1)

    //給ch11通道寫入數據
    for i := 0; i < 1000; i++ {
        ch11 <- i
    }
    //關閉ch11通道
    //close(ch11),爲了看效果先註釋掉

    //單獨起一個Goroutine執行select
    go func(){
        var e int
        ok := true

        for{
            select {
                case e,ok = <- ch11:
                    if !ok {
                        fmt.Println("End.")
                        break
                    }
                    fmt.Printf("ch11 -> %d\n",e)
                case ok = <- func() chan bool {
                    //通過大約1ms後,該接收語句會從timeout通道接收到一個新元素並賦值給ok,從而恰當地執行了針對單個操做的超時子流程,恰當地結束當前for循環
                    timeout := make(chan bool,1)
                    go func(){
                        time.Sleep(time.Millisecond)//休息1ms
                        timeout <- false
                    }()
                    return timeout
                }():
                    fmt.Println("Timeout.")
                    break
            }
            //終止for循環
            if !ok {
                sign <- 0
                break
            }
        }

    }()

    //慣用手法,讀取sign通道數據,爲了等待select的Goroutine執行。
    <- sign
}

timeout

咱們可能就須要一個超時操做,用來處理超時的狀況。 下面這個例子咱們會在2秒後往channel c1中發送一個數據,可是select設置爲1秒超時,所以咱們會打印出timeout 1,而不是result 1。

import "time"
import "fmt"
func main() {
    c1 := make(chan string, 1)
    go func() {
        // time.Sleep(time.Millisecond) 1ms
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

其實它利用的是time.After方法,它返回一個類型爲<-chan Time的單向的channel,在指定的時間發送一個當前時間給返回的channel中。

Timer和Ticker

咱們看一下關於時間的兩個Channel。 timer是一個定時器,表明將來的一個單一事件,你能夠告訴timer你要等待多長時間,它提供一個Channel,在未來的那個時間那個Channel提供了一個時間值。下面的例子中第二行會阻塞2秒鐘左右的時間,直到時間到了纔會繼續執行。

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

固然若是你只是想單純的等待的話,可使用time.Sleep來實現。你還可使用timer.Stop來中止計時器。

timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C
        fmt.Println("Timer 2 expired")
    }()
    stop2 := timer2.Stop()
    if stop2 {
        fmt.Println("Timer 2 stopped")
    }

ticker是一個定時觸發的計時器,它會以一個間隔(interval)往Channel發送一個事件(當前時間),而Channel的接收者能夠以固定的時間間隔從Channel中讀取事件。下面的例子中ticker每500毫秒觸發一次,你能夠觀察輸出的時間。

ticker := time.NewTicker(time.Millisecond * 500)
go func() {
   for t := range ticker.C {
      fmt.Println("Tick at", t)
    }
}()

相似timer, ticker也能夠經過Stop方法來中止。一旦它中止,接收者再也不會從channel中接收數據了。

close

總結一下channel關閉後sender的receiver操做。 若是channel c已經被關閉,繼續往它發送數據會致使panic: send on closed channel,可是從這個關閉的channel中不但能夠讀取出已發送的數據,還能夠不斷的讀取零值。

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) //1
fmt.Println(<-c) //2
fmt.Println(<-c) //0
fmt.Println(<-c) //0

可是若是經過range讀取,channel關閉後for循環會跳出:

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
   fmt.Println(i)
}
相關文章
相關標籤/搜索