網易:mysql
package main

import "fmt"

// main walks through the basic life cycle of a channel: it prints the nil
// zero value of a declared channel, allocates a buffered channel with room
// for one element, prints it again, then sends one value and receives it back.
func main() {
	var ch chan int // a declared but unallocated channel is nil
	fmt.Printf("c=%v\n", ch)

	ch = make(chan int, 1)
	fmt.Printf("c=%v\n", ch)

	// The buffer has one free slot, so this send completes without a receiver.
	ch <- 100
	/*
		data := <-ch
		fmt.Printf("data:%v\n", data)
	*/
	<-ch // take the value back out and discard it
}
nobufChan 不帶緩衝(不帶大小的 chan 無法直接存入數據,只有當有人正在接收數據時才能夠放入數據)
比如收快遞:只有快遞員見到你本人後,才能把快遞交給你
package main

import (
	"fmt"
	"sync"
)

// produce sends the value 1000 on c and reports once the send has completed.
// On an unbuffered channel the send only completes when a receiver takes it.
func produce(c chan int) {
	c <- 1000
	fmt.Println("produce finished")
}

// consume receives a single value from c and prints it.
func consume(c chan int) {
	data := <-c
	fmt.Println(data)
}

// main pairs one producer with one consumer over an unbuffered channel and
// waits for both goroutines to finish before exiting.
func main() {
	var c chan int
	fmt.Printf("c=%v\n", c)
	c = make(chan int)

	// Wait for the goroutines explicitly instead of sleeping for a fixed time:
	// the program exits as soon as both are done and cannot exit before them.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		produce(c)
	}()
	go func() {
		defer wg.Done()
		consume(c)
	}()
	wg.Wait()
}
goroutine_sync:用 chan 模擬 sleep 阻塞的功能
package main

import (
	"fmt"
	"time"
)

// hello sleeps for five seconds, prints a greeting, and then signals
// completion by sending true on c.
func hello(c chan bool) {
	time.Sleep(5 * time.Second)
	fmt.Println("hello goroutine")
	c <- true
}

// main starts hello in a goroutine and blocks on the channel until hello
// signals, so the channel receive takes the place of a sleep in main.
func main() {
	done := make(chan bool)
	go hello(done)

	fmt.Println("main thread terminate")
	<-done // blocks until hello sends
}
只讀、只寫的 chan
package main

import "fmt"

// sendData writes the value 10 to a send-only channel.
func sendData(sendch chan<- int) {
	// A receive (<-sendch) is not allowed here: the channel is send-only.
	sendch <- 10
}

// readData receives one value from a receive-only channel and prints it.
func readData(sendch <-chan int) {
	// A send (sendch <- 10) is not allowed here: the channel is receive-only.
	fmt.Println(<-sendch)
}

// main creates a bidirectional channel and hands it to a sender goroutine
// and a reader; each function only sees the direction it is allowed to use.
func main() {
	ch := make(chan int)
	go sendData(ch)
	readData(ch)
}
判斷管道是否關閉
package main

import "fmt"

// producer sends the integers 0 through 9 on chnl and then closes it.
func producer(chnl chan int) {
	defer close(chnl)
	for n := 0; n < 10; n++ {
		chnl <- n
	}
}

// main receives with the two-value form so it can tell a real value from
// the zero value produced by a closed channel, and stops once it is closed.
func main() {
	ch := make(chan int)
	go producer(ch)

	// open is false only after the channel is closed and drained.
	for v, open := <-ch; open; v, open = <-ch {
		fmt.Println("Received ", v)
	}
	fmt.Println("chan is closed")
}
for-range-chan:不需要關注管道是否關閉,管道關閉後自動退出循環
package main

import (
	"fmt"
	"time"
)

// producer sends the integers 0 through 9 on chnl, pausing one second after
// each send, and closes the channel when it is done.
func producer(chnl chan int) {
	defer close(chnl)
	for n := 0; n < 10; n++ {
		chnl <- n
		time.Sleep(time.Second)
	}
}

