c# 高效的線程安全隊列ConcurrentQueue(下) Segment類

 

 

Segment成員變量

 

        long long m_index;


記錄該segment的索引號。數組

        int* volatile m_state;


狀態數組,標識所對應的元素節點的狀態,默認值爲0,若是該元素節點添加了值,則標記爲1。函數

        T* volatile m_array;


隊列元素存儲空間的指針。spa

        Segment* volatile m_next;


指向下一個segment的指針。線程

        volatile long m_high;


標識在當前segment,元素最後添加的索引值,初始值爲-1,若是該segment被填滿了,則該值爲SEGMENT_SIZE – 1。指針

        volatile long m_low;


標識在當前segment,元素最後取出位置的索引值,初始值爲0,若是該segment一個都沒有取走元素,則該值爲0。若是m_low >m_high,表示該segment爲空。code

Segment成員函數

 

void Grow(Segment* volatile* tail)

1
2
3
4
5
6
void  Grow(Segment* volatile * tail)
{
     Segment* segment = new  Segment(m_index + 1);
     m_next = segment;
     *tail = m_next;
}
1
建立下一個segment,並將tail指針指向新建立的segment;

bool TryAppend(T value, Segment* volatile *  tail)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bool  TryAppend(T value, Segment* volatile  *  tail)
{
     if  (m_high >= SEGMENT_SIZE - 1)
     {
         return  false ;
     }
 
     int  index = SEGMENT_SIZE;
 
     index = InterlockedIncrement(&m_high);
 
     if  (index <= SEGMENT_SIZE - 1)
     {
         m_array[index] = value;
         m_state[index] = 1;
     }
     if  (index == SEGMENT_SIZE - 1)
     {
         Grow(tail);
     }
     
     return  (index <= SEGMENT_SIZE - 1);
}

往當前segment裏面,增長一個元素,若是添加滿了,就建立下一個segment。索引

bool TryPeek(T* result)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool  TryPeek(T* result)
{
     int  low = GetLow();
     if  (low > GetHigh())
     {
         return  false ;
     }
 
     DNetSpinWait wait;
     while  (m_state[low] == 0)
     {
         wait.SpinOnce();
     }
     *result = m_array[low];
     return  true ;
}

若是segment爲空,返回false,不然,返回low所在位置的值。隊列

bool TryRemove(T* result, Segment* volatile * head)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
bool  TryRemove(T* result, Segment* volatile  * head)
{
     DNetSpinWait wait;
     int  low = GetLow();
     for  ( int  i = GetHigh(); low <= i; i = GetHigh())
     {
         if  (InterlockedCompareExchange(&m_low, low + 1, low) == low)
         {  
 
             DNetSpinWait wait2;
             while  (m_state[low] == 0)
             {
                 wait2.SpinOnce();
             }
             *result = m_array[low];
             if  ((low + 1) >= SEGMENT_SIZE)
             {
                 wait2.Reset();
                 while  (m_next == NULL)
                 {
                     wait2.SpinOnce();
                 }
                 *head = m_next;
             }
             return  true ;
         }
         wait.SpinOnce();
         low = GetLow();
     }
     result = NULL;
     return  false ;
}


這是最複雜的一個方法。利用了InterlockedCompareExchange方法,該方法的解釋:
LONG __cdecl InterlockedCompareExchange(
  __inout  LONG volatile* Destination,
  __in     LONG Exchange,
  __in     LONG Comparand
);
Parameters
Destination 
A pointer to the destination value. The sign is ignored.ci

Exchange 
The exchange value. The sign is ignored.rem

Comparand 
The value to compare to Destination. The sign is ignored.

Return Value
The function returns the initial value of the Destination parameter.

經過自旋來保證線程同步。

int GetHigh()



    return min(m_high, SEGMENT_SIZE - 1);
}

bool IsEmpty()


{
    return m_low > m_high;
}

int GetLow()



    return min(m_low, SEGMENT_SIZE);
}

Segment* GetNext()


{
    return m_next;
}

long long GetIndex()

{    return m_index;}

相關文章
相關標籤/搜索