在Golang中實現有無限容量的同步Queue

chan對象是Golang的一個核心賣點,能夠輕鬆實現goroutine之間的通訊。Golang容許咱們爲chan設置不一樣的緩衝大小。當默認緩衝大小爲0的時候,一個goroutine對chan的寫入操做必需要等到有其餘goroutine對chan進行讀取的時候纔會返回,反之一個goroutine對chan進行讀取的時候要等到另一個goroutine對chan進行寫入纔會返回。若是咱們不但願每次對chan進行讀取和寫入都堵塞的話,能夠對chan設置緩衝大小。這樣,在緩衝區沒滿以前,goroutine能夠不停的對chan進行寫入而不會發生堵塞,直到緩衝區被填滿。git

有時候咱們須要把某個請求或者數據放入到chan中,而後馬上返回去作其餘事情。這種狀況下爲了不chan發生堵塞,咱們須要爲chan設置一個足夠大的緩衝大小。若是緩衝大小設置的太小,就很難避免出現堵塞,而把緩衝大小設置的過大,又會形成額外的內存開銷,由於chan對象在建立(make)的時候就已經分配了足夠的內存做爲緩衝。github

所以我在實際項目中常常使用一個同步的先入先出隊列(SyncQueue)。數據生產者調用隊列的Push函數將數據添加到隊列中,Push函數在任何狀況下不會發生堵塞。數據消費者使用Pop函數得到一個數據。若是隊列中當前爲空,則Pop函數會掛起當前goroutine,直到有其餘goroutine Push新的數據到隊列中。SyncQueue不須要提早生成一個巨大的緩存,所以不會佔用大量的內存,而且提供無限(除非內存滿)的隊列長度。golang

同步隊列(SyncQueue)實現:https://github.com/xiaonanln/go-xnsyncutil/blob/master/xnsyncutil/sync_queue.goapache

接口文檔:https://godoc.org/github.com/xiaonanln/go-xnsyncutil/xnsyncutil#SyncQueue緩存

 1 package xnsyncutil
 2 
 3 import (
 4     "sync"
 5 
 6     "gopkg.in/eapache/queue.v1"
 7 )
 8 
 9 // Synchronous FIFO queue
10 type SyncQueue struct {
11     lock    sync.Mutex
12     popable *sync.Cond
13     buffer  *queue.Queue
14     closed  bool
15 }
16 
17 // Create a new SyncQueue
18 func NewSyncQueue() *SyncQueue {
19     ch := &SyncQueue{
20         buffer: queue.New(),
21     }
22     ch.popable = sync.NewCond(&ch.lock)
23     return ch
24 }
25 
26 // Pop an item from SyncQueue, will block if SyncQueue is empty
27 func (q *SyncQueue) Pop() (v interface{}) {
28     c := q.popable
29     buffer := q.buffer
30 
31     q.lock.Lock()
32     for buffer.Length() == 0 && !q.closed {
33         c.Wait()
34     }
35 
36     if buffer.Length() > 0 {
37         v = buffer.Peek()
38         buffer.Remove()
39     }
40 
41     q.lock.Unlock()
42     return
43 }
44 
45 // Try to pop an item from SyncQueue, will return immediately with bool=false if SyncQueue is empty
46 func (q *SyncQueue) TryPop() (v interface{}, ok bool) {
47     buffer := q.buffer
48 
49     q.lock.Lock()
50 
51     if buffer.Length() > 0 {
52         v = buffer.Peek()
53         buffer.Remove()
54         ok = true
55     } else if q.closed {
56         ok = true
57     }
58 
59     q.lock.Unlock()
60     return
61 }
62 
63 // Push an item to SyncQueue. Always returns immediately without blocking
64 func (q *SyncQueue) Push(v interface{}) {
65     q.lock.Lock()
66     if !q.closed {
67         q.buffer.Add(v)
68         q.popable.Signal()
69     }
70     q.lock.Unlock()
71 }
72 
73 // Get the length of SyncQueue
74 func (q *SyncQueue) Len() (l int) {
75     q.lock.Lock()
76     l = q.buffer.Length()
77     q.lock.Unlock()
78     return
79 }
80 
81 func (q *SyncQueue) Close() {
82     q.lock.Lock()
83     if !q.closed {
84         q.closed = true
85         q.popable.Signal()
86     }
87     q.lock.Unlock()
88 }

 

 

Category: Golang 標籤:changolangqueuesync函數

相關文章
相關標籤/搜索