// main consumes the channel with for-range, which ends by itself once the
// channel has been closed, so no explicit closed-check is needed.
func main() {
	numbers := make(chan int)
	go producer(numbers)

	for n := range numbers {
		fmt.Println("receive:", n)
	}
}
帶緩衝的 chan(容量)
特點:當 chan 中還沒有數據時,直接去獲取數據會阻塞(沒有其他 goroutine 寫入時會報死鎖);當超過 chan 容量後,繼續放入數據也會阻塞(沒有其他 goroutine 讀取時同樣會報死鎖)
package main

import "fmt"

// main demonstrates what happens when a buffered channel is over-filled.
// The channel has capacity 2 but three values are sent before anything is
// received; the third send has no free slot and no other goroutine exists to
// receive, so main blocks there and the statements after it are not reached.
func main() {
	var s string
	ch := make(chan string, 2)

	// s = <-ch // receiving from the still-empty channel would block as well

	ch <- "hello"
	ch <- "world"
	ch <- "!" // buffer is already full: this send blocks
	// ch <- "test"

	first := <-ch
	second := <-ch
	fmt.Println(s, first, second)
}
帶緩衝的 chan
package main

import (
	"fmt"
	"time"
)

// write sends the integers 0 through 4 on ch, logging after each send has
// completed, and closes the channel when finished.
func write(ch chan int) {
	defer close(ch)
	for n := 0; n < 5; n++ {
		ch <- n
		fmt.Println("successfully wrote", n, "to ch")
	}
}

// main starts the writer on a channel with capacity 2 and reads slowly, one
// value every two seconds, so the writer can only run ahead of the reader by
// as many values as the buffer holds.
func main() {
	buffered := make(chan int, 2)
	go write(buffered)

	// Give the writer time to fill the buffer before the first read.
	time.Sleep(2 * time.Second)

	for v := range buffered {
		fmt.Println("read value", v, "from ch")
		time.Sleep(2 * time.Second)
	}
}
長度和容量
package main

import "fmt"

// main shows the difference between a channel's capacity (size of its
// buffer) and its length (number of values currently queued): it queues two
// values in a channel of capacity 3, prints both figures, receives one value
// and prints the length again.
func main() {
	names := make(chan string, 3)
	for _, name := range []string{"naveen", "paul"} {
		names <- name
	}

	fmt.Println("capacity is", cap(names))
	fmt.Println("length is", len(names))
	fmt.Println("read value", <-names)
	fmt.Println("new length is", len(names))
}
如何等待一組goroutine結束?
方法1:low版本
package main

import (
	"fmt"
	"time"
)

// process announces that goroutine i has started, sleeps for two seconds,
// announces that it has ended, and then signals completion on ch.
func process(i int, ch chan bool) {
	fmt.Println("started Goroutine ", i)
	time.Sleep(2 * time.Second)
	fmt.Printf("Goroutine %d ended\n", i)
	ch <- true
}

// main launches three goroutines and waits for all of them by receiving one
// completion signal per goroutine from a shared channel.
func main() {
	const workers = 3

	done := make(chan bool, workers)
	for id := 0; id < workers; id++ {
		go process(id, done)
	}

	// One receive per started goroutine.
	for remaining := workers; remaining > 0; remaining-- {
		<-done
	}
	fmt.Println("All go routines finished executing")
}
方法2:sync.WaitGroup
package main

import (
	"fmt"
	"sync"
	"time"
)

// process announces that goroutine i has started, sleeps for two seconds,
// announces that it has ended, and marks itself done on wg.
func process(i int, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Println("started Goroutine ", i)
	time.Sleep(2 * time.Second)
	fmt.Printf("Goroutine %d ended\n", i)
}

// main waits for three goroutines with a sync.WaitGroup. It first calls Wait
// before anything has been added, then adds one count per goroutine and
// waits again until every goroutine has called Done.
func main() {
	const workers = 3

	var wg sync.WaitGroup

	// Nothing has been added yet when this first Wait is called.
	wg.Wait()
	fmt.Println("wait return")

	for id := 0; id < workers; id++ {
		wg.Add(1)
		go process(id, &wg)
	}

	wg.Wait()
	fmt.Println("All go routines finished executing")
}
workerpool的實現
worker 池的實現
a,生產者,消費者模型,簡單有效
b,控制goroutine的數量,防止goroutine泄露和暴漲
c,基於 goroutine 和 chan,構建 workerpool 非常簡單
1,任務抽象成一個個 job
2,使用job隊列和result隊列
3,開一組 goroutine 進行實際任務計算,並把結果放回 result 隊列
案例:
package main

