代碼以下,若有問題請聯繫 baibaibai_000@163.com
package work_test import ( "math/rand" "runtime" "sync" "testing" "time" ) // TestWorker // test目的:展現當多個worker同時處理一個通道的任務,被關閉時,通道剩餘的任務須要被所有處理 func TestWorker(t *testing.T) { rand.Seed(time.Now().UnixNano()) // 虛擬的處理task的函數 work := func(t int) { } // 任務通道 taskChan := make(chan int, 1024) // 關閉 closeChan := make(chan struct{}) wg := sync.WaitGroup{} // worker個數 workerNum := runtime.NumCPU() // 起多個worker來處理taskChan的事務 for i := 0; i < workerNum; i++ { wg.Add(1) go func() { defer wg.Done() LOOPING: for { select { case t := <-taskChan: //處理task work(t) case <-closeChan: //收到外界關閉的事件 // 繼續處理taskChan剩餘的task,直到通道爲空 for hasTask := true; hasTask; { select { case t := <-taskChan: work(t) default: hasTask = false } } break LOOPING } } }() } // 填充任務 go func() { ticker := time.NewTicker(time.Millisecond * time.Duration(1)) defer ticker.Stop() for { select { case <-ticker.C: taskChan <- 1 case <-closeChan: return } } }() // 隨機個事件後關閉 time.Sleep(time.Duration(2+rand.Int()%4) * time.Second) close(closeChan) wg.Wait() // 若是taskChan還有沒處理的任務就失敗了 if len(taskChan) > 0 { t.FailNow() } }