Golang —— goroutine(協程)和channel(管道)

協程(goroutine)

協程(goroutine)是Go中應用程序併發處理的部分,它能夠進行高效的併發運算。算法

  • 協程是輕量的,比線程更廉價。使用4K的棧內存就能夠在內存中建立。
  • 可以對棧進行分割,動態地增長或縮減內存的使用。棧的管理會在協程退出後自動釋放。
  • 協程的棧會根據須要進行伸縮,不出現棧溢出。

協程的使用

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("In main()")
	go longWait()
	go shortWait()
	fmt.Println("About to sleep in main()")

	//time.Sleep(4 * 1e9)
	time.Sleep(10 * 1e9)
	fmt.Println("At the end of main()")
}

func longWait() {
	fmt.Println("Beginning longWait()")
	time.Sleep(5 * 1e9)
	fmt.Println("End of longWait()")
}

func shortWait() {
	fmt.Println("Beginning shortWait()")
	time.Sleep(2 * 1e9)
	fmt.Println("End of shortWait()")
}
複製代碼

Go中用go關鍵字來開啓一個協程,其中main函數也能夠看作是一個協程。緩存

不難理解,上述代碼的輸出爲:bash

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
End of longWait()
At the end of main()
複製代碼

可是,當咱們將main的睡眠時間設置成4s時,輸出發生了改變。併發

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
At the end of main()
複製代碼

程序並無輸出End of longWait(),緣由在於,longWait()main()運行在不一樣的協程中,二者是異步的。也就是說,早在longWait()結束以前,main已經退出,天然也就看不到輸出了。異步

通道(channel)

通道(channel)是Go中一種特殊的數據類型,能夠經過它們發送類型化的數據在協程之間通訊,避開內存共享致使的問題。async

通道的通訊方式保證了同步性,而且同一時間只有一個協程可以訪問數據,不會出現數據競爭函數

以工廠的傳輸帶爲例,一個機器放置物品(生產者協程),通過傳送帶,到達下一個機器打包裝箱(消費者協程)。學習

通道的使用

在學習使用管道以前,咱們先來看一個「悲劇」。ui

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("Reveal romantic feelings...")
	go sendLove()
	go responseLove()
	waitFor()
	fmt.Println("Leaving ☠️....")
}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove() {
	time.Sleep(6 * 1e9)
	fmt.Println("Love you, too")
}
複製代碼

用上面學習的知識,不難看出。。。真的慘啊spa

Reveal romantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Leaving ☠️....
複製代碼

明明收到了暗戀女孩的迴應,然而卻覺得對方不接受本身的情感,含淚離去。【TAT】

可見,協程之間沒有互相通訊將會引發多麼大的誤解。幸虧,咱們有了channel,如今就來一塊兒改寫故事的結局吧~

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	var answer string

	fmt.Println("Reveal fomantic feelings...")
	go sendLove()
	go responseLove(ch)
	waitFor()
	answer = <-ch

	if answer != "" {
		fmt.Println(answer)
	} else {
		fmt.Println("Dead ☠️....")
	}

}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove(ch chan string) {
	time.Sleep(6 * 1e9)
	ch <- "Love you, too"
}
複製代碼

輸出爲:

Reveal fomantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Love you, too
複製代碼

皆大歡喜。

這裏咱們用ch := make(chan string)建立了一個string類型的管道,固然咱們還能夠構建其餘類型好比ch := make(chan int),甚至一個函數管道funcChan := chan func()

咱們還用到了一個通訊操做符<-

  • 流向通道:ch <- content,用管道ch發送變量content。

  • 從通道流出:answer := <- ch,變量answer從通道ch接收數據。

  • <- ch能夠單獨調用,以獲取通道的下一個值,當前值會被丟棄,可是能夠用來驗證,好比:

    if <- ch != 100 {
        /* do something */
    }
    複製代碼

通道阻塞

  • 對於同一通道,發送操做在接受者準備好以前是不會結束的。這就意味着,若是一個無緩衝通道在沒有空間接收數據的時候,新的輸入數據沒法輸入,即發送者處於阻塞狀態。
  • 對於同一通道,接收操做是阻塞的,直到發送者可用。若是通道中沒有數據,接收者會保持阻塞。

以上兩條性質,反映了無緩衝通道的特性:同一時間只容許至多一個數據存在於通道中

咱們經過例子來感覺一下:

package main

import "fmt"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	fmt.Println(<-ch1)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}
複製代碼

程序輸出:

0
複製代碼

這裏的pump()函數被稱爲生產者

解除通道阻塞

package main

import "fmt"
import "time"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	go suck(ch1)
	time.Sleep(1e9)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}
複製代碼

這裏咱們定義了一個suck函數,做爲接收者,並給main協程一個1s的運行時間,因而,便產生了70W+的輸出【TAT】。

通道死鎖

通道兩段互相阻塞對方,會造成死鎖狀態。Go運行時會檢查並panic,中止程序。無緩衝通道會被阻塞。

package main

import "fmt"

func main() {
	out := make(chan int)
	out <- 2
	go f1(out)
}

func f1(in chan int) {
	fmt.Println(<-in)
}
複製代碼
fatal error: all goroutines are asleep - deadlock!
複製代碼

顯然在out <- 2的時候,因爲沒有接受者,主線程被阻塞。

同步通道

除了普通的無緩存通道外,還有一種特殊的帶緩存通道——同步通道