import (
	"fmt"
	"math/rand"
)

// Job is one unit of work: compute the digit sum of Number. Id identifies
// the job in the printed output.
type Job struct {
	Number int
	Id     int
}

// Result pairs a finished job with its computed digit sum.
type Result struct {
	job *Job
	sum int
}

// calc computes the sum of the decimal digits of job.Number and sends the
// outcome on result. The sign of the number is ignored, so -123 and 123 both
// yield 6.
func calc(job *Job, result chan *Result) {
	sum := 0
	for n := job.Number; n != 0; n /= 10 {
		digit := n % 10
		// In Go the remainder takes the sign of the dividend; use the digit's
		// magnitude so negative numbers do not produce a negative sum.
		if digit < 0 {
			digit = -digit
		}
		sum += digit
	}
	result <- &Result{job: job, sum: sum}
}

// Worker processes jobs from jobChan until the channel is closed, sending
// one Result per job on resultChan.
func Worker(jobChan chan *Job, resultChan chan *Result) {
	for job := range jobChan {
		calc(job, resultChan)
	}
}

// startWorkerPool starts num Worker goroutines that all read from jobChan
// and write to resultChan.
func startWorkerPool(num int, jobChan chan *Job, resultChan chan *Result) {
	for i := 0; i < num; i++ {
		go Worker(jobChan, resultChan)
	}
}

// printResult prints every result received on resultChan until the channel
// is closed.
func printResult(resultChan chan *Result) {
	for result := range resultChan {
		fmt.Printf("job id:%v number:%v result:%d\n", result.job.Id, result.job.Number, result.sum)
	}
}

// main wires a pool of 128 workers between a job queue and a result queue,
// starts a printer for the results, and then feeds random numbers into the
// job queue forever. The program runs until it is interrupted.
func main() {
	jobChan := make(chan *Job, 1000)
	resultChan := make(chan *Result, 1000)

	startWorkerPool(128, jobChan, resultChan)
	go printResult(resultChan)

	for id := 1; ; id++ {
		jobChan <- &Job{
			Id:     id,
			Number: rand.Int(),
		}
	}
}
package main

import (
	"fmt"
	"time"
)

// server1 sends its response on ch after a six second delay.
func server1(ch chan string) {
	time.Sleep(6 * time.Second)
	ch <- "response from server1"
}

// server2 sends its response on ch after a three second delay.
func server2(ch chan string) {
	time.Sleep(3 * time.Second)
	ch <- "response from server2"
}

// main starts both servers and then runs a single select over their output
// channels. Because the select has a default case it does not block: if
// neither channel has a value ready at that moment, the default branch runs.
func main() {
	out1 := make(chan string)
	out2 := make(chan string)

	go server1(out1)
	go server2(out2)

	/*
		s1 := <-out1
		fmt.Println("s1:", s1)
		s2 := <-out2
		fmt.Println("s2:", s2)
	*/

	select {
	case reply := <-out1:
		fmt.Println("s1:", reply)
	case reply := <-out2:
		fmt.Println("s2:", reply)
	default:
		fmt.Println("run default")
	}
}
package main

import (
	"fmt"
	"time"
)

// write tries to send "hello" on ch every 500 milliseconds, forever. The
// send is wrapped in a select with a default case so that it never blocks:
// when the send cannot proceed, write reports that the channel is full.
func write(ch chan string) {
	for ; ; time.Sleep(500 * time.Millisecond) {
		select {
		case ch <- "hello":
			fmt.Println("write succ")
		default:
			fmt.Println("channel is full")
		}
	}
}

// main starts the writer on a channel with capacity 10 and consumes one
// value per second, which is slower than the writer produces them.
func main() {
	//select {}
	messages := make(chan string, 10)
	go write(messages)

	for msg := range messages {
		fmt.Println("recv:", msg)
		time.Sleep(time.Second)
	}
}
package main

import (
	"fmt"
	"sync"
)

var (
	x     int            // shared counter, guarded by mutex
	wg    sync.WaitGroup // tracks the running add goroutines
	mutex sync.Mutex
)

