數據結構和算法(Golang實現)(6)簡單入門Golang-併發、協程和信道

併發、協程和信道

Golang語言提供了go關鍵字,以及名爲chan的數據類型,以及一些標準庫的併發鎖等,咱們將會簡單介紹一下併發的一些概念,而後學習這些Golang特徵知識。算法

1、併發介紹

咱們寫程序時,可能會讀取一個幾千兆的日誌,讀磁盤可能須要讀幾十秒鐘,咱們不可能一直等他,由於雖然磁盤IO繁忙,可是處理器CPU很空閒,咱們能夠併發地開另外一條路去處理其餘邏輯。編程

在操做系統層面,出現了多進程和多線程的概念。一個處理器會在一個時間片裏好比20納秒執行一個進程,當時間片用完了或者發生了中斷好比進程搶佔事件,當前進程上下文會被保存,而後處理器開始處理另一個進程,這樣頻繁地切換執行,切換和執行的速度特別快,就產生了貌似程序們都在同時執行,其實仍是串行執行,這種叫併發。在多核處理器上,進程能夠調度到不一樣的處理器,時間片輪訓也只是針對每個處理器,同一時間在兩個處理器上執行的兩個進程,它們是實在的同時,這種叫並行。通常狀況下,咱們統稱併發。segmentfault

進程是計算機資源分配的最小單位,進程是對處理器資源(CPU),虛擬內存(1)的抽象,數組

虛擬內存是對主存資源(Memory)和文件(2)的抽象,文件是對I/O設備的抽象。緩存

虛擬內存是操做系統初始化後內部維護的一個程序加載空間,對於32位操做系統來講,也就是寄存器有32位的比特長度,虛擬內存中每一個字節都有一個內存地址,內存地址的指針長度爲32位(恰好是寄存器能夠存放的位數),算下來2的32次,恰好能夠存放4G左右的字節,因此在32位的操做系統上,你的8G內存條只有50%的利用率,因此如今都是64位的操做系統。

其中,CPUMemoryI/O設備就是咱們所說的處理器,內存,硬盤。安全

線程是計算機調度的最小單位,也就是CPU大腦調度的最小單位,同個進程下的線程能夠共享同個進程分配的計算機資源。數據結構

同個進程下的線程間切換須要CPU切換上下文,但不須要建立新的虛擬內存空間,不須要內存管理單元切換上下文,比不一樣進程切換會顯得更輕量。多線程

總上所述,實際併發的是線程。首先,每一個進程都有一個主線程,由於線程是調度的最小單位,你能夠只有一個線程,可是你也能夠建立多幾個線程,線程調度須要CPU來切換,須要內核層的上下文切換,若是你跑了A線程,而後切到B線程,內核調用開始,CPU須要對A線程的上下文保留,而後切到B線程,而後把控制權交給你的應用層調度。進程切換也須要內核來切換,由於從C進程的主線程切換到D進程的主線程。併發

事實上,進程和線程只是概念上的劃分,在操做系統內部,只用了一個數據結構來表示,裏面有pid:進程ID,tgid:線程屬於的線程組ID(也就是進程ID,主線程ID),以下圖(其中fork表示建立進程):數據結構和算法

每個進程/線程都有一個pid,若是它是主線程,那麼tgid=pid,從一個主線程fork出來的是另外一個進程的主線程,pidtgid都變了,而new thread出來的線程,除了pid變了,tgid不變。

進程間還要通信,由於它們資源不共享,這個時候須要用IPCInter-Process Communication,進程間通訊),經常使用的有信號量,共享內存,套接字等。

而同個進程的多個線程共享資源,通信起來比進程容易多了,由於它們共享了虛擬內存的空間,直接就能夠讀取內存,如今不少PythonJava等編程語言都有這種線程庫實現。

至於IO多路複用,其實就是維持一個線程隊列,而後讓一個線程或多個線程,去隊列裏面拿任務去完成。爲何呢?由於線程的數量是有限的,並且線程間通信須要點資源,內核也要頻繁切換上下文,乾脆就弄一個池,有任務就派個小弟出去。

只有一個線程的IO多路複用,典型的就是RedisNodejs了,根本不須要切換上下文,一個線程走天下。而多個線程的IO多路複用,就是Golang協程的實現方式了,協程,本身管理線程,把線程控制到必定的數量,而後構造一個規則狀態機來調度任務。

二. 協程和 go 關鍵字

在操做系統更高層次的應用層,高級編程語言也有開發併發程序的需求。不管是一個進程下的多個線程,仍是不一樣進程,仍是不一樣進程下的線程,切換時都須要損耗資源,浪費一些資源,因此Golanggoruntime(協程)這種東西,它會在內部維持一個固定線程數的線程池,進行合理的調度,使得線程不那麼頻繁的切換。