buf := 100
ch1 := make(chan string, buf)
複製代碼

buf是通道能夠同時容納的元素個數,即ch1的緩衝區大小,在buf滿以前,通道都不會阻塞。

若是容量大於0,通道就是異步的:在緩衝滿載或邊控以前通訊不會阻塞,元素會按照發送的順序被接收。

同步:ch := make(chan type, value)

  • value ==0 --> synchronous, unbuffered(阻塞)
  • value > 0 --> asynchronous, buffered(非阻塞)取決於value元素

使用通道緩衝能使程序更具備伸縮性(scalable)。

儘可能在首要位置使用無緩衝通道,只在不肯定的狀況下使用緩衝。

package main

import "fmt"
import "time"

func main() {
	c := make(chan int, 50)
	go func() {
		time.Sleep(15 * 1e9)
		x := <-c
		fmt.Println("received", x)
	}()
	fmt.Println("sending", 10)
	c <- 10
	fmt.Println("send", 10)
}

複製代碼

信號量模式

func compute(ch chan int) {
    ch <- someComputation()
}

func main() {
    ch := make(chan int)
    go compute(ch)
    doSomethingElaseForAWhile()
    result := <-ch
}
複製代碼

協程經過在通道ch中放置一個值來處理結束信號。main線程等待<-ch直到從中獲取到值。

咱們能夠用它來處理切片排序:

done := make(chan bool)

doSort := func(s []int) {
    sort(s)
    done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done
複製代碼

帶緩衝通道實現信號量

信號量時實現互斥鎖的經常使用同步機制,限制對資源的訪問,解決讀寫問題。

  • 帶緩衝通道的容量要和同步的資源容量相同
  • 通道的長度(當前存放的元素個數)與當前資源被使用的數量相同
  • 容量減去通道的長度等於未處理的資源個數
//建立一個長度可變但容量爲0的通道
type Empty interface {}
type semaphore chan Empty
複製代碼

初始化信號量

sem = make(semaphore, N)
複製代碼

對信號量進行操做,創建互斥鎖

func (s semaphore) P (n int) {
    e := new(Empty)
    for i := 0; i < n; i++ {
        s <- e
    }
}

func (a semaphore) V (n int) {
    for i := 0; i < n; i++ {
        <- s
    }
}

/* mutexes */
func (s semaphore) Lock() {
	s.P(1)
}

func (s semaphore) Unlock(){
	s.V(1)
}

/* signal-wait */
func (s semaphore) Wait(n int) {
	s.P(n)
}

func (s semaphore) Signal() {
	s.V(1)
}
複製代碼

通道工廠模式

不將通道做爲參數傳遞,而是在函數內生成一個通道,並返回。

package main

import (
	"fmt"
	"time"
)

func main() {
	stream := pump()
	go suck(stream)
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}
複製代碼

通道使用for循環

for循環能夠從ch中持續獲取值,直到通道關閉。(這意味着必須有另外一個協程寫入ch,而且在寫入完成後關閉)

for v := range ch {
    fmt.Println("The value is", v)
}
複製代碼
package main

import (
	"fmt"
	"time"
)

func main() {
	suck(pump())
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	go func() {
		for v := range ch {
			fmt.Println(v)
		}
	}()
}
複製代碼

通道的方向

通道能夠表示它只發送或者只接受:

var send_only chan<- int    // channel can only send data
var recv_only <-chan int    // channel can only receive data
複製代碼

只接收的通道(<-chan T)沒法關閉,由於關閉通道是發送者用來表示再也不給通道發送值,因此對只接收通道是沒有意義的。

管道和選擇器模式

借鑑一個經典的例子篩法求素數來學習這一內容。

這個算法的主要思想是,引入篩法(一種時間複雜度爲O(x * ln(lnx))的算法),對一個給定返回的正整數從大到小排序,而後從中篩選掉全部的非素數,那麼剩下的數中最小的就是素數,再去掉該數的倍數,以此類推。

假設一個範圍爲1~30的正整數集,已經從大到小排序。

第一遍篩掉非素數1,而後剩餘數中最小的是2。

因爲2是一個素數,將其取出,而後去掉全部2的倍數,那麼剩下的數爲:

3 5 7 9 11 13 15 17 19 21 23 25 27 29

剩下的數中3最小,且爲素數,取出並去除全部3的倍數,循環直至全部數都篩完。

代碼以下:

// 通常寫法
package main

import (
	"fmt"
)

func generate(ch chan int) {
	for i := 2; i < 100; i++ {
		ch <- i
	}
}

func filter(in, out chan int, prime int) {
	for {
		i := <-in
		if i%prime != 0 {
			out <- i
		}
	}
}

func main() {
	ch := make(chan int)
	go generate(ch)
	for {
		prime := <-ch
		fmt.Print(prime, " ")
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		ch = ch1
	}
}
複製代碼
// 習慣寫法
package main

import (
	"fmt"
)

func generate() chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func filter(in chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				out <- i
			}
		}
	}()
	return out
}

func sieve() chan int {
	out := make(chan int)
	go func() {
		ch := generate()
		for {
			prime := <-ch
			ch = filter(ch, prime)
			out <- prime
		}
	}()
	return out
}

func main() {
	primes := sieve()
	for {
		fmt.Println(<-primes)
	}
}
複製代碼
相關文章
相關標籤/搜索