// add increments the shared counter x 5000 times, taking the mutex around
// each increment, and marks itself done on wg when finished.
func add() {
	defer wg.Done()
	for n := 0; n < 5000; n++ {
		mutex.Lock()
		x++
		mutex.Unlock()
	}
}

// main runs two add goroutines concurrently, waits for both, and prints the
// final counter value.
func main() {
	wg.Add(2)
	for n := 0; n < 2; n++ {
		go add()
	}
	wg.Wait()
	fmt.Println("x:", x)
}
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	rwlock sync.RWMutex
	x      int            // shared value, guarded by rwlock
	wg     sync.WaitGroup // tracks the writer and all readers
)

// write takes the write lock, increments x, holds the lock for ten seconds,
// then releases it and marks itself done on wg.
func write() {
	defer wg.Done()

	rwlock.Lock()
	defer rwlock.Unlock()

	fmt.Println("write lock")
	x++
	time.Sleep(10 * time.Second)
	fmt.Println("write unlock")
}

// read takes the read lock, prints x together with the reader number i,
// holds the lock for one second, then releases it and marks itself done.
func read(i int) {
	defer wg.Done()

	fmt.Println("wait for rlock")
	rwlock.RLock()
	defer rwlock.RUnlock()

	fmt.Printf("goroutine:%d x=%d\n", i, x)
	time.Sleep(time.Second)
}

// main starts the writer first, pauses briefly so it can take the write
// lock, then starts ten readers and waits for everything to finish.
func main() {
	wg.Add(1)
	go write()

	time.Sleep(5 * time.Millisecond)

	for id := 0; id < 10; id++ {
		wg.Add(1)
		go read(id)
	}
	wg.Wait()
}
互斥鎖和讀寫鎖比較
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	rwlock sync.RWMutex // only referenced by the commented-out RWMutex variant
	x      int          // shared counter, guarded by the active lock
	wg     sync.WaitGroup
	mutex  sync.Mutex
)

// write increments x 100 times, holding the lock for 10 ms on each round,
// and marks itself done on wg. Swap the commented lines to time the
// RWMutex variant instead of the plain mutex.
func write() {
	defer wg.Done()
	for n := 0; n < 100; n++ {
		//rwlock.Lock()
		mutex.Lock()
		x = x + 1
		time.Sleep(10 * time.Millisecond)
		mutex.Unlock()
		//rwlock.Unlock()
	}
}

// read simulates 100 read operations, holding the lock for 1 ms on each
// round, and marks itself done on wg. The parameter i is not used by the
// body; it is kept so existing callers continue to compile.
func read(i int) {
	defer wg.Done()
	// The loop counter is named n so that it no longer shadows parameter i.
	for n := 0; n < 100; n++ {
		//rwlock.RLock()
		mutex.Lock()
		time.Sleep(time.Millisecond)
		mutex.Unlock()
		//rwlock.RUnlock()
	}
}

// main runs one writer and 100 readers against the same lock and prints how
// many milliseconds the whole run took.
func main() {
	// time.Now carries a monotonic reading, so time.Since is not affected by
	// wall-clock adjustments during the run (UnixNano differences are).
	start := time.Now()

	wg.Add(1)
	go write()
	for id := 0; id < 100; id++ {
		wg.Add(1)
		go read(id)
	}
	wg.Wait()

	cost := int64(time.Since(start) / time.Millisecond)
	fmt.Println("cost:", cost, "ms")
}
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	x     int32 // shared counter
	wg    sync.WaitGroup
	mutex sync.Mutex
)

// addMutex increments x 500 times, taking the mutex around each increment,
// and marks itself done on wg.
func addMutex() {
	defer wg.Done()
	for n := 0; n < 500; n++ {
		mutex.Lock()
		x = x + 1
		mutex.Unlock()
	}
}

// add increments x 500 times with atomic.AddInt32 instead of a lock, and
// marks itself done on wg.
func add() {
	defer wg.Done()
	for n := 0; n < 500; n++ {
		//mutex.Lock()
		//x = x +1
		atomic.AddInt32(&x, 1)
		//mutex.Unlock()
	}
}

