chan對象是Golang的一個核心賣點,能夠輕鬆實現goroutine之間的通訊。Golang容許咱們爲chan設置不一樣的緩衝大小。當默認緩衝大小爲0的時候,一個goroutine對chan的寫入操做必需要等到有其餘goroutine對chan進行讀取的時候纔會返回,反之一個goroutine對chan進行讀取的時候要等到另一個goroutine對chan進行寫入纔會返回。若是咱們不但願每次對chan進行讀取和寫入都堵塞的話,能夠對chan設置緩衝大小。這樣,在緩衝區沒滿以前,goroutine能夠不停的對chan進行寫入而不會發生堵塞,直到緩衝區被填滿。git
有時候咱們須要把某個請求或者數據放入到chan中,而後馬上返回去作其餘事情。這種狀況下爲了不chan發生堵塞,咱們須要爲chan設置一個足夠大的緩衝大小。若是緩衝大小設置的太小,就很難避免出現堵塞,而把緩衝大小設置的過大,又會形成額外的內存開銷,由於chan對象在建立(make)的時候就已經分配了足夠的內存做爲緩衝。github
所以我在實際項目中常常使用一個同步的先入先出隊列(SyncQueue)。數據生產者調用隊列的Push函數將數據添加到隊列中,Push函數在任何狀況下不會發生堵塞。數據消費者使用Pop函數得到一個數據。若是隊列中當前爲空,則Pop函數會掛起當前goroutine,直到有其餘goroutine Push新的數據到隊列中。SyncQueue不須要提早生成一個巨大的緩存,所以不會佔用大量的內存,而且提供無限(除非內存滿)的隊列長度。golang
同步隊列(SyncQueue)實現:https://github.com/xiaonanln/go-xnsyncutil/blob/master/xnsyncutil/sync_queue.goapache
接口文檔:https://godoc.org/github.com/xiaonanln/go-xnsyncutil/xnsyncutil#SyncQueue緩存
1 package xnsyncutil 2 3 import ( 4 "sync" 5 6 "gopkg.in/eapache/queue.v1" 7 ) 8 9 // Synchronous FIFO queue 10 type SyncQueue struct { 11 lock sync.Mutex 12 popable *sync.Cond 13 buffer *queue.Queue 14 closed bool 15 } 16 17 // Create a new SyncQueue 18 func NewSyncQueue() *SyncQueue { 19 ch := &SyncQueue{ 20 buffer: queue.New(), 21 } 22 ch.popable = sync.NewCond(&ch.lock) 23 return ch 24 } 25 26 // Pop an item from SyncQueue, will block if SyncQueue is empty 27 func (q *SyncQueue) Pop() (v interface{}) { 28 c := q.popable 29 buffer := q.buffer 30 31 q.lock.Lock() 32 for buffer.Length() == 0 && !q.closed { 33 c.Wait() 34 } 35 36 if buffer.Length() > 0 { 37 v = buffer.Peek() 38 buffer.Remove() 39 } 40 41 q.lock.Unlock() 42 return 43 } 44 45 // Try to pop an item from SyncQueue, will return immediately with bool=false if SyncQueue is empty 46 func (q *SyncQueue) TryPop() (v interface{}, ok bool) { 47 buffer := q.buffer 48 49 q.lock.Lock() 50 51 if buffer.Length() > 0 { 52 v = buffer.Peek() 53 buffer.Remove() 54 ok = true 55 } else if q.closed { 56 ok = true 57 } 58 59 q.lock.Unlock() 60 return 61 } 62 63 // Push an item to SyncQueue. Always returns immediately without blocking 64 func (q *SyncQueue) Push(v interface{}) { 65 q.lock.Lock() 66 if !q.closed { 67 q.buffer.Add(v) 68 q.popable.Signal() 69 } 70 q.lock.Unlock() 71 } 72 73 // Get the length of SyncQueue 74 func (q *SyncQueue) Len() (l int) { 75 q.lock.Lock() 76 l = q.buffer.Length() 77 q.lock.Unlock() 78 return 79 } 80 81 func (q *SyncQueue) Close() { 82 q.lock.Lock() 83 if !q.closed { 84 q.closed = true 85 q.popable.Signal() 86 } 87 q.lock.Unlock() 88 }