併發:強調一段時間作多件事編程
並行:強調同一時間作多件事segmentfault
Actor 模型是一個通用的併發編程模型,能夠應用在幾乎任何一種編程語言中,典型的是 Erlang。多個 actor(進程) 能夠同時運行、不共享狀態、經過向與進程綁定的消息隊列(也稱爲信箱)異步發送消息來進行通訊。設計模式
actor-1 與 actor-2 進程通訊依賴一個消息隊列,並且消息隊列與進程互相耦合綁定。actor-1 在發送完消息以後,在 actor-2 沒有處理該消息的狀況下,能夠繼續執行其餘任務,這說明 actor 進程之間的通訊是異步的。網絡
CSP即通訊順序進程(communicating sequential processes),與 Actor 模型相似,該模型也是由獨立的、併發執行的實體所組成,實體之間經過發送消息進行通訊。go 中的 csp 模型 channel
對於goroutine
來講是匿名的,不須要和 gid
綁定,經過 channel
完成 goroutine
之間的通訊。(channel 在 CSP 表明通道的概念,這裏只討論 Go 相關,channel 等價於 Go 中的 channel)併發
channel
是第一類對象,能夠被獨立創造、寫入、獨處數據,也能夠在不一樣執行單元中傳遞。channel
channel
,而 channel
不像 Actor 模型那樣進程與隊列緊耦合。而是能夠單首創建和讀寫,並在進程 (goroutine) 之間傳遞。Go 是採用 SCP 的思想的,channel 是 go 在併發編程通訊的推薦手段,Go 的設計者 Rob Pike有一句經典的名言,app
Do not communicate by sharing memory; instead, share memory by communicating.異步
這句話是說「不要使用共享內存通訊,而是應該使用通訊取共享內存」,Go 語言推薦咱們使用通訊來進行進程間同步消息。這樣作有三點好處,來源於 draveness 的博客文章。編程語言
上文介紹了 Go 中使用的併發模型,而在這種併發模型下面 channel
是一個重要的概念,而下面每一種模式的設計都依賴於 channel
,因此有必要了解一下。分佈式
barrier 屏障模式故名思義就是一種屏障,用來阻塞直到聚合全部 goroutine 返回結果。 可使用 channel
來實現。函數
/* * Barrier */
type barrierResp struct {
Err error
Resp string
Status int
}
// 構造請求
func makeRequest(out chan<- barrierResp, url string) {
res := barrierResp{}
client := http.Client{
Timeout: time.Duration(2*time.Microsecond),
}
resp, err := client.Get(url)
if resp != nil {
res.Status = resp.StatusCode
}
if err != nil {
res.Err = err
out <- res
return
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(byt)
out <- res
}
// 合併結果
func barrier(endpoints ...string) {
requestNumber := len(endpoints)
in := make(chan barrierResp, requestNumber)
response := make([]barrierResp, requestNumber)
defer close(in)
for _, endpoints := range endpoints {
go makeRequest(in, endpoints)
}
var hasError bool
for i := 0; i < requestNumber; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("ERROR: ", resp.Err, resp.Status)
hasError = true
}
response[i] = resp
}
if !hasError {
for _, resp := range response {
fmt.Println(resp.Status)
}
}
}
func main() {
barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}
複製代碼
Barrier 模式也可使用 errgroup
擴展庫來實現,這樣更加簡單明瞭。這個包有點相似於 sync.WaitGroup
,可是區別是當其中一個任務發生錯誤時,能夠返回該錯誤。而這也知足咱們 Barrier 模式的需求。
func barrier(endpoints ...string) {
var g errgroup.Group
var mu sync.Mutex
response := make([]barrierResp, len(endpoints))
for i, endpoint := range endpoints {
i, endpoint := i, endpoint // create locals for closure below
g.Go(func() error {
res := barrierResp{}
resp, err := http.Get(endpoint)
if err != nil {
return err
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return err
}
res.Resp = string(byt)
mu.Lock()
response[i] = res
mu.Unlock()
return err
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
}
for _, resp := range response {
fmt.Println(resp.Status)
}
}
複製代碼
future 即將來,來自將來的模式(手動狗頭)。這個模式經常使用在異步處理也稱爲 Promise 模式,採用一種 fire-and-forget
的方式,是指主 goroutine 不等子 goroutine 執行完就直接返回了,而後等到將來執行完的時候再去取結果。在 Go 中因爲 goroutine 的存在,實現這種模式是挺簡單的。
/* * Future */
type Function func(string) (string, error) type Future interface {
SuccessCallback() error
FailCallback() error
Execute(Function) (bool, chan struct{})
}
type AccountCache struct {
Name string
}
func (a *AccountCache) SuccessCallback() error {
fmt.Println("It's success~")
return nil
}
func (a *AccountCache) FailCallback() error {
fmt.Println("It's fail~")
return nil
}
func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
done := make(chan struct{})
go func(a *AccountCache) {
_, err := f(a.Name)
if err != nil {
_ = a.FailCallback()
} else {
_ = a.SuccessCallback()
}
done <- struct{}{}
}(a)
return true, done
}
func NewAccountCache(name string) *AccountCache {
return &AccountCache{
name,
}
}
func testFuture() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
}
func main() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
// do something
}
複製代碼
這裏有一個技巧:爲何使用
struct
類型做爲channel
的通知?不少開源代碼都是使用這種方式來做爲信號通知機制,主要是由於空
struct
在 Go 中佔的內存是最少的。
Pipeline 自己翻譯過來就是管道的意思,注意和 Barrire 模式不一樣的是,它是按順序的,相似於流水線。
這個圖不是很能表達並行的概念,其實三個 goroutine 是同時執行的,經過 buffer channel 將三者串起來,只要前序 goroutine 處理完一部分數據,就往下傳遞,達到並行的目的。
實現一個功能,給定一個切片,而後求它的子項的平方和。
例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。
正常的邏輯,遍歷切片,而後求平方累加。使用 pipeline 模式,能夠把求和和求平方拆分出來並行計算。
/* * Pipeline 模式 */
func generator(max int) <-chan int{
out := make(chan int, 100)
go func() {
for i := 1; i <= max; i++ {
out <- i
}
close(out)
}()
return out
}
func power(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
for v := range in {
out <- v * v
}
close(out)
}()
return out
}
func sum(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
var sum int
for v := range in {
sum += v
}
out <- sum
close(out)
}()
return out
}
func main() {
// [1, 2, 3]
fmt.Println(<-sum(power(generator(3))))
}
複製代碼
在 Go 中 goroutine 已經足夠輕量,甚至 net/http
server 的處理方式也是 goroutine-per-connection
的,因此比起其餘語言來講可能場景稍微少一些。每一個 goroutine 的初始內存消耗在 2~8kb,當咱們有大批量任務的時候,須要起不少 goroutine 來處理,這會給系統代理很大的內存開銷和 GC 壓力,這個時候就能夠考慮一下協程池。
/* * Worker pool */
type TaskHandler func(interface{}) type Task struct {
Param interface{}
Handler TaskHandler
}
type WorkerPoolImpl interface {
AddWorker() // 增長 worker
SendTask(Task) // 發送任務
Release() // 釋放
}
type WorkerPool struct {
wg sync.WaitGroup
inCh chan Task
}
func (d *WorkerPool) AddWorker() {
d.wg.Add(1)
go func(){
for task := range d.inCh {
task.Handler(task.Param)
}
d.wg.Done()
}()
}
func (d *WorkerPool) Release() {
close(d.inCh)
d.wg.Wait()
}
func (d *WorkerPool) SendTask(t Task) {
d.inCh <- t
}
func NewWorkerPool(buffer int) WorkerPoolImpl {
return &WorkerPool{
inCh: make(chan Task, buffer),
}
}
func main() {
bufferSize := 100
var workerPool = NewWorkerPool(bufferSize)
workers := 4
for i := 0; i < workers; i++ {
workerPool.AddWorker()
}
var sum int32
testFunc := func (i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
}
var i, n int32
n = 1000
for ; i < n; i++ {
task := Task{
i,
testFunc,
}
workerPool.SendTask(task)
}
workerPool.Release()
fmt.Println(sum)
}
複製代碼
協程池使用了反射來獲取執行的函數及參數,在 Go 中可能有點讓人有點膈應。可是若是批量執行的函數是已知的,能夠優化成一種只執行指定函數的協程池,可以提高性能。
發佈訂閱模式是一種消息通知模式,發佈者發送消息,訂閱者接收消息。
/* * Pub/Sub */
type Subscriber struct {
in chan interface{}
id int
topic string
stop chan struct{}
}
func (s *Subscriber) Close() {
s.stop <- struct{}{}
close(s.in)
}
func (s *Subscriber) Notify(msg interface{}) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%#v", rec)
}
}()
select {
case s.in <-msg:
case <-time.After(time.Second):
err = fmt.Errorf("Timeout\n")
}
return
}
func NewSubscriber(id int) SubscriberImpl {
s := &Subscriber{
id: id,
in: make(chan interface{}),
stop: make(chan struct{}),
}
go func() {
for{
select {
case <-s.stop:
close(s.stop)
return
default:
for msg := range s.in {
fmt.Printf("(W%d): %v\n", s.id, msg)
}
}
}}()
return s
}
// 訂閱者須要實現的方法
type SubscriberImpl interface {
Notify(interface{}) error
Close()
}
// sub 訂閱 pub
func Register(sub Subscriber, pub *publisher){
pub.addSubCh <- sub
return
}
// pub 結果定義
type publisher struct {
subscribers []SubscriberImpl
addSubCh chan SubscriberImpl
removeSubCh chan SubscriberImpl
in chan interface{}
stop chan struct{}
}
// 實例化
func NewPublisher () *publisher{
return &publisher{
addSubCh: make(chan SubscriberImpl),
removeSubCh: make(chan SubscriberImpl),
in: make(chan interface{}),
stop: make(chan struct{}),
}
}
// 監聽
func (p *publisher) start() {
for {
select {
// pub 發送消息
case msg := <-p.in:
for _, sub := range p.subscribers{
_ = sub.Notify(msg)
}
// 移除指定 sub
case sub := <-p.removeSubCh:
for i, candidate := range p.subscribers {
if candidate == sub {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
candidate.Close()
break
}
}
// 增長一個 sub
case sub := <-p.addSubCh:
p.subscribers = append(p.subscribers, sub)
// 關閉 pub
case <-p.stop:
for _, sub := range p.subscribers {
sub.Close()
}
close(p.addSubCh)
close(p.in)
close(p.removeSubCh)
return
}
}
}
func main() {
// 測試代碼
pub := NewPublisher()
go pub.start()
sub1 := NewSubscriber(1)
Register(sub1, pub)
sub2 := NewSubscriber(2)
Register(sub2, pub)
commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
for _, c := range commands {
pub.in <- c
}
pub.stop <- struct{}{}
time.Sleep(time.Second*1)
}
複製代碼
channel
一塊兒用時,容易出現死鎖