使用CAS實現無鎖的SkipList

無鎖

併發環境下最經常使用的同步手段是互斥鎖和讀寫鎖,例如pthread_mutex和pthread_readwrite_lock,經常使用的範式爲:html

void ConcurrencyOperation() {
	mutex.lock();
	// do something
	mutex.unlock();
}

這種方法的優勢是:node

  1. 編程模型簡單,若是當心控制上鎖順序,通常來講不會有死鎖的問題;
  2. 能夠經過調節鎖的粒度來調節性能。

缺點是:算法

  1. 全部基於鎖的算法都有死鎖的可能;
  2. 上鎖和解鎖時進程要從用戶態切換到內核態,並可能伴隨有線程的調度、上下文切換等,開銷比較重;
  3. 對共享數據的讀與寫之間會有互斥。

無鎖編程(嚴格來說是非阻塞編程)能夠分爲lock free和wait-free兩種,下面是對它們的簡單描述:shell

  • lock free:鎖無關,一個鎖無關的程序可以確保它全部線程中至少有一個可以繼續往下執行。這意味着有些線程可能會被任意的延遲,然而在每個步驟中至少有一個線程可以執行下去。所以這個系統做爲一個總體老是在前進的,儘管有些線程的進度可能沒有其它線程走的快。
  • wait free:等待無關,一個等待無關的程序能夠在有限步以內結束,而無論其它線程的相對執行速度如何。
  • lock based:基於鎖,基於鎖的程序沒法提供上面的任何保證,任一線程持有了某互斥體並處於等待狀態,那麼其它想要獲取贊成互斥體的線程只有等待,全部基於鎖的算法沒法擺脫死鎖的陰影。

本文提到的無鎖單指lock free。編程

lock free與CAS

常見的lock free編程通常是基於CAS(Compare And Swap)操做:安全

CAS(void *ptr, Any oldValue, Any newValue);

即查看內存地址ptr處的值,若是爲oldValue則將其改成newValue,並返回true,不然返回false。X86平臺上的CAS操做通常是經過CPU的CMPXCHG指令來完成的。CPU在執行此指令時會首先鎖住CPU總線,禁止其它核心對內存的訪問,而後再查看或修改*ptr的值。簡單的說CAS利用了CPU的硬件鎖來實現對共享資源的串行使用。它的優勢是:數據結構

  1. 開銷較小:不須要進入內核,不須要切換線程;
  2. 沒有死鎖:總線鎖最長持續爲一次read+write的時間;
  3. 只有寫操做須要使用CAS,讀操做與串行代碼徹底相同,可實現讀寫不互斥。

缺點是:併發

  1. 編程很是複雜,兩行代碼之間可能發生任何事,不少常識性的假設都不成立。
  2. CAS模型覆蓋的狀況很是少,沒法用CAS實現原子的複數操做。

而在性能層面上,CAS與mutex/readwrite lock各有千秋,簡述以下:svg

  1. 單線程下CAS的開銷大約爲10次加法操做,mutex的上鎖+解鎖大約爲20次加法操做,而readwrite lock的開銷則更大一些。
  2. CAS的性能爲固定值,而mutex則能夠經過改變臨界區的大小來調節性能;
  3. 若是臨界區中真正的修改操做只佔一小部分,那麼用CAS能夠得到更大的併發度。
  4. 多核CPU中線程調度成本較高,此時更適合用CAS。

使用CAS實現無鎖單向鏈表

單向鏈表實現的核心就是insert函數,這裏咱們用兩個版本的insert函數來進行簡單的演示,使用的CAS操做爲GCC提供的__sync_compare_and_swap函數。函數

首先是無序的insert操做,即將新結點插入到指定結點的後面。

void insert(Node *prev, Node *node) {
	while (true) {
		node->next = prev->next;
		if (__sync_compare_and_swap(&prev->next, node->next, node)) {
			return;
		}
	}
}

代碼分析:

  1. 首先修改node->next,此時node尚未完成插入,只能被本線程看到,所以這個修改能夠直接進行。
  2. 在if中嘗試修改prev->next,若是失敗,則代表prev->next剛剛被其它線程修改了,則重複這一過程。