// main starts 10000 goroutines that each run add (or addMutex, when the
// commented line is swapped in), waits for all of them, and prints the final
// counter together with the elapsed time in milliseconds.
func main() {
	// time.Now carries a monotonic reading, so time.Since is not affected by
	// wall-clock adjustments during the run (UnixNano differences are).
	start := time.Now()

	for n := 0; n < 10000; n++ {
		wg.Add(1)
		go add()
		//go addMutex()
	}
	wg.Wait()

	cost := int64(time.Since(start) / time.Millisecond)
	// Read the counter the same way add writes it.
	fmt.Println("x:", atomic.LoadInt32(&x), "cost:", cost, "ms")
}
其它案例:
先看代碼
package main

import (
	"fmt"
	"strings"
)

// interleave builds the sequence users[0], ages[0], users[1], ages[1], ...
// using two goroutines that hand the turn back and forth over unbuffered
// channels. Only as many pairs as both slices can supply are produced. It
// returns after both goroutines have finished, so the caller never observes
// a partially built slice.
func interleave(users, ages []string) []string {
	pairs := len(users)
	if len(ages) < pairs {
		pairs = len(ages)
	}
	ret := make([]string, 0, 2*pairs)
	if pairs == 0 {
		return ret
	}

	userTurn := make(chan struct{})
	ageTurn := make(chan struct{})
	done := make(chan struct{})

	go func() {
		for i := 0; i < pairs; i++ {
			<-userTurn
			ret = append(ret, users[i])
			ageTurn <- struct{}{}
		}
	}()

	go func() {
		defer close(done)
		for i := 0; i < pairs; i++ {
			<-ageTurn
			ret = append(ret, ages[i])
			// After the last pair nobody is left to receive the turn, so
			// handing it over would block this goroutine forever.
			if i < pairs-1 {
				userTurn <- struct{}{}
			}
		}
	}()

	userTurn <- struct{}{} // give the first turn to the users goroutine
	<-done                 // wait until the last age has been appended
	return ret
}

// main interleaves a list of user names with a list of ages and prints the
// combined list. Without waiting for the goroutines, main would print as soon
// as the first hand-off completed and show at most the first name.
func main() {
	users := strings.Split("shenyi,zhangsan,lisi,wangwu", ",")
	ages := strings.Split("19,21,25,26", ",")
	fmt.Println(interleave(users, ages))
}
打印(若 main 不等待 goroutine 結束就直接輸出 ret,通常只能看到第一個元素):
[shenyi]
package main

import (
	//_ "github.com/go-sql-driver/mysql"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// page is one downloaded document together with its page number.
type page struct {
	index int
	body  []byte
}

// fetchPage downloads url with client and returns the response body. It
// returns an error if the request fails, the status is not 200 OK, or the
// body cannot be read. The response body is always closed.
func fetchPage(client *http.Client, url string) ([]byte, error) {
	res, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s", res.Status)
	}
	return ioutil.ReadAll(res.Body)
}

// main downloads pages 1 to 3 concurrently and writes each one to
// ./files/<page number>. Failed downloads are logged and skipped.
func main() {
	const (
		urlFormat = "https://news.cnblogs.com/n/page/%d/"
		outDir    = "./files"
		pageCount = 3
	)

	// WriteFile does not create missing directories.
	if err := os.MkdirAll(outDir, 0755); err != nil {
		log.Fatalf("creating %s: %v", outDir, err)
	}

	// The default client has no timeout; a stalled request would block forever.
	client := &http.Client{Timeout: 30 * time.Second}
	pages := make(chan page)

	var wg sync.WaitGroup
	for i := 1; i <= pageCount; i++ {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			body, err := fetchPage(client, fmt.Sprintf(urlFormat, index))
			if err != nil {
				log.Printf("fetching page %d: %v", index, err)
				return
			}
			pages <- page{index: index, body: body}
		}(i)
	}

	// Close the channel only after every sender has finished. Closing it from
	// one particular downloader can panic the others with a send on a closed
	// channel, because the downloads finish in no fixed order.
	go func() {
		wg.Wait()
		close(pages)
	}()

	for p := range pages {
		name := filepath.Join(outDir, fmt.Sprintf("%d", p.index))
		// Permission bits are octal: 0666, not the decimal literal 666.
		if err := ioutil.WriteFile(name, p.body, 0666); err != nil {
			log.Printf("writing %s: %v", name, err)
		}
	}
}
打印:
……若請求一直沒有響應(http.Get 預設沒有超時),程序會一直 hang 住