【前言】node
隊列是衆多數據結構中最多見的一種之一。曾經有人和我說過這麼一句話,叫作「程序等於數據結構+算法」。所以在設計模塊、寫代碼時,隊列經常做爲一個很常見的結構出如今模塊設計中。DPDK不只是一個加速網絡IO的框架,其內部還提供衆多的功能組件,rte_ring就是DPDK內部提供的一種無鎖隊列,本篇文章將從使用的角度出發闡述DPDK的ring怎麼用?在怎麼用的角度上再來闡述ring無鎖的實現,最後將探討實現無鎖隊列的關鍵以及在不通平臺上如何實現,本文將會探討x86平臺下無鎖隊列的實現。linux
權當拋磚引玉,有問題請留言指正,感激涕零。算法
【場景】編程
程序等於數據結構+算法。可是場景仍然是最重要的,由於場景取決於咱們到底「用不用」某個技術或者是某個組件,亦或是某種數據結構。安全
作數據面的都應該見過如圖1的這種線程模型。網絡
圖1.常見的數據面線程模型數據結構
圖1是一種常見的數據面模型,好比linux基金會的FD.IO(VPP)採用的就是這種線程模型,這種線程模型下分工明確:架構
那麼如今有一種需求,fwd線程須要將一些信息上傳至控制面進程那麼最好的作法是什麼呢?這裏一般有不少種實現方式,可是均和本篇文章的主要討論對象無關,所以很少作討論。框架
其中一種常見的手段就是經過ring,還有一種場景就是DPDK的multiprocess場景,也一樣能夠經過ring來說數據包分發到其餘process中。如圖2這種狀況socket
圖2.另一種常見的場景
這種場景是典型的「僧多肉少」型,就是「processer的數量多於rx隊列數量」,那麼這種場景下注定有一些processer是沒法接管網卡隊列的,可是我還想發揮這些processer的處理能力,怎麼辦?
那麼常見的方案就是在接管到rx隊列的processer將數據包從rx queue上收上來後,計算數據包的rss,而後將數據包「儘可能均勻」的經過ring來發送到那些沒有分配到rx queue的fwd thread上。其實也不光是雲計算的數據面場景,在不少場景下咱們都須要用到隊列,由於隊列是一個再基礎不過的數據結構,所以咱們拿DPDK的ring出發,最終闡述無鎖隊列的常見實現方式。
【DPDK ring 從使用出發】
我我的以爲任何一種技術,出發點確定是「先用再分析」,說白了就是對一種技術或對某一個模塊的直觀印象都不是直接分析代碼就能獲得的,都是「先跑起來,玩一下,看看狀況」獲得的第一印象,所以這裏仍是會先從使用的角度出發,先會用再分析實現。若是有用過DPDK Ring,那麼本節能夠直接跳過,直接看後面的分析章節。
DPDK的ring代碼主要以lib的形式集成在DPDK源代碼中,具體代碼位置爲:DPDK根目錄/lib/librte_ring目錄中。如下代碼均已DPDK 19.11版本做爲參照(其餘版本基本都是大同小異)。
先介紹一下主要的函數接口:
struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) //建立dpdk的rte_ring void rte_ring_free(struct rte_ring *r) //釋放已經建立的dpdk的rte_ring struct rte_ring * rte_ring_lookup(const char *name) //去尋找一個已經建立好的dpdk的rte_ring static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) //此函數爲內部方法,全部入隊函數都是此函數的上層封裝 static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) //此函數爲內部方法,全部出隊函數都是此函數的上層封裝 static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數爲批量入隊函數,爲多生產者安全(multi producer) static __rte_always_inline unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數爲批量入隊函數,爲單生產者安全(single producer) static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函數爲批量入隊函數,具體安全性質取決於建立隊列時的標誌(flags) static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數爲批量出隊函數,爲多消費者安全(multi consumer) static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數爲批量出隊函數,爲單消費者安全(single consumer) static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函數爲批量出隊函數,具體安全性質取決於建立隊列時的標誌(flags) static inline unsigned rte_ring_count(const struct rte_ring *r) //此函數用於查看隊列中元素的數量
能夠看到上述函數列表(只是表明性的一部分)基本分爲三類接口
圖3.出隊的操做函數
實際上若是讓咱們本身設計一個隊列,基本上也逃離不出去這些接口,而且根據圖3能夠看出,全部的出隊函數基本都是基於__rte_ring_do_enqueue的封裝而已。
那麼實際使用起來的步驟能夠基本能夠爲如下流程圖描述
圖3.dpdk ring常見的使用流程
使用流程仍是很是簡單的,由於隊列自己做爲一個常見的數據結構使用起來並不複雜,具體使用的例子能夠看dpdk的example/multiprocess/中的例子。
可是使用的時候有幾個地方須要注意:
能夠看到dpdk的rte_ring使用上仍是蠻簡單的,所以接下來就從源碼出發解析一下dpdk的rte_ring的無鎖實現。
【DPDK ring 的無鎖實現】
先說結論:
無鎖的實現依賴於一個彙編指令: cmpxchg
翻譯過來就是compare and change
咱們先看看dpdk的ring是如何實現無鎖的,咱們拿__rte_ring_do_enqueue和__rte_ring_do_dequeue這兩個函數開刀,這兩個函數分別是入隊和出隊的底層實現函數,其他全部的入隊和出隊函數都是基於這兩個函數進行了上層封裝而已。
先想一下,在多生產者和多消費者場景下,分別要應付哪些問題?
咱們先看第一個問題和第二個問題是如何實現的,可是在分析實際函數的實現以前,咱們要先分析一下rte_ring。
struct rte_ring { char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; // ring的名稱,lookup的時候就是根據名稱進行查找對應的ring int flags; // 標記,用來描述隊列是單/多生產者仍是單/多消費者安全 const struct rte_memzone *memzone; // 所屬的memzone,memzone是dpdk內存管理底層的數據結構 uint32_t size; // 隊列長,爲2^n。若是flags爲RING_F_EXACT_SZ // 隊列size爲初始化時隊列長度的向上取2的n次冪,例如若是爲 // 7,那麼向上取最近的2^n冪的數爲8.若是flags不爲 // RING_F_EXACT_SZ,那麼初始化隊列的時候隊列長必須爲2^n冪 uint32_t mask; // 掩碼,爲隊列長 - 1,用來計算位置的時候取餘用 uint32_t capacity; // 隊列容量,通常不等於隊列長度,把隊列容量理解爲實際能夠 // 使用的元素個數便可。例如初始化時count爲7而且指定標誌爲 // RING_F_EXACT_SZ,那麼count最後爲8,可是capacity爲7,由於 // 8是向上取2^n冪取出來的,實際上仍然是建立時所需的個數,8. char pad0 __rte_cache_aligned; // 填充,考慮到性能,要使用填充法保證cache line struct rte_ring_headtail prod __rte_cache_aligned; // 生產者位置,裏面有一個生產者頭,即prod.head,還有一個生 // 產者尾,即prod.tail。prod.head表明着下一次生產時的起始 // 生產位置。prod.tail表明消費者能夠消費的位置界限,到達 // prod.tail後就沒法繼續消費,一般狀況下生產完成後, // prod.tail = prod.head,意味着剛生產的元素皆能夠被消費 char pad1 __rte_cache_aligned; struct rte_ring_headtail cons __rte_cache_aligned; // 消費者位置,裏面有一個消費者頭,即cons.head,還有一個消 // 費者尾,即cons.tail。cons.head表明着下一次消費時的起始 // 消費位置。cons.tail表明生產者能夠生產的位置界限,到達 // cons.tail後就沒法繼續生產,一般狀況下消費完成後, // cons.tail = cons.head,意味着剛消費的位置皆能夠被生產 char pad2 __rte_cache_aligned; /**< empty cache line */ };
上述數據結構爲rte_ring的數據結構,rte_ring就表明着一條ring,是ring的抽象。其中重要的是兩個地方,一個是prod,一個是cons,前者表明生產者,後者表明消費者,裏面分別有兩個標記,關於標記的用途已經在上述代碼的註釋中闡述。
可是還有一點,ring中存放的數據在哪?dpdk的ring中存放的數據位置能夠見圖4.
圖4.dpdk ring的內存分佈圖
能夠看到,rte_ring的data中存放的是指針(就由於是指針才能利用cmpxchg實現「無鎖」),而且data分佈在struct rte_ring緊鄰的空間中(圖中青色的內存塊)。在分析實際的函數前,再看幾個流程圖,結合rte_ring中的數據結構來看,理解會更加深入(固然這部分的內容在《深刻淺出dpdk》一書中的4.4.2節也有描述)。
1.入隊操做,以單生產者單消費者(多生產者和多消費者基本差很少)爲例。初始狀態爲圖5所示。初始狀態中隊列中有4個元素,分別是obj一、obj二、obj三、obj4.
圖5.初始狀態
2.第一步,新元素入隊,先偏移prod.head到新的生產者頭位置,例如如今位置爲5,若生產元素的個數爲2,那麼新位置即爲index = 7,可是因爲涉及到多生產者,其中多生產者無鎖的奧祕就在這一步,所以先佔位置,如圖6。
圖6.入隊的第一步操做
3.第二步,元素寫入。
圖7.入隊的第二步操做
4.第三步,更新生產者的尾指針,也就是prod.tail,由於第二步只是將元素寫入而已,涉及生產-消費的流程,還要告訴消費者「能夠消費」,prod.tail的做用即是如此,因此須要更新,可是假設當前消費者開始消費,那麼流程便如圖7所示,消費者的頭標記只能到達生產者尾標記的位置。
圖8.出隊的第一步操做
5.第四步,消費者開始消費元素,此時生產者的tail標記開始更新。
圖9.出隊的第二步操做
6.第五步,與生產者相同,消費者消費數據後,被消費後的空間不能當即用於生產,還須要更新tail標記才能夠(cons.tail)
圖10.生產-消費後的最終狀態
接下來,理解了上述生產-消費的流程後,既能夠分析具體的函數了,接下來將站在生產者的視角進行分析代碼實現(消費者與生產者幾乎相同),拿生產者的入隊函數__rte_ring_do_enqueue來分析。
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t free_entries; //第一步,先偏移頭指針,搶佔生產位置 n = __rte_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries); if (n == 0) goto end; //第二步,塞數據 ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); //第三部,更新尾指針,讓消費者能夠消費 update_tail(&r->prod, prod_head, prod_next, is_sp, 1); end: if (free_space != NULL) *free_space = free_entries - n; return n; }
上述代碼是一個典型的「三步走」。
那麼很顯然,第一步就是對付第一個問題的,即在多生產者下如何讓生產者能夠順利生產而且多個生產者之間不會互相沖突,因此須要分析一下__rte_ring_move_prod_head函數。
static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; unsigned int max = n; int success; do { //1.先肯定生產者要生產多少個元素 n = max; //2.拿到如今生產者的head位置,也就是即將生產的位置 *old_head = r->prod.head; //內存屏障 rte_smp_rmb(); //3.計算剩餘的空間 *free_entries = (capacity + r->cons.tail - *old_head); //4.比較生產的元素個數和剩餘空間 if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries; if (n == 0) return 0; //5.計算生產後的新位置 *new_head = *old_head + n; if (is_sp) r->prod.head = *new_head, success = 1; else //6.若是是多生產者的話調用cpmset函數實現生產位置搶佔 success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; }
上述函數邏輯是一個很是簡單的實現邏輯,而關鍵在於第6點和do while循環,cmpset函數是什麼?又是如何實現的生產位置搶佔呢?
1 static inline int 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) 3 { 4 uint8_t res; 5 6 asm volatile( 7 MPLOCKED 8 "cmpxchgl %[src], %[dst];" 9 "sete %[res];" 10 : [res] "=a" (res), /* output */ 11 [dst] "=m" (*dst) 12 : [src] "r" (src), /* input */ 13 "a" (exp), 14 "m" (*dst) 15 : "memory"); /* no-clobber list */ 16 return res; 17 }
上述cmpset爲x86體系下的實現,能夠看到,是一段GCC內聯的彙編指令,這段內聯的嵌入彙編指令由三個彙編指令構成,最核心的一個指令即是第8行的「cmpxchg」,這即是咱們最開始說的 "無鎖的實現依賴於cmpxchg指令",那麼這個指令到底是什麼意思呢?
cmpxchg指令的意思就是「compare and change」,即「比較並交換」。
舉個例子,若是A等於B,則將C賦值給A;若是A不等於B,則拒絕將C賦值給A。
根據這個特徵咱們能夠知道,在多生產者場景下,最擔憂的事情是什麼呢?最擔憂的事情即爲「前腳剛計算好生產位置(偏移),後腳還沒等寫入數據,結果就被另一個生產者把剛剛計算好的生產位置給佔了,結果本身沒得空間生產」,將這個場景結合剛纔的cmpxchg以後怎麼解決呢?
若是生產位置沒有變化(A等於B),那麼就將最新的生產位置(計算偏移後的生產位置)賦值給生產者指針;若是生產位置發生了變化(有其餘生產者也在生產),那麼就取消更新生產者指針
核心實現就是上面這句話。關於rte_atomic32_cmpset函數,下一章【x86的cas】中會詳細講解。
那麼頭指針偏移部分代碼的流程圖能夠總結以下:
那麼至此,第一個問題之「多生產者如何解決生產位置的問題獲得瞭解決」,那麼接下來就是第三個問題,「如何讓消費者能夠消費剛剛生產的數據?」
這個問題在「三步走」中的第三部中解決的。
static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) { //1.內存屏障 if (enqueue) rte_smp_wmb(); else rte_smp_rmb(); //2.若是有其餘生產者生產數據,那麼須要等待其將數據生產完更新tail指針後,本生產者才能更新tail指針 if (!single) while (unlikely(ht->tail != old_val)) rte_pause(); //3.更新tail指針,更新的位置爲最新的生產位置,意味着剛剛生產的數據已經所有能夠被消費者消費 ht->tail = new_val; }
這裏面可能惟一會讓人產生些許疑惑的就是step 2.這裏有一個自旋鎖,自旋等待"ht->tail == old_val"條件的成立,這是爲何呢?想一下這樣的場景:
單生產者單消費者狀況下:生產數據成功後,應該講prod.tail指針前移至prod.head處,至關於告訴消費者隊列中的數據都是能夠消費的,可是若是此時是多生產者場景,因爲有多個生產者,prod.tail指針可能隨時發生變化,例如:
剛開始的時候,prod.head = prod.tail = 0,生產者A生產了3份數據,prod.head = 3而且prod.tail = 0,隨後生產者B生產了2份數據,prod.head = 5而且prod.tail = 0,那麼此時會知足「ht->tail == old_val」麼?不會,ht->tail = prod.tail = 0,而old_val的值卻爲生產元素前的prod.head的值,也就是3.那麼此時須要作的就是等待生產者A將3份數據徹底生產完,而且將prod.tail更新至3,那麼此時纔會知足「ht->tail == old_val」。說白了就是得等別的生產者徹底生產完才能生產。可是從最終結果而言,生產者A生產了3個元素,生產者B生產了2個元素,最終結果中,prod.tail = 5,也就是剛剛生產的5個元素能夠所有被消費者消費。
因此從上面的「__rte_ring_do_enqueue」函數能夠看出,想一想所謂的無鎖隊列真的實現了理想的「無鎖」麼?
「rte_ring_do_dequeue」的函數執行流程與「__rte_ring_do_enqueue」的流程基本一致,沒法後者爲生產者視角,而前者爲消費者視角,請讀者根據上述「隊列入隊」的分析過程自行分析「隊列出隊」。
【x86的CAS】
可能有的讀者在「無鎖」這個概念上知道「無鎖」的實現是一種"CAS"操做,那麼什麼纔是CAS操做呢?
CAS的全程爲「Compare And Swap」,意味比較並交換
「比較並交換」,這個概念和前一章中「cmpxchg」指令的含義基本一致。核心思想就是:
和預期結果比較,相同則賦值,不一樣則放棄
若是和預期不一樣,那麼我會一遍一遍的去嘗試,當沒有人和我競爭了,和預期結果天然就會「相同」,再回到以前的內聯彙編。
1 static inline int 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) 3 { 4 uint8_t res; 5 6 asm volatile( 7 MPLOCKED 8 "cmpxchgl %[src], %[dst];" 9 "sete %[res];" 10 : [res] "=a" (res), /* output */ 11 [dst] "=m" (*dst) 12 : [src] "r" (src), /* input */ 13 "a" (exp), 14 "m" (*dst) 15 : "memory"); /* no-clobber list */ 16 return res; 17 }
想讀懂這個函數首先須要先了解內聯彙編的正確寫法和格式。固然,接下來要說的內聯彙編格式爲intel格式。因爲涉及到內聯彙編的文章有許多,在這裏不會詳細介紹內聯彙編的格式和寫法,更多的會聚焦於此函數的實現。
內聯彙編的函數格式爲:
1 asm ( assembler template 2 : output operands /* optional */ 3 : input operands /* optional */ 4 : list of clobbered registers /* optional */ 5 );
很簡單,內聯彙編由4個部分組成:
那麼咱們接着回到rte_atomic32_cmpset函數的實現,line 7是一個x86架構下的「lock」指令指令前綴,注意「lock」其實本質上不是一個指令,而是一個指令前綴,也就是用來修飾接下來的指令,支隊接下來的指令有效力,而且修飾的指令必須是對內存有「讀-改-寫」三種操做的指令,就好比說cmpxchg指令就是。
#if RTE_MAX_LCORE == 1 #define MPLOCKED /**< No need to insert MP lock prefix. */ #else #define MPLOCKED "lock ; " /**< Insert MP lock prefix. */ #endif
在x86多核架構下,lock指令一般用來確保多核訪問cache line是具備排他性的(至關於一把鎖)。
第一個指令是cmpxchg,關於cmpxchg咱們前面已經大體講過此命令的做用。此命令的實際做用是:
比較源操做數和eax寄存器中的值,若是相同,則將目的操做數更新爲源操做數,而且將標誌寄存器中的ZF(zero flags)位置1;若是源操做數和eax寄存器中的值不通,則將源操做數寫入eax寄存器中,並將標誌寄存器中的ZF(zero flags)清0
那麼對照上面的場景,通常eax寄存器中存的值都是初始值,也就是尚未計算入隊偏移的初始值,因爲在計算入隊偏移操做時,其餘生產者可能也在進行計算入隊偏移,那麼就會起衝突,具體體現就是生產者頭指針發生變化,所以在cmpxchg指令中,再拿生產者頭指針和初始值進行比較,若是相同這說明如今沒有其餘生產者在更新,那麼源操做數(當前生產者頭指針)和eax寄存器中的值(事先備份的初始值)一定相同,此時則能夠安全的將目的操做數賦值至源操做數,也就是(prod.head = new_head);若是不一樣,這說明如今可能有其餘生產者在生產致使生產者頭指針發生變化(prod.head發生變化),那麼此時便不能更新源操做數(prod.head)。
第二個指令是sete,這個指令就很簡單了,就是單純的將標誌寄存器中的zf位的值賦值給目的操做數,也就是res。那就意味着若是cmpxchg執行交換成功,則zf位爲1,那麼通過sete設置後,res返回值也就是1;若是cmpxchg執行交換失敗,則zf爲0,那麼通過sete設置後,res的返回值也就是0.
那麼這個函數即是,若是cmpxchg成功,則函數返回1,若是cmpxchg失敗,則函數返回0,那麼根據函數的返回值,上層邏輯便知道更新生產者頭指針是否成功,成功直接返回便可;不成功怎麼辦呢?也很簡單,循環,我一次一次試(while循環),總會成功的。
能夠看到,CAS操做實現無鎖的本質上就是「比較」,比較什麼呢?這取決於咱們最擔憂什麼?那咱們最擔憂的是什麼呢?咱們最擔憂的無非就是
生產者的視角:我剛開始根據舊的生產者頭指針 + 生產的元素數量,計算出生產後的指針位置,結果在我計算的過程當中,因爲有其餘生產者干擾,致使舊的生產者頭指針已經發生了變化,那麼計算出的生產後的指針位置也是失效的。
消費者的視角:我剛開始根據舊的消費者頭指針 + 消費的元素數量,計算出消費後的指針位置,結果在我計算的過程當中,因爲有其餘消費者干擾,致使舊的消費者頭指針已經發生了變化,那麼計算出的消費後的指針位置也是失效的。
因此須要比較什麼呢?比較是「預期值」與「實際值」,預期值是咱們但願「舊的生產者頭指針不會發生變化」,那麼實際值即是「當前的生產者頭指針位置」,那麼我只須要比較二者即可以得知,是否有其餘生產者干擾,只有符合預期的狀況,我才能夠進行接下來的操做,也就是賦值。
【後續】