---恢復內容開始---程序員
一 :併發基礎sql
1 併發和並行數據庫
併發和並行是兩個不一樣的概念: 1 並行意味着程序在任意時刻都是同時運行的: 2 併發意味着程序在單位時間內是同時運行的
詳解:
並行就是在任一粒度的時間內都具有同時執行的能力:最簡單的並行就是多機,多臺機器
並行處理;SMP 表面上看是並行的,但因爲是共享內存,以及線程間的同步等,不可能徹底作
到並行
併發是在規定的時間內多個請求都獲得執行和處理,強調的是給外界的感受,實際上內部
多是分時操做的 。併發重在避免阻塞,使程序不會由於 個阻 而中止處理。併發典型的應
用場景:分時操做系統就是一種併發設計(忽略多核 CPU )。
並行是硬件和操做系統開發者重點考慮的問題,做爲應用層的程序員,惟一能夠選擇的就
是充分藉助操做系統提供的 API 和程序語言特性,結合實際需求設計出具備良好併發結構的程
序,提高程序的併發處理能力。現代操做系統可以提供的最基礎的併發模型就是多線程和多進
程;編程語言這一層級能夠進一步封裝來提高程序的井發處理能力。
在當前的計算機體系下:並行具備瞬時性,併發具備過程性;併發在於結構,井行在於執
行。應用程序具有好的併發結構,操做系統才能更好地利用硬件並行執行,同時避免阻塞等待,
合理地進行調度,提高 CPU 利用率。應用層程序員提高程序併發處理能力的一個重要手段就是
爲程序設計良好的併發結構。
2 go語言之併發編程
go從語言層面就支持了併發,簡化了併發程序的編寫
3 goroutine 是什麼安全
1 它是go併發設計的核心 2 goroutine就是協程,它比線程更小,十幾個goroutine在底層可能就是五六個線程 3 go語言內部實現了goroutine的內存共享,執行goroutine只需極少的棧內存(大概是4~5KB)
4 建立goroutine服務器
1 只須要在語句前添加go關鍵字,就能夠建立併發執行單元 2 開發⼈員無需瞭解任何執⾏細節,調度器會自動將其安排到合適的系統線程上執行
package main import ( "fmt" "time" ) //子協程 func newTask() { i := 0 for { i++ fmt.Printf("new goroutin:i=%d\n", i) time.Sleep(1 * time.Second) } } func main() { //啓動子協程 go newTask() i := 0 for { i++ fmt.Printf("main goroutin:i=%d\n", i) time.Sleep(1 * time.Second) } }
主線程退出 其餘任務就不會執行了多線程
package main import ( "fmt" "time" ) func main() { go func() { i := 0 for { i++ fmt.Printf("new goroutin:i=%d\n", i) time.Sleep(1 * time.Second) } }() i := 0 for { i++ fmt.Printf("main goroutin:i=%d\n", i) time.Sleep(1 * time.Second) if i == 2 { break } } }
main裏面沒有東西,其餘任務也不會執行併發
package main import ( "fmt" "time" ) func main() { go func() { i:=0 for { i++ fmt.Printf("new goroutine:i=%d\n",i) time.Sleep(1* time.Second) } }() }
5 runtime包異步
runtime.Gosched():出讓CPU時間片,等待從新安排任務編程語言
package main import ( "fmt" "runtime" ) func main() { go func(s string) { for i:=0;i<2;i++{ fmt.Println(s) } }("world") for i :=0;i<2;i++{ //出讓時間片 runtime.Gosched() fmt.Println("hello") } } /* 打印結果 world world hello hello */
runtime.Goexit():馬上結束當前攜程的執行
package main import ( "fmt" "runtime" "time" ) func main() { go func() { defer fmt.Println("A.defer") //匿名函數 func () { defer fmt.Println("A.defer") runtime.Goexit() fmt.Println("B") }() fmt.Println("A") }() for { time.Sleep(time.Second) } }
runtime.GOMAXPROCS():設置並行計算的CPU最大核數
package main import ( "fmt" "runtime" ) func main() { n :=runtime.GOMAXPROCS(3) fmt.Printf("n=%d\n",n) for { go fmt.Println(0) fmt.Println(1) } }
6 通道
概述: 1 goroutine 運行在相同的地址空間,所以訪問共享內存必須作好同步,處理好線程安全問題 2 goroutine奉行經過通訊來共享內存,而不是共享內存來通訊 3 channel是一個引用類型,共用多個goroutine通信,其內部實現了同步,確保併發安全
channel的基本使用
1 channel能夠用內置make()函數建立 2 定義一個channel時,也須要定義發送到channel的值的類型
make(chan 類型)
make(chan 類型, 容量)
1 當 capacity= 0 時,channel 是無緩衝阻塞讀寫的,當capacity> 0 時,channel 有緩衝、是非阻塞的,直到寫滿 capacity個元素才阻塞寫入 2 channel經過操做符<-來接收和發送數據,發送和接收數據語法:
channel <- value //發送value到channel <-channel //取出數據扔掉 x := <-channel //取出數據,給x x, ok := <-channel //功能同上,順便檢查一下通道
package main import "fmt" func main() { c := make(chan int) //建立一個通道 go func() { defer fmt.Println("子攜程結束") fmt.Println("子攜程正在運行。。。") c <- 666 }() <-c num := <-c fmt.Println("num=",num) fmt.Println("main結束") }
無反衝的channel
無緩衝的通道是指在接收前沒有能力保存任何值的通道
package main import ( "fmt" "time" ) func main() { //無反衝通道 c := make(chan int,0) fmt.Printf("len(c)=%d,cap(c)=%d\n",len(c),cap(c)) //子攜程存數據 go func() { defer fmt.Println("子攜程結束") //向通道添加數據 for i := 0; i<3;i++ { c <- i fmt.Printf("子攜程正在運行[%d]:"+"len(c)=%d,cap(c)=%d\n",i,len(c),cap(c)) } }() time.Sleep(2*time.Second) //主攜程取數據 for i :=0;i<3;i++{ num :=<- c fmt.Println("num=",num) } fmt.Println("主攜程結束") }
運行結果
有換成的channel
有緩衝的通道是一種在被接收前能存儲一個或者多個值的通道
package main import ( "fmt" "time" ) func main() { //無緩衝通道 c := make(chan int,3) fmt.Printf("len(c)=%d,cap(c)=%d\n",len(c),cap(c)) //字攜程存數據 go func() { defer fmt.Println("子攜程結束") //向通道添加數據 for i :=0;i <3;i++{ c<-i fmt.Println("子攜程正在運行[%d]:"+"len(c)=%d,cap=%d\n",i,len(c),cap(c)) } }() time.Sleep(2*time.Second) //主攜程取數據 for i :=0;i<3;i++{ num :=<- c fmt.Println("num=",num) } fmt.Println("主攜程結束") }
7 關閉通道
能夠經過內置的close()函數關閉channel
package main import ( "fmt" ) func main() { c := make(chan int) //子協程存數據 go func() { for i := 0; i < 5; i++ { c <- i } close(c) }() //主協程取數據 for { if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("主協程結束") }
8,單方向的channel
1 默認狀況下,通道是雙向的,也就是,既能夠往裏面發送數據也能夠接收數據 2 go能夠定義單方向的通道,也就是隻發送數據或者只接收數據,聲明以下
var ch1 chan int //正常 var ch2 chan<- float64 //只能寫float64 var ch3 <-chan int //只能讀int類型數據
3 能夠將 channel 隱式轉換爲單向隊列,只收或只發,不能將單向 channel 轉換爲普通channel
package main func main() { //定義個正常的 c:=make(chan int,3) //轉換爲只寫的通道 var send chan <-int =c //轉換爲只讀 var recv <-chan int = c //寫數據 send <- 1 //讀數據 <-recv //不能單的轉回正常的 //xx :=(chan int)(send) }
單向的有啥用?能夠用做生產者和消費者問題
package main import ( "fmt" ) //生產者,只寫 func producter(out chan <- int) { defer close(out) for i :=0;i<5;i++ { out <- i } } //消費者,只讀 func consumer(in <- chan int){ for num :=range in{ fmt.Println(num) } } func main() { //正常通道 c:=make(chan int) //生產者 go producter(c) //消費者 consumer(c) fmt.Println("done") }
9 定時器
Timer:時間到了,響應一次
package main import ( "time" "fmt" ) func main() { ////1.建立定時器 //timer1 := time.NewTimer(time.Second * 2) ////打印系統當前時間 //t1 := time.Now() //fmt.Printf("t1:%v\n", t1) //t2 := <-timer1.C //fmt.Printf("t2:%v\n", t2) //2.timer響應 //timer2 := time.NewTimer(time.Second) //for { // <-timer2.C // fmt.Println("時間到") //} //3.timer延時 //time.Sleep(2 * time.Second) //fmt.Println("2秒時間到") //timer3 := time.NewTimer(3 * time.Second) //<-timer3.C //fmt.Println("3秒時間到") //4.中止定時器 //timer4 := time.NewTimer(3 * time.Second) //go func() { // <-timer4.C // fmt.Println("定時器時間到了") //}() ////中止定時器 //stop := timer4.Stop() //if stop { // fmt.Println("timer4已經關閉") //} //5.重置定時器 timer5 := time.NewTimer(3 * time.Second) timer5.Reset(time.Second) fmt.Println(time.Now()) fmt.Println(<-timer5.C) for { } }
Ticker,不停地響應
package main import ( "time" "fmt" ) func main() { ////1.建立定時器 ticker := time.NewTicker(time.Millisecond) i := 0 go func() { for { <-ticker.C fmt.Println(<-ticker.C) i++ fmt.Println("i=", i) //中止定時器 if i == 5 { ticker.Stop() } } }() for { } }
9 select
1 go語言提供了select關鍵字,能夠監聽channel上的數據流動 2 語法與switch相似,區別是select要求每一個case語句裏必須是一個IO操做
select { case <-chan1: // 若是chan1成功讀到數據,則進行該case處理語句 case chan2 <- 1: // 若是成功向chan2寫入數據,則進行該case處理語句 default: // 若是上面都沒有成功,則進入default處理流程 }
注意: 操做不一樣狀態的chan會引起三種行爲 panic (1)向已經關閉的通道寫數據會致使 panic 最佳實踐是由寫入者關閉通道,能最大程度地避免向已經關閉的通道寫數據而致使的 panic (2)重複關閉的通道會致使 panic
10 阻塞與非阻塞
阻塞: 1 向未初始化的通道寫數據或讀數據都會致使當前goroutine的永久阻塞 2 向緩衝區已滿的通道寫入數據會致使goroutine阻塞 3 通道中沒有數據,讀取該通道會致使goroutine阻塞 非阻塞: 1 讀取已經關閉的通道不會引起阻塞,而是當即返回通道元素類型的零值,comma.ok 語法判斷通道是否已經關閉 2 向有緩衝且沒有滿的通道讀/寫不會引起阻塞
二:併發範式
1,生成器
應用場景:
是調用一個統 的全局的生成器服務, 於生成全
局事務號、訂單號、序列號和隨機數等
(1)帶緩衝的生成器
package main import ( "fmt" "math/rand" ) func GenerateIntA()chan int{ ch :=make(chan int,) //啓動一個goroutine 用於生成隨機數 go func() { for { ch <- rand.Int() //讀取一個隨機數 } }() return ch } func main() { ch := GenerateIntA() fmt.Println(<-ch) fmt.Println(<-ch) }
(2) 多個goroutine加強型生成器
package main import ( "fmt" "math/rand" ) func GenerateIntA() chan int { ch := make(chan int,10) go func() { for { ch<-rand.Int() } }() return ch } func GenerateIntB() chan int { ch :=make(chan int,10) go func() { for { ch <-rand.Int() } }() return ch } func GenerateInt() chan int{ ch :=make(chan int,20) go func() { for { //使用 selecet聚合多條通道服務;所謂的扇出是指將一條通道發散到多條通道中處理,
//在Go語言裏面具體實現就是使用go關鍵字啓動多個goroutine併發處理。
select { case ch <- <- GenerateIntA(): //監聽通道數據 case ch <- <-GenerateIntB(): } } }() return ch } func main() { ch :=GenerateInt() for i :=0; i<100;i++{ fmt.Println(<-ch) } }
運行效果
(3) 有時須要生成器自動退出,能夠藉助go通道的退出機制(close channel to broadcast)來實現
package main import ( "fmt" "math/rand" ) func GenerateIntA(done chan struct{}) chan int { ch := make(chan int) go func() { lable: for { //經過select監聽一個信號chan 來肯定是否中止生成 select { case ch <- rand.Int(): case <- done: break lable } } close(ch) }() return ch } func main() { done := make(chan struct{}) ch := GenerateIntA(done) fmt.Println(<-ch) fmt.Println(<-ch) //不在須要生成器經過close chan 發送一個通知給生成器 close(done) for v :=range ch{ fmt.Println(v) } }
(4) 一個融合了併發、緩衝、退出通知等多重特性的生成器
package main import ( "fmt" "math/rand" ) //done 接收通知退出信號 func GenerateIntA(done chan struct{}) chan int { ch := make(chan int,5) go func() { label: for { select { case ch <- rand.Int(): case <- done: break label } } close(ch) }() return ch } func GenerateIntB(done chan struct{}) chan int { ch :=make(chan int,10) go func() { label: for { select { case ch <-rand.Int(): case <-done: break label } } close(ch) }() return ch } //經過select 執行扇入操做 func GenerateInt( done chan struct{}) chan int { ch := make(chan int) send :=make(chan struct{}) go func() { label: for { select { case ch <- <- GenerateIntA(send): case ch <- <- GenerateIntB(send): case <-done: send <- struct{}{} send <- struct{}{} break label } } close(ch) }() return ch } func main() { //建立一個做爲接收退出信號的chan done := make(chan struct{}) //啓動生成器 ch :=GenerateInt(done) //獲取生成器資源 for i:=0; i<10;i++{ fmt.Println(<- ch) } //通知生產者中止生產 done <- struct{}{} fmt.Println("stop gernarate") }
2 固定工做池
服務器編程中使用最多的就是經過線程池來提高服務的併發處理能力,在go中同樣能夠輕鬆地構建固定數目的goroutines做爲工做線程池
程序中除了主要的main goroutine,還開啓了以下幾類goroutine:
{1} 初始化任務的goroutine
{2} 分發任務的goroutine。
{3} 等待全部worker結束通知,而後關閉結果通道goroutine。main函數負責拉起上述goroutine並從結果通道獲取最終結果。
1,傳遞task任務通道
2.傳遞task結果通道
3,接收worker處理完任務後所發送通知的通道
package main import ( "fmt" ) const ( NUMBER =10 ) //工做任務 type task struct { begin int end int result chan<- int } //任務處理:計算begin到end 的和 //執行結果寫入結果chan result func (t *task) do() { sum := 0 for i :=t.begin;i<=t.end;i++{ sum += i } t.result <- sum } func main() { workers :=NUMBER //建立通道 taskchan :=make(chan task,10) //結果通道 resultchan :=make(chan int,10) //worker 信號通道 done :=make(chan struct{},10) //初始化task的goroutine,計算100個天然數之和 go Inittask(taskchan,resultchan,100) //分發任務到NUMBER個goroutine池中 DistributeTask(taskchan,workers,done) //獲取各個goroutine 處理完成任務通知,並關閉結果通道 go CloseResult(done,resultchan,workers) //t經過結果通道獲取結果並彙總 sum :=ProcessResult(resultchan) fmt.Println("sum=",sum) } //初始化待處理任務 通道 (task cham) func Inittask(taskchan chan<- task,r chan int,p int) { qu := p/10 mod :=p%10 high :=qu*10 for j :=0; j<qu;j++{ b:=10*j +1 e :=10*(j+1) tsk :=task{ begin:b, end:e, result:r, } taskchan <-tsk } if mod != 0 { tsk :=task{ begin:high +1, end: p, result:r, } taskchan<- tsk } close(taskchan) } //讀取task chan 並分發到worker goroutine 處理,總的數量是workers func DistributeTask(taskchan <-chan task, workers int,done chan struct{}) { for i := 0; i<workers;i++{ go ProcessTask(taskchan,done) } } //工做goroutine處理具體工做,並將處理結果發送到 func ProcessTask(taskchan <-chan task,done chan struct{}) { for t :=range taskchan{ t.do() } done <- struct{}{} } //經過done channel 同步等待全部工做goroutine的結束,而後關閉結果chan func CloseResult(done chan struct{},resultchan chan int,workers int) { for i:= 0;i <workers;i++{ <-done } close(done) close(resultchan) } //讀取結果通道 func ProcessResult(resultchan chan int) int { sum := 0 for r := range resultchan{ sum +=r } return sum }
邏輯分析: 1 構建task併發送到task通道中 2 分別啓動n個工做線程,不停地從task通道中獲取任務,而後將結果寫入到結果通道 3 收到結果的goroutine接收到全部task已經處理完畢的信號,主動關閉結果通道 4 main 中函數ProcessResult 讀取並統計全部的結果
3 future模式
應用場景:
編程中常常遇到在一個流程中須要調用多個子調用的狀況,這些子調用相互之間沒有依賴,
若是串行地調用,則耗時會很長,此時可使用GO併發編程中的future模式
工做原理 (1)使用 an 做爲函數參數。 (2)啓動 oroutine 調用函數。 (3)經過 han 傳入參數。 (4)作其餘能夠並行處理的事情。 (5)經過 chan 異步獲取結果
示例:
package main import ( "fmt" "time" ) //一個查詢結構體 //這裏的sql 和result是一個簡單抽象,具體的應用多是更復雜的數據類型 type query struct { //參數Channel sql chan string //結果channel result chan string } //執行query func execQuery(q query){ //啓動協程 go func() { //獲取輸入 sql :=<-q.sql //訪問數據庫 //輸出結果通道 q.result<-"result from " + sql }() } func main() { //初始化Query q :=query{make(chan string),make(chan string,1)} //執行query,注意執行的時候無須準備參數 go execQuery(q) //發送參數 q.sql <-"select * from table" //作其餘事情,經過sleep描述 time.Sleep(1*time.Second) //獲取結果 fmt.Println(<-q.result) } /*打印結果 result from select * from table */
缸」 最大的好處是將函數的同步調用轉換爲異步調用 適用於 個交易須要多 子調用
且這些子調用沒有依賴的場景 實際狀況可能比上面示例複雜得多,要考慮錯誤和異常的處理,
讀者着重體驗這種思想,而不是細節
三:context標準庫
Go 中的 goroutine 之間沒有父與子的關係,也就沒有所謂子進程退出後的通知機制,多個 goroutine 都是平行地被調度,多個 goroutine 如何協做工做涉及通訊、同步、通知和退出四個方面。 通訊: chan 通道固然是 goroutine 之間通訊的基礎, 注意這裏的通訊主要是指程序的數據通道 同步:不帶緩衝的 han 提供了一個自然的同步等待機制:固然 sync.WaitGroup 也爲多個go routine 協同工做提供 種同步等待機制 通知:這個通知和上面通訊的數據不同,通知一般不是業務數據,而是管理、控制流數據。要處理這個也好辦,在輸入端綁定兩個 chan,一個用於業務流數據,另 個用於異常通知 數據,而後經過 select 收斂進行處理。這個方案能夠解決簡 的問題,但不是一個通用的解決方案。 退出: goroutine 之間沒有父子關係,如何通知 goroutine 退出?能夠經過增長一個單獨的通 道,藉助通道和 select 的廣播機制( close channel to broadcast )實現退出
---恢復內容結束---