Golang語言實現的調度器,其實就是經過使用數量合適的線程並在每個線程上執行更多的工做來下降操做系統和硬件的負載。

主要用法以下:

package main

import (
    "fmt"
    "time"
)

func Hu() {
    // 使用睡眠模仿一些耗時
    time.Sleep(2 * time.Second)
    fmt.Println("after 2 second hu!!!")
}

func main() {

    // 將會堵塞
    //Hu()

    // 開啓新的協程,不會堵塞
    go Hu()

    fmt.Println("start hu, wait...")

    // 必須死循環,否則主協程退出了,程序就結束了
    for {
        time.Sleep(1 * time.Second)
    }

}

若是直接使用Hu()函數,由於函數內部使用time.Sleep進行睡眠,需等待兩秒,因此程序會堵塞。

這個時候可使用關鍵字go開啓一個新的協程,再也不堵塞,即go Hu()執行完畢後,立刻會接着執行後續的語句。

輸出:

start hu, wait...
after 2 second hu!!!

由於main函數自己做爲程序的主協程,若是main函數結束的話,其餘協程也會死掉,必須使用死循環來避免主協程終止。

3、信道 chan

如何在兩個協程間通信呢?Golang提供了一種稱爲chan的數據類型,咱們能夠把它叫作信道。

package main

import (
    "fmt"
    "time"
)

func Hu(ch chan int) {
    // 使用睡眠模仿一些耗時
    time.Sleep(2 * time.Second)
    fmt.Println("after 2 second hu!!!")

    // 執行語句後,通知主協程已經完成操做
    ch <- 1000
}

func main() {
    // 新建一個沒有緩衝的信道
    ch := make(chan int)

    // 將信道傳入函數,開啓協程
    go Hu(ch)
    fmt.Println("start hu, wait...")

    // 從空緩衝的信道讀取 int,將會堵塞,直到有消息到來
    v := <-ch
    fmt.Println("receive:", v)
}

輸出:

start hu, wait...
after 2 second hu!!!
receive: 1000

咱們可使用make(chan int)建立一個能存取int類型的沒有緩衝的信道,沒有緩衝,意味着往裏面發送消息,或者接收消息都會堵塞。

咱們將ch傳入函數func Hu(ch chan int),由於信道和字典,切片同樣都是引用類型,因此在函數內能夠往信道里面發送消息,外面的信道能夠收到。

發送一個整數到信道可使用ch <- 1000,接收整數可使用:v := <-ch

咱們執行協程後,由於函數裏面會睡眠兩分鐘,因此兩分鐘以後信道纔會收到消息,在沒有收到消息以前v := <-ch會堵塞,直到協程go Hu(ch)完成,那麼消息收到,程序結束。

使用信道chan除了能夠用來協程間通信,也能夠用來緩存數據,好比建一個帶有緩衝的信道:

package main

import (
    "fmt"
    "time"
)

func Receive(ch chan int) {
    // 先等幾秒後再接收消息
    time.Sleep(2 * time.Second)
    for {
        select {
        case v, ok := <-ch:
            // 接收信道里面的消息,接收後緩衝就充足了

            // 信道被關閉了,退出
            if !ok {
                fmt.Println("chan close,receive:", v)
                return
            }
            // 打印
            fmt.Println("receive:", v)
        }
    }
}

func Send(ch chan int) {
    // 發到第11個時,會卡住,由於信道滿了
    for i := 0; i < 13; i++ {
        ch <- i
        fmt.Println("send:", i)
    }
    // 打印完畢,關閉信道
    close(ch)
}

func main() {
    // 新建一個5個緩衝的信道
    ch := make(chan int, 10)

    // 將信道傳入函數,開啓協程
    go Receive(ch)
    go Send(ch)

    // 必須死循環,否則主協程退出了,程序就結束了
    for {
        time.Sleep(1 * time.Second)
    }

}

咱們建了一個有10個緩衝的信道:make(chan int, 10),而後開了兩個協程:go Receive(ch)go Send(ch),一個用來收消息,一個用來發送消息。

func Receive(ch chan int)中咱們先睡眠幾秒後再接收消息:time.Sleep(2 * time.Second)

func Send(ch chan int)中使用循環,往信道打消息,打到第十個,由於信道緩衝滿了,因此會堵塞,直到Receive開始接收消息再繼續打,而後關閉信道:close(ch)

輸出結果:

send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
receive: 0
receive: 1
receive: 2
receive: 3
receive: 4
receive: 5
receive: 6
receive: 7
receive: 8
receive: 9
receive: 10
send: 10
send: 11
receive: 11
send: 12
receive: 12
chan close,receive: 0

在這裏有一種select語句專門用來和信道打交道:

