golang總結-併發

2.7 併發編程

go協程

golang 經過一個go關鍵字就能夠開啓一個協程。編程

func main() {
    //兩個交錯輸出
    go sayHello()
    go sayHello2()
    time.Sleep(time.Second * 3) //阻塞主線程
}

func sayHello() {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
}

func sayHello2() {
    for i := 0; i < 30; i++ {
        fmt.Println("你好中國")
    }
}
//經過sync.WaitGroup來等待全部線程完成
package main

import (
    "fmt"
    "sync"
)

func main() {
    var w = &sync.WaitGroup{}
    w.Add(2)
    go sayEn(w)
    go sayZh(w)
    w.Wait()
}

func sayEn(w *sync.WaitGroup) {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
    w.Done() //每當這個方法完成則減小1
}

func sayZh(w *sync.WaitGroup) {
    for i := 0; i < 30; i++ {
        fmt.Println("中國你好")
    }
    w.Done() //每當這個方法完成則減小1
}

go管道

管道的定義:併發

//無緩衝管道
flag := make(chan bool)
//有緩衝管道
data := make(chan int, 10)
//向管道中添加值
data <- 10
//從管道中取值
agr := <- data
<- data //也能夠直接釋放值,不用變量接收

1. 經過go實現同步

package main

import (
    "fmt"
)

func main() {
    w1, w2 := make(chan bool), make(chan bool)
    go sayEn_chan(w1)
    go sayZh_chan(w2)
    <- w1 //阻塞,直到chan 能夠取出數據
    <- w2
}

func sayEn_chan(w chan bool) {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
    w <- true //方法完成寫入通道
}

func sayZh_chan(w chan bool) {
    for i := 0; i < 30; i++ {
        fmt.Println("中國你好")
    }
    w <- true
}

2. 正確處理累加

package main

import (
    "fmt"
    "sync/atomic"
)

var (
    count int64
)

func main() {
    w1, w2 := make(chan bool), make(chan bool)
    go add(w1)
    go add(w2)
    <- w1 //阻塞,直到chan 能夠取出數據
    <- w2
    fmt.Println(count)
}

func add(w chan bool) {
    for i := 0; i < 5000; i++ {
        atomic.AddInt64(&count, 1)
    }
    w <- true
}

3. 通道實現數據共享

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

var wg sync.WaitGroup

func main() {
    count := make(chan int)
    wg.Add(2)
    go player("張三", count)
    go player("李四", count)
    //發球
    count <- 1
    wg.Wait() //阻塞等待2個線程完成
}

func player(name string, count chan int) {
    defer wg.Done()

    for {
        i, ok := <-count

        if !ok { //通道關閉
            fmt.Printf("運動員 %s 贏了\n", name)
            return
        }

        tmp := rand.Intn(100)
        if tmp % 13 == 0 { //沒有接到球
            fmt.Printf("運動員 %s 輸了\n", name)
            close(count)
            return
        }
        fmt.Printf("運動員 %s 擊球 %d \n", name , i)
        i ++
        count <- i
    }
}

4. 緩衝管道

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    numberTasks = 10
    workers = 4
)

var wg2 sync.WaitGroup

func main() {
    wg2.Add(workers)
    tasks := make(chan int, numberTasks)

    for i := 0; i < workers; i++ {
        go work(tasks, i)
    }

    for j := 1; j <= numberTasks; j++ {
        tasks <- j
    }
    close(tasks)

    wg2.Wait()
}

func work(tasks chan int, worker int) {
    defer wg2.Done()
    for {
        task, ok := <- tasks
        if !ok {
            fmt.Printf("任務完成,工號:%d\n", worker)
            return
        }
        fmt.Printf("工號:%d, 開始工做:%d\n", worker, task)
        time.Sleep(time.Microsecond * 100)
        fmt.Printf("工號:%d, 完成工做:%d\n", worker, task)

    }

}

5. select

select 的特色是:不會阻塞,哪一個管道有值,我取哪一個。因此,下面當運行到go的時候,a,b尚未添值,因此只能選擇defaul運行,這裏能夠把defualt部分和b<-2去掉,select會被阻塞,直到a<-1執行app

func main() {
    a := make(chan int)
    b := make(chan int)
    go func() {
        b <- 2
        time.Sleep(time.Second * 3)
        a <- 1
    }()
    select {
    case <- a:
        fmt.Println("a")
    case <- b:
        fmt.Println("b")
        time.Sleep(time.Second * 3)
    default:
        fmt.Println("hello world")
    }
}

6. runner併發模型

package runner

import (
    "errors"
    "os"
    "os/signal"
    "time"
)

type Runner struct {
    interrupt chan os.Signal

    complete chan error

    timeout <-chan time.Time //聲明一個只讀的管道

    tasks []func(int)
}

var ErrorTimeout = errors.New("receive timeout")

var ErrorInterrupt = errors.New("interrupt error")

func New(duration time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete: make(chan error),
        timeout: time.After(duration),
    }
}

func (r *Runner) Add(tasks...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}

func (r *Runner) getInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}

func (r *Runner) run() error {
    for id, task := range r.tasks {
        if r.getInterrupt() {
            return ErrorInterrupt
        }
        task(id)
    }
    return nil
}

func (r *Runner) Start() error {
    signal.Notify(r.interrupt, os.Interrupt)
    go func() {
        r.complete <- r.run()
    }()
    
    select {
    case err := <- r.complete:
        return err
    case <- r.timeout:
        return ErrorTimeout
    }
}

測試測試

package main

import (
    "gorounting/runner"
    "log"
    "os"
    "time"
)

const (
    timeout  = 4 * time.Second
)

func main() {
    log.Println("任務開始")
    ru := runner.New(timeout)
    ru.Add(createTask(), createTask(), createTask(), createTask())

    if err := ru.Start(); err != nil {
        switch err {
        case runner.ErrorInterrupt:
            log.Println("系統被中斷")
            os.Exit(1)
        case runner.ErrorTimeout:
            log.Println("系統超時")
            os.Exit(2)

        }
    }
    log.Println("程序結束")

}

func createTask() func(int) {
    return func(id int) {
        log.Printf("process-task #%d\n", id)
        time.Sleep(time.Duration(id) * time.Second )
    }
}
相關文章
相關標籤/搜索