併發環境下最經常使用的同步手段是互斥鎖和讀寫鎖,例如pthread_mutex和pthread_readwrite_lock,經常使用的範式爲:html
void ConcurrencyOperation() { mutex.lock(); // do something mutex.unlock(); }
這種方法的優勢是:node
缺點是:算法
無鎖編程(嚴格來說是非阻塞編程)能夠分爲lock free和wait-free兩種,下面是對它們的簡單描述:shell
本文提到的無鎖單指lock free。編程
常見的lock free編程通常是基於CAS(Compare And Swap)操做:安全
CAS(void *ptr, Any oldValue, Any newValue);
即查看內存地址ptr處的值,若是爲oldValue則將其改成newValue,並返回true,不然返回false。X86平臺上的CAS操做通常是經過CPU的CMPXCHG指令來完成的。CPU在執行此指令時會首先鎖住CPU總線,禁止其它核心對內存的訪問,而後再查看或修改*ptr的值。簡單的說CAS利用了CPU的硬件鎖來實現對共享資源的串行使用。它的優勢是:數據結構
缺點是:併發
而在性能層面上,CAS與mutex/readwrite lock各有千秋,簡述以下:svg
單向鏈表實現的核心就是insert函數,這裏咱們用兩個版本的insert函數來進行簡單的演示,使用的CAS操做爲GCC提供的__sync_compare_and_swap函數。函數
首先是無序的insert操做,即將新結點插入到指定結點的後面。
void insert(Node *prev, Node *node) { while (true) { node->next = prev->next; if (__sync_compare_and_swap(&prev->next, node->next, node)) { return; } } }
代碼分析:
而後是有序的insert操做,即保證prev<= node <= next。
void insert(Node *prev, Node *node) { while (true) { Node *next = prev->next; while (next != NULL && next->item < node->item) { prev = next; next = prev->next; } node->next = next; if (__sync_compare_and_swap(&prev->next, next, node)) { return; } } }
這段代碼相比上一版本多了一個next變量。若是去掉next變量,那麼代碼就是下面的樣子。
void insert(Node *prev, Node *node) { while (true) { while (prev->next != NULL && prev->next->item < node->item) { prev = prev->next; } node->next = prev->next; if (__sync_compare_and_swap(&prev->next, node->next, node)) { return; } } }
上面的代碼有着很嚴重的安全隱患:prev是共享資源,所以每一個prev->next的值不必定是相等的!解決辦法就是用一個局部變量來保存某個時刻prev的值,從而保證咱們在不一樣地方進行比較的結點是一致的。
目前經常使用的key-value數據結構有三種:Hash表、紅黑樹、SkipList,它們各自有着不一樣的優缺點(不考慮刪除操做):
若是要實現一個key-value結構,需求的功能有插入、查找、迭代、修改,那麼首先Hash表就不是很適合了,由於迭代的時間複雜度比較高;而紅黑樹的插入極可能會涉及多個結點的旋轉、變色操做,所以須要在外層加鎖,這無形中下降了它可能的併發度。而SkipList底層是用鏈表實現的,能夠實現爲lock free,同時它還有着不錯的性能(單線程下只比紅黑樹略慢),很是適合用來實現咱們需求的那種key-value結構。LevelDB、Reddis的底層存儲結構就是用的SkipList。
那麼,SkipList是什麼呢?它由多層有序鏈表組成,每層鏈表的結點數量都是上一層的X倍,而它的插入和查找操做都從頂層開始進行。
(圖片取自wiki)
從上圖能夠很容易看出查找的方式:
在SkipList中,結點層數很是關鍵,若是各個結點的層數均勻分佈,那麼插入與查找的效率就會比較高。爲了實現這一目的,SkipList中每一個結點的層數是在插入前隨機算出來的,其基本原理就是令結點在i層的機率是i+1層的X倍,代碼以下:
int RandLevel(int X, int maxLevel) { int r = rand(); int level = 1; for (int j = X; r < RAND_MAX / j && level < maxLevel; ++level, j *= X) continue; return level; }
插入新結點的過程與查找很相似,這裏咱們假設鏈表中的各結點不容許重複:
能夠看出,插入操做的1-3步是單純的讀操做,只有第4步纔是對共享資源的寫操做。而第4步的插入實質上就是有序鏈表的插入操做,咱們在前面已經簡述瞭如何用CAS實現它。所以,只要保證插入順序是從底層向上依次插入,那麼就能夠將SkipList實現爲lock free。插入順序從底向上進行的緣由以下。
N個插入操做確定須要至少N次CAS,而任意一個CAS成功後就意味着新結點已經成爲了SkipList的一部分,變成了共享資源,則新結點就須要遵循其它結點的原則:每一個結點都同時存在於1-lv層。容易看出,只有從底層向上插入才能知足這一條件。
多個CAS操做自己沒有原子性,即在N次插入沒有完成前,新結點會表現出必定的不一致性,具體來講就是多個線程前後訪問新結點時,看到的它的層數並不相同。這種不一致性會比較輕微的影響SkipList的性能,而不會影響它的正確性。
SkipList的插入代碼以下:
void Insert(Node *node) { node->level = RandLevel(2, MAX_LEVEL); InsertInternal(head, node->level, node); } Node *InsertInternal(Node *prev, int lv, Node *node) { Node *next = prev->next[lv]; while (next != NULL && next->item < node->item) { prev = next; next = prev->next[lv]; } if (next == NULL || next->item > node->item) { if (lv != 0) { if (InsertInternal(prev, lv - 1, node) != NULL) { ListInsert(prev, node, lv); } } } else if (next->item == node->item) { return NULL; } return node; }}
其中ListInsert就是對前面有序鏈表插入的一個簡單改寫。整個插入過程遞歸實現,從而知足了插入順序要從底向上的要求。
在設計無鎖SkipList時,不光須要咱們將顯式的鎖用CAS替換掉,還須要儘可能避免一些隱式的鎖,以及一些非線程安全的函數。
本文只是對無鎖SkipList設計的一個簡單回顧,不包括詳細的實現代碼。由於還不肯定本身設計的有沒有紕漏,還須要認真學習一下LevelDB和Reddis中的SkipList代碼。
http://en.wikipedia.org/wiki/Skip_list
http://www.myexception.cn/ai/972131.html
http://www.seflerzhou.net/post-6.html