而後是有序的insert操做,即保證prev<= node <= next。

void insert(Node *prev, Node *node) {
	while (true) {
		Node *next = prev->next;
		while (next != NULL && next->item < node->item) {
			prev = next;
			next = prev->next;
		}
		node->next = next;
		if (__sync_compare_and_swap(&prev->next, next, node)) {
			return;
		}
	}
}

這段代碼相比上一版本多了一個next變量。若是去掉next變量,那麼代碼就是下面的樣子。

void insert(Node *prev, Node *node) {
    while (true) {
        while (prev->next != NULL && prev->next->item < node->item) {
            prev = prev->next;
        }
        node->next = prev->next;
        if (__sync_compare_and_swap(&prev->next, node->next, node)) {
            return;
        }
    }
}

上面的代碼有着很嚴重的安全隱患:prev是共享資源,所以每一個prev->next的值不必定是相等的!解決辦法就是用一個局部變量來保存某個時刻prev的值,從而保證咱們在不一樣地方進行比較的結點是一致的。

Key-Value數據結構

目前經常使用的key-value數據結構有三種:Hash表、紅黑樹、SkipList,它們各自有着不一樣的優缺點(不考慮刪除操做):

  1. Hash表:插入、查找最快,爲O(1);如使用鏈表實現則可實現無鎖;數據有序化須要顯式的排序操做。
  2. 紅黑樹:插入、查找爲O(logn),但常數項較小;無鎖實現的複雜性很高,通常須要加鎖;數據自然有序。
  3. SkipList:插入、查找爲O(logn),但常數項比紅黑樹要大;底層結構爲鏈表,可無鎖實現;數據自然有序。

若是要實現一個key-value結構,需求的功能有插入、查找、迭代、修改,那麼首先Hash表就不是很適合了,由於迭代的時間複雜度比較高;而紅黑樹的插入極可能會涉及多個結點的旋轉、變色操做,所以須要在外層加鎖,這無形中下降了它可能的併發度。而SkipList底層是用鏈表實現的,能夠實現爲lock free,同時它還有着不錯的性能(單線程下只比紅黑樹略慢),很是適合用來實現咱們需求的那種key-value結構。LevelDB、Reddis的底層存儲結構就是用的SkipList。

SkipList

那麼,SkipList是什麼呢?它由多層有序鏈表組成,每層鏈表的結點數量都是上一層的X倍,而它的插入和查找操做都從頂層開始進行。

470px-Skip_list.svg

 

(圖片取自wiki)

從上圖能夠很容易看出查找的方式:

  1. 從頂層的頭結點出發;
  2. 若下一結點爲目標值,則返回結果;
  3. 若下一結點小於目標值,則前進;
  4. 若下一結點大於目標值或爲NULL,則:
    1. 若當前處於最底層,則返回NULL;
    2. 降低一層,重複2-4步。

在SkipList中,結點層數很是關鍵,若是各個結點的層數均勻分佈,那麼插入與查找的效率就會比較高。爲了實現這一目的,SkipList中每一個結點的層數是在插入前隨機算出來的,其基本原理就是令結點在i層的機率是i+1層的X倍,代碼以下:

int RandLevel(int X, int maxLevel) {
	int r = rand();
	int level = 1;
	for (int j = X; r < RAND_MAX / j && level < maxLevel; ++level, j *= X)
		continue;
	return level;
}

插入新結點的過程與查找很相似,這裏咱們假設鏈表中的各結點不容許重複:

  1. 計算出新結點的層數lv;
  2. 從lv層的頭結點出發,開始查找過程;
  3. 若是找到目標值,返回NULL;
  4. 若是當前處於最底層,則建立新結點,並依次將新結點插入到1-lv層;

能夠看出,插入操做的1-3步是單純的讀操做,只有第4步纔是對共享資源的寫操做。而第4步的插入實質上就是有序鏈表的插入操做,咱們在前面已經簡述瞭如何用CAS實現它。所以,只要保證插入順序是從底層向上依次插入,那麼就能夠將SkipList實現爲lock free。插入順序從底向上進行的緣由以下。