select {
        case v, ok := <-ch:
            // 接收信道里面的消息,接收後緩衝就充足了

            // 信道被關閉了,退出
            if !ok {
                fmt.Println("chan close,receive:", v)
                return
            }
            // 打印
            fmt.Println("receive:", v)
        }

<-ch接收消息,若是信道ch沒被關閉,且信道沒有消息了,那麼會堵塞。若是信道有消息,那麼oktrue,而且消息賦值給v。當信道被關閉:close(ch),那麼ok將會爲false,表示信道關閉了。

使用range也能夠遍歷信道里的消息,如:

package main

import "fmt"

func main() {
    buffedChan := make(chan int, 2)
    buffedChan <- 2
    buffedChan <- 3
    for i := range buffedChan { // 必須關閉,不然死鎖
        fmt.Println(i)
    }
}

上面運行後會輸出:

2
3
fatal error: all goroutines are asleep - deadlock!

由於range會一直讀取消息,若是沒有消息將會堵塞,主協程堵塞了,Golang會認爲死鎖了,這時候咱們能夠關閉信道後再打印,如:

package main

import "fmt"

func main() {
    buffedChan := make(chan int, 2)
    buffedChan <- 2
    buffedChan <- 3
    close(buffedChan) // 關閉後才能for打印出,不然死鎖

    //close(buffedChan) // 不能重複關閉
    //buffedChan <- 4  // 關閉後就不能再送數據了,可是以前的數據還在
    for i := range buffedChan { // 必須關閉,不然死鎖
        fmt.Println(i)
    }
}

輸出:

2
3

信道關閉後,range操做讀完消息後,將會結束。

在這裏要注意,不能屢次關閉一個信道,不能往關閉了的信道打消息,不然會報錯:

panic: send on closed channel

4、鎖實現併發安全

多個協程可能對同一個變量作修改操做,可能不符合預期,好比轉帳:

package main

import (
    "fmt"
    "time"
)

type Money struct {
    amount int64
}

// 加錢
func (m *Money) Add(i int64) {
    m.amount = m.amount + i
}

// 減錢
func (m *Money) Minute(i int64) {
    // 錢足才能減
    if m.amount >= i {
        m.amount = m.amount - i
    }
}

// 查看還有多少錢
func (m *Money) Get() int64 {
    return m.amount
}

func main() {
    m := new(Money)
    m.Add(10000)

    for i := 0; i < 1000; i++ {
        go func() {
            time.Sleep(500 * time.Millisecond)
            m.Minute(5)
        }()
    }

    time.Sleep(20 * time.Second)
    fmt.Println(m.Get())

}

咱們先m.Add(10000),這樣就有一萬塊錢了,而後轉帳1000次,每次轉5元,因此結果應該是5000,但事與願違,結果一直在變化,多是5725或者5720

由於轉帳是併發的,減錢操做會讀取結構體Money裏面的amount,同時操做時可能讀到同一個值,好比兩個協程都讀到9995,那麼作減法時,就都變成9990,有一次轉帳就失敗了。

咱們須要實現併發安全,同一時間只能容許一個協程修改金額,咱們須要加鎖,以下:

type Money struct {
    lock   sync.Mutex // 鎖
    amount int64
}

// 加錢
func (m *Money) Add(i int64) {
    // 加鎖
    m.lock.Lock()

    // 在該函數結束後執行
    defer m.lock.Unlock()
    m.amount = m.amount + i
}

// 減錢
func (m *Money) Minute(i int64) {
    // 加鎖
    m.lock.Lock()

    // 在該函數結束後執行
    defer m.lock.Unlock()

    // 錢足才能減
    if m.amount >= i {
        m.amount = m.amount - i
    }
}

咱們爲結構體Money多加了一個字段:lock sync.Mutex,每次修改amount時都會先加鎖,函數執行完後再把鎖去掉。如:

// 加鎖
    m.lock.Lock()

    // 在該函數結束後執行
    defer m.lock.Unlock()

    // 開始進行一些操做

協程若是想修改金額,進入函數後,須要先經過m.lock.Lock()獲取到鎖,若是獲取不到鎖的話,會堵塞,直到拿到鎖,修改完金額後函數結束時會調用m.lock.Unlock(),這樣就實現了併發安全。

咱們看到有一個defer的關鍵字,這是Golang提供的延遲執行的關鍵字,會延遲到函數結束後,該關鍵字後面的指令纔會執行。

不少時候咱們會忘了釋放鎖,這樣有些協程會一直堵塞,致使死鎖的狀況發生,因此在得到鎖後,可使用defer來確保在函數執行後,鎖必定會被釋放。

系列文章入口

我是陳星星,歡迎閱讀我親自寫的 數據結構和算法(Golang實現),文章首發於 閱讀更友好的GitBook

相關文章
相關標籤/搜索