c#高效的線程安全隊列ConcurrentQueue(上)

 

ConcurrentQueue<T>隊列是一個高效的線程安全的隊列,是.Net Framework 4.0,System.Collections.Concurrent命名空間下的一個數據結構。安全

ConcurrentQueue<T>數據結構

下圖是ConcurrentQueue<T>數據結構的示意圖:數據結構

clip_image002

ConcurrentQueue<T>隊列由若干Segment動態構成,每一個Segment是一塊連續的內存Buffer,大小固定爲SEGMENT_SIZE。函數

ConcurrentQueue<T>私有成員變量

ConcurrentQueue<T>類有三個私有成員變量:post

Segment* volatile m_head;spa

Segment* volatile m_tail;線程

Segment* volatile m_base;指針

m_head指向第一個segment,m_tail指向最後一個segment。這兩個指針指向的對象,隨着入隊列和出隊列操做而不斷變化。code

m_base指針固定指向ConcurrentQueue<T>實例化的第一個Segment,在析構ConcurrentQueue<T>對象時使用。對象

ConcurrentQueue<T>成員方法

void Enqueue(T item)
1
2
3
4
5
6
7
8
void  Enqueue(T item)
{
     DNetSpinWait wait;
     while  (!m_tail->TryAppend(item, &m_tail))
     {
         wait.SpinOnce();
     }
}
1
從m_tail指向的segment中,加入item的值,直到成功加入,函數返回。

該函數會在分配了新的segment後,更新m_tail指針。blog

bool TryDequeue(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
13
bool  TryDequeue(T* result)
{
 
     while  (!IsEmpty())
     {
         if  (m_head->TryRemove(result, &m_head))
         {
             return  true ;
         }  
     }
     result = NULL;
     return  false ;
}

若是當前隊列爲空,返回false,不然返回隊列的第一個元素。

bool TryPeek(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
bool  TryPeek(T* result)
{
     while  (!IsEmpty())
     {
         if  (m_head->TryPeek(result))
         {
             return  true ;
         }
     }
     result = NULL;
     return  false ;
}

跟TryDequeue()方法類似。

int Count()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int  Count()
{
     Segment* segment;
     Segment* segment2;
     int  num;
     int  num2;
     GetHeadTailPositions(&segment, &segment2, &num, &num2);
     if  (segment == segment2)
     {
         return  ((num2 - num) + 1);
     }
     int  num3 = SEGMENT_SIZE - num;
     num3 += SEGMENT_SIZE * ((( int ) (segment2->GetIndex() - segment->GetIndex())) - 1);
     return  (num3 + (num2 + 1));
}

經過獲得當前首尾的segment指針,以及首指針的m_low索引,以及尾指針的m_high索引,計算當前隊列中元素的個數。

該方法用到了GetHeadTailPositions方法。

bool IsEmpty()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bool  IsEmpty()
{
     Segment* head = m_head;
     if  (head->IsEmpty())
     {
         if  (head->GetNext() == NULL)
         {
             return  true ;
         }
         DNetSpinWait wait;
         while  (head->IsEmpty())
         {
             if  (head->GetNext() == NULL)
             {
                 return  true ;
             }
             wait.SpinOnce();
             head = m_head;
         }
     }
     return  false ;
}

斷定當前隊列爲空有兩個條件,第一,m_head指向的segment爲空;第二,m_head->GetNext()也爲空,即m_head和m_tail指向同一個segment。

void Reset()
1
2
3
4
5
void  Reset()
{
     DeleteNodes();
     m_base = m_head = m_tail = new  Segment(0);
}

重置ConcurrentQueue<T>對象,刪除已經分配了的segment,並從新更新成員變量的值。

void GetHeadTailPositions(Segment** head, Segment** tail, int* headLow, int* tailHigh)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void  GetHeadTailPositions(Segment** head, Segment** tail, int * headLow, int * tailHigh)
{
     *head = m_head;
     *tail = m_tail;
     *headLow = (*head)->GetLow();
     *tailHigh = (*tail)->GetHigh();
     DNetSpinWait wait;
     while  ((((*head != m_head) || (*tail != m_tail)) ||
         ((*headLow != (*head)->GetLow()) || (*tailHigh != (*tail)->GetHigh()))) ||
         ((*head)->GetIndex() > (*tail)->GetIndex()))
     {
         wait.SpinOnce();
         *head = m_head;
         *tail = m_tail;
         *headLow = (*head)->GetLow();
         *tailHigh = (*tail)->GetHigh();
     }
}

該函數就是將隊列當前的m_head, m_tail指針以及m_head的m_low索引,m_tail的m_high索引取出來,放到線程棧上。而且在取出這些值後,再判斷這些值是否合法。

相關文章
相關標籤/搜索