N個插入操做確定須要至少N次CAS,而任意一個CAS成功後就意味着新結點已經成爲了SkipList的一部分,變成了共享資源,則新結點就須要遵循其它結點的原則:每一個結點都同時存在於1-lv層。容易看出,只有從底層向上插入才能知足這一條件。

多個CAS操做自己沒有原子性,即在N次插入沒有完成前,新結點會表現出必定的不一致性,具體來講就是多個線程前後訪問新結點時,看到的它的層數並不相同。這種不一致性會比較輕微的影響SkipList的性能,而不會影響它的正確性。

SkipList的插入代碼以下:

void Insert(Node *node) {
	node->level = RandLevel(2, MAX_LEVEL);
	InsertInternal(head, node->level, node);
}
Node *InsertInternal(Node *prev, int lv, Node *node) {
	Node *next = prev->next[lv];
	while (next != NULL && next->item < node->item) {
		prev = next;
		next = prev->next[lv];
	}
	if (next == NULL || next->item > node->item) {
		if (lv != 0) {
			if (InsertInternal(prev, lv - 1, node) != NULL) {
				ListInsert(prev, node, lv);
			}
		}
	} else if (next->item == node->item) {
		return NULL;
	}
	return node;
}}

其中ListInsert就是對前面有序鏈表插入的一個簡單改寫。整個插入過程遞歸實現,從而知足了插入順序要從底向上的要求。

更多思考

在設計無鎖SkipList時,不光須要咱們將顯式的鎖用CAS替換掉,還須要儘可能避免一些隱式的鎖,以及一些非線程安全的函數。

  1. RandLevel中的rand()是非線程安全的函數,須要替換爲線程安全的版本(如非標準庫的rand_r()),或是由各線程本身來保存rand使用的seed。
  2. 在建立SkipList的時候須要指定一個MAX_LEVEL,即頭結點的層數,這個值在此SkipList生命期中固定不變。通常來講12-20層都是能夠接受的。
  3. 全局new內部會加鎖,若是這裏有瓶頸的話須要換用自定義的內存池。
  4. 若是使用了內存池,那麼必須確保內存池自己是無鎖且支持併發寫的。不然就只能將SkipList改寫爲單寫多讀版本。
  5. 在計算新結點的層數時,須要傳入一個maxLevel,這裏有兩種常見作法:能夠傳入SkipList的最大層數MAX_LEVEL,也能夠傳入當前最大層數topLevel + 1。兩種作法的優缺點爲:
    1. 傳入MAX_LEVEL可能在SkipList中結點數量較少時就達到很高的層數,下降了此時插入與查找的性能;但若是有序插入多個新結點,能保證各結點的層數均勻分佈。
    2. 傳入topLevel + 1能夠保證在結點數較少時不太可能出現很高的層數,但在有序插入多個新結點時,可能致使前面插入結點的層數總體要低於後面插入的結點。
  6. SkipList的修改操做也須要是lock free的,所以須要將Node中的item改成指針,在修改某結點值的時候用CAS來替換掉舊指針,並在完成後刪除。
  7. SkipList也能夠在最底層加入反向指針prev,這樣就能直接O(1)的反向迭代。帶來的問題是更大的不一致性——在插入未完成時兩個線程分別正向和反向迭代,看到的SkipList是不一致的。但能夠保證SkipList在插入完成後的最終狀態是一致的。

本文只是對無鎖SkipList設計的一個簡單回顧,不包括詳細的實現代碼。由於還不肯定本身設計的有沒有紕漏,還須要認真學習一下LevelDB和Reddis中的SkipList代碼。

參考:

http://en.wikipedia.org/wiki/Skip_list

http://www.myexception.cn/ai/972131.html

http://www.seflerzhou.net/post-6.html

http://coolshell.cn/articles/8239.html

http://blog.csdn.net/sunmenggmail/article/details/12648465

相關文章
相關標籤/搜索