一、前言html
隊列在計算機中很是重要的一種數據結構,尤爲在操做系統中。隊列典型的特徵是先進先出(FIFO),符合流水線業務流程。在進程間通訊、網絡通訊之間常常採用隊列作緩存,緩解數據處理壓力。結合本身在工做中遇到的隊列問題,總結一下對不一樣場景下的隊列實現。根據操做隊列的場景分爲:單生產者——單消費者、多生產者——單消費者、單生產者——多消費者、多生產者——多消費者四大模型。其實後面三種的隊列,能夠概括爲一種多對多。根據隊列中數據分爲:隊列中的數據是定長的、隊列中的數據是變長的。linux
二、隊列操做模型算法
(1)單生產者——單消費者shell
(2)多生產者——單消費者windows
(3)單生產者——多消費者api
(4)多生產者——多消費者數組
三、隊列數據定長與變長緩存
(1)隊列數據定長網絡
(2)隊列數據變長數據結構
四、併發無鎖處理
(1)單生產者——單消費者模型
此種場景不須要加鎖,定長的能夠經過讀指針和寫指針進行控制隊列操做,變長的經過讀指針、寫指針、結束指針控制操做。具體實現能夠參考linux內核提供的kfifo的實現。能夠參考:
http://blog.csdn.net/linyt/article/details/5764312
(2)(一)多對多(一)模型
正常邏輯操做是要對隊列操做進行加鎖處理。加鎖的性能開銷較大,通常採用無鎖實現。無鎖實現原理是CAS、FAA等機制。定長的能夠參考:
http://coolshell.cn/articles/8239.html
變長的能夠參考intel dpdk提供的rte_ring的實現。
http://blog.csdn.net/linzhaolover/article/details/9771329
無鎖隊列的實現
關於無鎖隊列的實現,網上有不少文章,雖然本文可能和那些文章有所重複,可是我仍是想以我本身的方式把這些文章中的重要的知識點串起來和你們講一講這個技術。下面開始正文。
關於CAS等原子操做
在開始說無鎖隊列以前,咱們須要知道一個很重要的技術就是CAS操做——Compare & Set,或是 Compare & Swap,如今幾乎全部的CPU指令都支持CAS的原子操做,X86下對應的是 CMPXCHG 彙編指令。有了這個原子操做,咱們就能夠用其來實現各類無鎖(lock free)的數據結構。
這個操做用C語言來描述就是下面這個樣子:(代碼來自Wikipedia的Compare And Swap詞條)意思就是說,看一看內存*reg裏的值是否是oldval,若是是的話,則對其賦值newval。
int
compare_and_swap (
int
* reg,
int
oldval,
int
newval)
{
int
old_reg_val = *reg;
if
(old_reg_val == oldval)
*reg = newval;
return
old_reg_val;
}
這個操做能夠變種爲返回bool值的形式(返回 bool值的好處在於,能夠調用者知道有沒有更新成功):
bool
compare_and_swap (
int
*accum,
int
*dest,
int
newval)
{
if
( *accum == *dest ) {
*dest = newval;
return
true
;
}
return
false
;
}
與CAS類似的還有下面的原子操做:(這些東西你們本身看Wikipedia吧)
注:在實際的C/C++程序中,CAS的各類實現版本以下:
1)GCC的CAS
GCC4.1+版本中支持CAS的原子操做(完整的原子操做可參看 GCC Atomic Builtins)
bool
__sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2)Windows的CAS
在Windows下,你可使用下面的Windows API來完成CAS:(完整的Windows原子操做可參看MSDN的InterLocked Functions)
InterlockedCompareExchange ( __inout
LONG
volatile
*Target,
__in
LONG
Exchange,
__in
LONG
Comperand);
3) C++11中的CAS
C++11中的STL中的atomic類的函數可讓你跨平臺。(完整的C++11的原子操做可參看 Atomic Operation Library)
template
<
class
T >
bool
atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template
<
class
T >
bool
atomic_compare_exchange_weak(
volatile
std::atomic* obj,
T* expected, T desired );
無鎖隊列的鏈表實現
下面的東西主要來自John D. Valois 1994年10月在拉斯維加斯的並行和分佈系統系統國際大會上的一篇論文——《Implementing Lock-Free Queues》。
咱們先來看一下進隊列用CAS實現的方式:
EnQueue(x)
{
q =
new
record();
q->value = x;
q->next = NULL;
do
{
p = tail;
}
while
( CAS(p->next, NULL, q) != TRUE);
CAS(tail, p, q);
}
咱們能夠看到,程序中的那個 do- while 的 Re-Try-Loop。就是說,頗有可能我在準備在隊列尾加入結點時,別的線程已經加成功了,因而tail指針就變了,因而個人CAS返回了false,因而程序再試,直到試成功爲止。這個很像咱們的搶電話熱線的不停重播的狀況。
你會看到,爲何咱們的「置尾結點」的操做(第12行)不判斷是否成功,由於:
- 若是有一個線程T1,它的while中的CAS若是成功的話,那麼其它全部的 隨後線程的CAS都會失敗,而後就會再循環,
- 此時,若是T1 線程尚未更新tail指針,其它的線程繼續失敗,由於tail->next不是NULL了。
- 直到T1線程更新完tail指針,因而其它的線程中的某個線程就能夠獲得新的tail指針,繼續往下走了。
這裏有一個潛在的問題——若是T1線程在用CAS更新tail指針的以前,線程停掉或是掛掉了,那麼其它線程就進入死循環了。下面是改良版的EnQueue()
EnQueue(x)
{
q =
new
record();
q->value = x;
q->next = NULL;
p = tail;
oldp = p
do
{
while
(p->next != NULL)
p = p->next;
}
while
( CAS(p.next, NULL, q) != TRUE);
CAS(tail, oldp, q);
}
咱們讓每一個線程,本身fetch 指針 p 到鏈表尾。可是這樣的fetch會很影響性能。而通實際狀況看下來,99.9%的狀況不會有線程停轉的狀況,因此,更好的作法是,你能夠接合上述的這兩個版本,若是retry的次數超了一個值的話(好比說3次),那麼,就本身fetch指針。
好了,咱們解決了EnQueue,咱們再來看看DeQueue的代碼:(很簡單,我就不解釋了)
DeQueue()
{
do
{
p = head;
if
(p->next == NULL){
return
ERR_EMPTY_QUEUE;
}
while
( CAS(head, p, p->next) != TRUE );
return
p->next->value;
}
咱們能夠看到,DeQueue的代碼操做的是 head->next,而不是head自己。這樣考慮是由於一個邊界條件,咱們須要一個dummy的頭指針來解決鏈表中若是隻有一個元素,head和tail都指向同一個結點的問題,這樣EnQueue和DeQueue要互相排斥了。
注:上圖的tail正處於更新以前的裝態。
CAS的ABA問題
所謂ABA(見維基百科的ABA詞條),問題基本是這個樣子:
- 進程P1在共享變量中讀到值爲A
- P1被搶佔了,進程P2執行
- P2把共享變量裏的值從A改爲了B,再改回到A,此時被P1搶佔。
- P1回來看到共享變量裏的值沒有被改變,因而繼續執行。
雖然P1覺得變量值沒有改變,繼續執行了,可是這個會引起一些潛在的問題。ABA問題最容易發生在lock free 的算法中的,CAS首當其衝,由於CAS判斷的是指針的地址。若是這個地址被重用了呢,問題就很大了。(地址被重用是很常常發生的,一個內存分配後釋放了,再分配,頗有可能仍是原來的地址)
好比上述的DeQueue()函數,由於咱們要讓head和tail分開,因此咱們引入了一個dummy指針給head,當咱們作CAS的以前,若是head的那塊內存被回收並被重用了,而重用的內存又被EnQueue()進來了,這會有很大的問題。(內存管理中重用內存基本上是一種很常見的行爲)
這個例子你可能沒有看懂,維基百科上給了一個活生生的例子——
你拿着一個裝滿錢的手提箱在飛機場,此時過來了一個火辣性感的美女,而後她很暖昧地挑逗着你,並趁你不注意的時候,把用一個如出一轍的手提箱和你那裝滿錢的箱子調了個包,而後就離開了,你看到你的手提箱還在那,因而就提着手提箱去趕飛機去了。
這就是ABA的問題。
解決ABA的問題
維基百科上給了一個解——使用double-CAS(雙保險的CAS),例如,在32位系統上,咱們要檢查64位的內容
1)一次用CAS檢查雙倍長度的值,前半部是指針,後半部分是一個計數器。
2)只有這兩個都同樣,纔算經過檢查,要吧賦新的值。並把計數器累加1。
這樣一來,ABA發生時,雖然值同樣,可是計數器就不同(可是在32位的系統上,這個計數器會溢出回來又從1開始的,這仍是會有ABA的問題)
固然,咱們這個隊列的問題就是不想讓那個內存重用,這樣明確的業務問題比較好解決,論文《Implementing Lock-Free Queues》給出一這麼一個方法——使用結點內存引用計數refcnt!
SafeRead(q)
{
loop:
p = q->next;
if
(p == NULL){
return
p;
}
Fetch&Add(p->refcnt, 1);
if
(p == q->next){
return
p;
}
else
{
Release(p);
}
goto
loop;
}
其中的 Fetch&Add和Release分是是加引用計數和減引用計數,都是原子操做,這樣就能夠阻止內存被回收了。
用數組實現無鎖隊列
本實現來自論文《Implementing Lock-Free Queues》
使用數組來實現隊列是很常見的方法,由於沒有內存的分部和釋放,一切都會變得簡單,實現的思路以下:
1)數組隊列應該是一個ring buffer形式的數組(環形數組)
2)數組的元素應該有三個可能的值:HEAD,TAIL,EMPTY(固然,還有實際的數據)
3)數組一開始所有初始化成EMPTY,有兩個相鄰的元素要初始化成HEAD和TAIL,這表明空隊列。
4)EnQueue操做。假設數據x要入隊列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。須要注意,若是找不到(TAIL, EMPTY),則說明隊列滿了。
5)DeQueue操做。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),並把x返回。一樣須要注意,若是x是TAIL,則說明隊列爲空。
算法的一個關鍵是——如何定位HEAD或TAIL?
1)咱們能夠聲明兩個計數器,一個用來計數EnQueue的次數,一個用來計數DeQueue的次數。
2)這兩個計算器使用使用Fetch&ADD來進行原子累加,在EnQueue或DeQueue完成的時候累加就行了。
3)累加後求個模什麼的就能夠知道TAIL和HEAD的位置了。
以下圖所示:
小結
以上基本上就是全部的無鎖隊列的技術細節,這些技術均可以用在其它的無鎖數據結構上。
1)無鎖隊列主要是經過CAS、FAA這些原子操做,和Retry-Loop實現。
2)對於Retry-Loop,我我的感受其實和鎖什麼什麼兩樣。只是這種「鎖」的粒度變小了,主要是「鎖」HEAD和TAIL這兩個關鍵資源。而不是整個數據結構。
還有一些和Lock Free的文章你能夠去看看:
【
注:我配了一張look-free的自行車,寓意爲——若是不用專門的車鎖,那麼自行得本身鎖本身!】
intel dpdk api ring 模塊源碼詳解
摘要
intel dpdk 提供了一套ring 隊列管理代碼,支持單生產者產品入列,單消費者產品出列;多名生產者產品入列,多產品消費這產品出列操做;
咱們以app/test/test_ring.c文件中的代碼進行講解,test_ring_basic_ex()函數完成一個基本功能測試函數;
一、ring的建立
- rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
- RING_F_SP_ENQ | RING_F_SC_DEQ);
調用rte_ring_create函數去建立一個ring,
第一參數"test_ring_basic_ex"是這個ring的名字,
第二個參數RING_SIZE是ring的大小;
第三個參數是在哪一個socket id上建立 ,這指定的是任意;
第四個參數是指定此ring支持單入單出;
我看一下rte_ring_create函數主要完成了哪些操做;
- rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
執行讀寫鎖的加鎖操做;
- mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags);
預留一部份內存空間給ring,其大小就是RING_SIZE個sizeof(struct rte_ring)的尺寸;
- r = mz->addr;
-
- memset(r, 0, sizeof(*r));
- rte_snprintf(r->name, sizeof(r->name), "%s", name);
- r->flags = flags;
- r->prod.watermark = count;
- r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ);
- r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ);
- r->prod.size = r->cons.size = count;
- r->prod.mask = r->cons.mask = count-1;
- r->prod.head = r->cons.head = 0;
- r->prod.tail = r->cons.tail = 0;
-
- TAILQ_INSERT_TAIL(ring_list, r, next);
將獲取到的虛擬地址給了ring,而後初始化她,prod 表明生成者,cons表明消費者;
生產者最大能夠生產count個,其取模的掩碼是 count-1; 目前是0個產品,因此將生產者的頭和消費者頭都設置爲0;其尾也設置未0;
- rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
執行讀寫鎖的寫鎖解鎖操做;
二、ring的單生產者產品入列
- rte_ring_enqueue(rp, obj[i])
ring的單個入列;
最終會調用到上面這個函數,進行單次入列,咱們看一下它的實現;
- prod_head = r->prod.head;
- cons_tail = r->cons.tail;
暫時將生產者的頭索引和消費者的尾部索引交給臨時變量;
- free_entries = mask + cons_tail - prod_head;
計算還有多少剩餘的存儲空間;
- prod_next = prod_head + n;
- r->prod.head = prod_next;
若是有足夠的剩餘空間,咱們先將臨時變量prod_next 進行後移,同事將生產者的頭索引後移n個;
- for (i = 0; likely(i < n); i++)
- r->ring[(prod_head + i) & mask] = obj_table[i];
- rte_wmb();
執行寫操做,將目標進行入隊操做,它並無任何大數據量的內存拷貝操做,只是進行指針的賦值操做,所以dpdk的內存操做很快,應該算是零拷貝;
- r->prod.tail = prod_next;
成功寫入以後,將生產者的尾部索引賦值爲prox_next ,也就是將其日後挪到n個索引;咱們成功插入了n個產品;目前是單個操做,索引目前n=1;
三、ring的單消費者產品出列
- rte_ring_dequeue(rp, &obj[i]);
一樣出隊也包含了好幾層的調用,最終定位到__rte_ring_sc_do_dequeue函數;
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
先將消費者的頭索引和生產者的頭索引賦值給臨時變量;
- entries = prod_tail - cons_head;
計算目前ring中有多少產品;
- cons_next = cons_head + n;
- r->cons.head = cons_next;
若是有足夠的產品,就將臨時變量cons_next日後挪到n個值,指向你想取出幾個產品的位置;同時將消費者的頭索引日後挪到n個;這目前n=1;由於是單個取出;
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
執行讀取操做,一樣沒有任何的大的數據量拷貝,只是進行指針的賦值;
- r->cons.tail = cons_next;
最後將消費者的尾部索引也像後挪動n個,最終等於消費者的頭索引;
四、ring的多生產者產品入列
多生產者入列的實現是在 __rte_ring_mp_do_enqueue()函數中;在dpdk/lib/librte_ring/rte_ring.h 文件中定義;其實這個函數和單入列函數很類似;
-
- do {
-
- n = max;
- .................
-
- prod_next = prod_head + n;
- success = rte_atomic32_cmpset(&r->prod.head, prod_head,
- prod_next);
- } while (unlikely(success == 0));
在單生產者中時將生產者的頭部和消費者的尾部直接賦值給臨時變量,去求剩餘存儲空間;最後將生產者的頭索引日後移動n個,
但在多生產者中,要判斷這個頭部是否和其餘的生產者發出競爭,
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
是否有其餘生產者修改了prod.head,因此這要從新判斷一下prod.head是否還等於prod_head,若是等於,就將其日後移動n個,也就是將prod_next值賦值給prod.head;
若是不等於,就會失敗,就須要進入do while循環再次循環一次;從新刷新一下prod_head和prod_next 以及prod.head的值 ;
- for (i = 0; likely(i < n); i++)
- r->ring[(prod_head + i) & mask] = obj_table[i];
- rte_wmb();
執行產品寫入操做;
寫入操做完成以後,如是單生產者應該是直接修改生產者尾部索引,將其日後順延n個,但目前是多生產者操做;是怎樣實現的呢?
- while (unlikely(r->prod.tail != prod_head))
- rte_pause();
-
- r->prod.tail = prod_next;
這也先進行判斷,判斷當前的生產者尾部索引是否還等於,存儲在臨時變量中的生產者頭索引,
若是不等於,說明,有其餘的線程還在執行,並且應該是在它以前進行存儲,還沒來得及更新prod.tail;等其餘的生產者更新tail後,就會使得prod.tail==prod_head;
以後再更新,prod.tail 日後挪動n個,最好實現 prod.tail==prod.head==prod_next==prod_head+n;
五、ring的多消費者產品出列
多個消費者同時取產品是在__rte_ring_mc_do_dequeue()函數中實現;定義在dpdk/lib/librte_ring/rte_ring.h文件中;
-
- do {
-
- n = max;
-
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
- ...................
-
- cons_next = cons_head + n;
- success = rte_atomic32_cmpset(&r->cons.head, cons_head,
- cons_next);
- } while (unlikely(success == 0));
和多生產者同樣,在外面多包含了一次do while循環,防止多消費者操做發生競爭;
在循環中先將消費者的頭索引和生產者的爲索引賦值給臨時變量;讓後判斷有多少剩餘的產品在循環隊列,
若有n個產品,就將臨時變量cons_next 日後挪動n個,而後判斷目前的消費者頭索引是否還等於剛纔的保存在臨時變量cons_head 中的值,如相等,說明沒有發生競爭,就將cons_next賦值給
消費者的頭索引 r->cons.head,如不相等,就須要從新作一次do while循環;
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
在成功更新消費者頭索引後,執行讀取產品操做,這並無大的數據拷貝操做,只是進行指針的從新賦值操做;
- while (unlikely(r->cons.tail != cons_head))
- rte_pause();
-
- __RING_STAT_ADD(r, deq_success, n);
- r->cons.tail = cons_next;
讀取完成後,就要更新消費者的尾部索引;
爲了不競爭,就要判是否有其餘的消費者在更新消費者尾部索引;若是目前的消費者尾部索引不等於剛纔保存的在臨時變量cons_head 的值,就要等待其餘消費者修改這個尾部索引;
如相等,機能夠將當前消費者的尾部索引日後挪動n個索引值了,
實現 r->cons.tail=r->cons.head=cons_next=cons_head+n;
六、ring的其餘斷定函數
- rte_ring_lookup("test_ring_basic_ex")
驗證以test_ring_basic_ex 爲名的ring是否建立成功;
判斷ring是否爲空;
判斷ring是否已經滿;
判斷當前ring還有多少剩餘存儲空間;
一、前言
最近工做比較忙,加班較多,天天晚上回到家10點多了。我不知道本身還能堅持多久,既然選擇了就要作到最好。寫博客的少了。總以爲少了點什麼,須要繼續學習。今天繼續上個開篇寫,介紹單生產者單消費者模型的隊列。根據寫入隊列的內容是定長仍是變長,分爲單生產者單消費者定長隊列和單生產者單消費者變長隊列兩種。單生產者單消費者模型的隊列操做過程是不須要進行加鎖的。生產者經過寫索引控制入隊操做,消費者經過讀索引控制出隊列操做。兩者相互之間對索引是獨享,不存在競爭關係。以下圖所示:
二、單生產者單消費者定長隊列
這種隊列要求每次入隊和出隊的內容是定長的,即生產者寫入隊列和消費者讀取隊列的內容大小事相同的。linux內核中的kfifo就是這種隊列,提供了讀和寫兩個索引。單生產者單消費者隊列數據結構定義以下所示:
typedef struct { uint32_t r_index; /*讀指針*/ uint32_t w_index; /*寫指針*/ uint32_t size; /*緩衝區大小*/ char *buff[0]; /*緩衝區起始地址*/ }ring_buff_st;
爲了方便計算位置,設置隊列的大小爲2的次冪。這樣能夠將以前的取餘操做轉換爲位操做,即r_index = r_index % size 與 r_index = r_index & (size -1)等價。位操做很是快,充分利用了二進制的特徵。
(1)隊列初始狀態,讀寫索引相等,此時隊列爲空。
(2)寫入隊列
寫操做即進行入隊操做,入隊有三種場景,
2.1 寫索引大於等於讀索引
2.2寫索引小於讀索引
2.3.寫索引後不夠寫入一個
(3)讀取隊列
讀隊列分爲三種場景
3.1寫索引大於等於讀索引
3.2寫索引小於讀索引
3.3.讀索引後面不夠一個
三、單生產者單消費者變長隊列
有些時候生產者每次寫入的數據長度是不肯定的,致使寫入隊列的數據時變長的。這樣爲了充分利用隊列,須要增長一個結束索引,保證隊列末尾至少可以寫入一個數據。變長隊列數據結構定義以下:
typedef struct { uint32_t r_index; /*讀指針*/ uint32_t w_index; /*寫指針*/ uint32_t e_index; /*隊列結束指針*/ uint32_t size; /*緩衝區大小*/ char *buff[0]; /*緩衝區起始地址*/ }ring_buff_st;