1、使用場景協程
大背景是從kafka 中讀取oplog進行增量處理,可是當我想發一條命令將這個增量過程阻塞,而後開始進行一次全量同步以後,在開始繼續增量。blog
因此須要對多個協程進行控制。get
2、使用知識kafka
1. 從一個未初始化的管道讀會阻塞同步
2.從一個關閉的管道讀不會阻塞it
利用兩個管道和select 進行控制io
3、上代碼class
控制器代碼import
package util import ( "errors" "sync" ) const ( //STOP 中止 STOP = iota //START 開始 START //PAUSE 暫停 PAUSE ) //Control 控制器 type Control struct { ch1 chan struct{} ch2 chan struct{} stat int64 lock sync.RWMutex } var ( //ErrStat 錯誤狀態 ErrStat = errors.New("stat error") ) //NewControl 得到一個新Control func NewControl() *Control { return &Control{ ch1: make(chan struct{}), ch2: nil, stat: START, lock: sync.RWMutex{}, } } //Stop 中止 func (c *Control) Stop() error { c.lock.Lock() defer c.lock.Unlock() if c.stat == START { c.ch2 = nil close(c.ch1) c.stat = STOP } else if c.stat == PAUSE { ch2 := c.ch2 c.ch2 = nil close(c.ch1) close(ch2) c.stat = STOP } else { return ErrStat } return nil } //Pause 暫停 func (c *Control) Pause() error { c.lock.Lock() defer c.lock.Unlock() if c.stat == START { c.ch2 = make(chan struct{}) close(c.ch1) c.stat = PAUSE } else { return ErrStat } return nil } //Start 開始 func (c *Control) Start() error { c.lock.Lock() defer c.lock.Unlock() if c.stat == PAUSE { c.ch1 = make(chan struct{}) close(c.ch2) c.stat = START } else { return ErrStat } return nil } //C 控制管道 func (c *Control) C() <-chan struct{} { c.lock.RLock() defer c.lock.RUnlock() return c.ch1 } //Wait 等待 func (c *Control) Wait() bool { c.lock.RLock() ch2 := c.ch2 c.lock.RUnlock() if ch2 == nil { //經過賦值nil 發送中止推出命令 return false } <-ch2 //會進行阻塞 return true }
使用代碼select
for { select { case part, ok := <-c.Partitions(): if !ok { conf.Logger.Error("get kafka Partitions not ok", regular.Name) return } go readFromPart(c, part, regular, respChan) case <-regular.C(): //regular 爲Control 類 if !regular.Wait() { conf.Logger.Debug("Stop! ") return } conf.Logger.Debug("Start! ") } }
這樣就能夠隨時隨地的控制工程中的協程
regular := util.NewControl() regular.Pause() regular.Start() regular.Stop()