golang之channel使用簡述

Channel

golang CSP 模型中的 C, 主要用於goroutine之間消息的傳遞,咱們知道在寫代碼的過程當中,解偶是很是重要的一環,而使用channel則能夠很好的隔離goroutine,使得goroutne之間的交互,只須要將重心關注在如何從channel中消費或者生產消息。
  • 聲明和使用
  • 阻塞場景
  • 關閉Channel
  • select & range
  • 使用channel模擬生產消費模型

聲明和使用

使用make聲明一個channel
ch := make(chan int)

    ch <- 1    // write    ch位於 <- 的左側(表明數據流入
    <- ch    // read        ch位於 <- 的右側(表明數據流出

阻塞場景

在真正使用channel前,咱們須要瞭解channel可能會產生 阻塞場景的全部可能,以防止在代碼中編寫出不符合咱們預期的代碼。
下面咱們羅列出可能的 四種情形
無緩衝
channel中無數據,可是執行 <- channel (讀
ch := make(chan interface{})

    <-ch
    fmt.Println("read buf succ")
channel中無數據,往 channel <- (寫 ,可是沒有goroutine讀取。
ch := make(chan interface{})

    ch <- 1

    fmt.Println("read buf succ")
有緩衝
channel中無數據,可是執行 <- channel
ch := make(chan interface{}, 1)

    <-ch
    fmt.Println("read buf succ")
channel中已滿, 繼續執行 channel <- 動做,可是沒有goroutine讀取。
ch := make(chan interface{}, 1)

    ch <- 1
    ch <- 2
    fmt.Println("read buf succ")

關閉Channel

使用 close關閉channel
ch := make(chan interface{})
    close(ch)

關閉channel須要注意golang

  1. 重複關閉會 panic
  2. 向關閉的channel發送數據會panic
  3. 從關閉的channel讀取數據,會讀取到值的初始值,好比interface類型,讀取到的就是nil

select & range

range 字段會阻塞監聽 channel, 直到channel 被close。
func recv(ch chan int) {
    for msg := range ch { // 使用 range 能夠自動等待 ch 的行爲, 直到ch 被close。
        fmt.Println(msg)
    }

    fmt.Println("channel closed")
}

func send(ch chan int, msg int) {
    ch <- msg
}

func main() {
    ch := make(chan int, 2)

    go recv(ch)
    ch <- 1
    ch <- 2
    ch <- 3

    time.AfterFunc(time.Second*2, func() {
        close(ch)
    })
}

select 的大體工做原理app

  1. 檢查全部的case
  2. 當檢查的case已經能夠發送|接收,則執行當前代碼塊
  3. 當有多個case能夠執行,則隨機選擇一個執行
  4. 當沒有case能夠執行,則阻塞
  5. 若是存在default,當沒有可執行代碼塊時,則執行default代碼塊

使用select來管理channel的讀取, 經過default防止阻塞.性能

func readCh(ch chan interface{}) error {
    select {
    case v := <-ch:
        fmt.Println(v)
    default:
        return errors.New("no data")
    }

    return nil
}
使用 timer 或者 context 來進行到期退出斷定. 另外咱們也可使用sync.Once()這種形式設定一個開關,
來控制select的退出邏輯,可參照grpc/internal/grpcsync/event.go
func readCh(ch chan interface{}) error {
    select {
    case v := <-ch:
        fmt.Println(v)
    case <-time.After(time.Second):
        return errors.New("time arrived")
    }

    return nil
}

使用channel模擬生產消費模型

下面代碼的 Unbounded 實現摘自grpc/internal/buffer/unbounded.go,
它沒有選擇使用帶容量的channel,而是另外使用了一個list來備份積壓的消息,這裏我猜有兩個緣由code

  1. 使用這種方式channel變成了一個任意長度的channel,不用考慮channel被寫滿致使的問題。
  2. 這裏爲何不直接使用list + mutex,由於須要channel的特性來隔離goroutine。
type Unbounded struct {
    c       chan interface{}
    backlog []interface{}

    sync.Mutex
}

func NewUnbounded() *Unbounded {
    return &Unbounded{c: make(chan interface{}, 1)}
}

func (b *Unbounded) Put(t interface{}) {
    b.Lock()
    if len(b.backlog) == 0 {
        select {
        case b.c <- t:
            b.Unlock()
            return
        default:
        }
    }
    b.backlog = append(b.backlog, t)
    b.Unlock()
}

func (b *Unbounded) Load() {
    b.Lock()
    if len(b.backlog) > 0 {
        select {
        case b.c <- b.backlog[0]:
            b.backlog[0] = nil
            b.backlog = b.backlog[1:]
        default:
        }
    }
    b.Unlock()
}

func (b *Unbounded) Get() <-chan interface{} {
    return b.c
}

var q *Queue

type Queue struct {
    buf *Unbounded
}
type QueueInterface interface {
    consume()
    produce(info int)
}

func (q *Queue) consume() {
    for {
        select {
        case t := <-q.buf.Get():
            q.buf.Load()
            fmt.Println(t)
        case <-time.After(time.Second * 10):
            fmt.Println(errors.New("the end"))
        }
    }
}

func (q *Queue) produce(info int) {
    q.buf.Put(info)
}

func main() {

    q := &Queue{
        buf: NewUnbounded(),
    }

    go q.consume()

    q.produce(1)
    q.produce(3)
    time.AfterFunc(time.Second*2, func() {
        for i := 0; i < 3; i++ {
            q.produce(4)
        }
        //q.produce(4)
    })

    select {}
}
注: 這裏的實現使用了interface做爲channel的消息體,凡是在有性能瓶頸的地方應該使用具體的類型獨立實現一版,相似grpc/internal/transport.go中的recvBuffer
相關文章
相關標籤/搜索