經過對併發的介紹,咱們看到了併發編程的一個最基本問題:因爲單處理器上的中斷(或者多個線程在多處理器上併發執行),一些咱們但願能原子執行的指令並不能正確運行。鎖(lock)就是用來解決這一問題最基本的方法。程序員在源代碼中加鎖,放在臨界區周圍,保證臨界區可以像單條原子指令同樣執行。node
下面是使用鎖的一個簡單示例:程序員
1 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; 2 3 Pthread_mutex_lock(&lock); // wrapper for pthread_mutex_lock() 4 balance = balance + 1; 5 Pthread_mutex_unlock(&lock);
鎖就是一個變量,這個鎖變量保存了鎖在某一時刻的狀態。它要麼是可用的(available,或unlocked,或free),表示沒有線程持有鎖;要麼是被佔用的(acquired,或locked,或held),表示有一個線程持有鎖,正處於臨界區。咱們也能夠保存其餘的信息,好比持有鎖的線程,或請求獲取鎖的線程隊列,但這些信息會隱藏起來,鎖的使用者不會發現。編程
鎖通常只支持兩個操做:lock()和unlock()。調用lock()嘗試獲取鎖,若是沒有其餘線程持有鎖,該線程會得到鎖,進入臨界區,這個線程被稱爲鎖的持有者(owner)。若是另一個線程對相同的鎖變量調用lock(),由於鎖被另外一線程持有,該調用不會返回。這樣,當持有鎖的線程在臨界區時,其餘線程就沒法進入臨界區。數據結構
鎖的持有者一旦調用unlock(),鎖就變成可用了。若是沒有其餘等待線程(即沒有其餘線程調用過lock()並卡在那裏),鎖的狀態就變成可用了。若是有等待線程,其中一個會(最終)注意到(或收到通知)鎖狀態的變化,獲取該鎖,進入臨界區。多線程
顯然,咱們須要硬件和操做系統的幫助來實現一個可用的鎖。近些年來,各類計算機體系結構的指令集都增長了一些不一樣的硬件原語,咱們可使用它們來實現像鎖這樣的互斥原語。併發
在實現鎖以前,對於鎖是否能工做良好,應該事先設立一些標準。首先是鎖是否能完成它的基本任務,即提供互斥(mutual exclusion),是否可以阻止多個線程進入臨界區。app
第二是公平性(fairness)。當鎖可用時,是否每個競爭線程有公平的機會搶到鎖?是否有競爭鎖的線程會餓死(starve),一直沒法得到鎖?函數
最後是性能(performance),也即便用鎖以後增長的時間開銷。有幾種場景須要考慮:一種是隻有一個線程搶鎖、釋放鎖的開銷如何?另一種是一個CPU上多個線程競爭,最後一種是多個CPU、多個線程競爭時的性能。高併發
最先提供的互斥解決方案之一,就是在臨界區關閉中斷。這個解決方案是爲單處理器系統開發的。經過在進入臨界區以前關閉中斷(使用特殊的硬件指令),能夠保證臨界區的代碼不會被中斷,從而原子地執行。結束以後,咱們從新打開中斷,程序正常運行。性能
這個方法的主要優勢就是簡單,可是缺點不少。首先,這種方法要求咱們容許全部調用線程執行特權操做(打開關閉中斷),可是惡意程序可能會利用這點。例如一個惡意程序可能在它開始時就調用lock(),從而獨佔處理器。系統沒法從新得到控制,只能重啓系統。
第二,這種方案不支持多處理器。若是多個線程運行在不一樣的CPU上,每一個線程都試圖進入同一個臨界區,關閉中斷也沒有做用。
第三,關閉中斷致使中斷丟失,可能會致使嚴重的系統問題。假如磁盤設備完成了讀取請求,但CPU由於關閉了中斷錯失了這一信號,操做系統如何知道去喚醒等待的進程?
最後一個不過重要的緣由就是效率低。與正常指令執行相比,現代CPU對於關閉和打開中斷的代碼執行得較慢。
基於以上緣由,只在頗有限的狀況下用關閉中斷來實現互斥原語。
由於關閉中斷的方法沒法工做在多處理器上,因此係統設計者開始讓硬件支持鎖。最簡單的硬件支持是測試並設置指令(test-and-set instruction),也叫做原子交換(atomic exchange)。測試並設置指令的工做大體能夠用下面的C代碼來定義:
1 int TestAndSet(int *old_ptr, int new) { 2 int old = *old_ptr; // fetch old value at old_ptr 3 *old_ptr = new; // store 'new' into old_ptr 4 return old; // return the old value 5 }
它返回old_ptr指向的舊值,同時更新爲new的新值。固然,關鍵是這些代碼是原子地(atomically)執行。由於既能夠測試舊值,又能夠設置新值,因此咱們把這條指令叫做「測試並設置」。
爲了理解該指令如何構造一個可用的鎖,咱們首先嚐試實現一個不依賴它的鎖。
在第一次嘗試中,想法很簡單:用一個變量來標誌鎖是否被某些線程佔用。第一個線程進入臨界區,調用lock(),檢查標誌是否爲1,而後設置標誌爲1,代表線程持有該鎖。結束臨界區時,線程調用unlock(),清除標誌,表示鎖未被持有。
當第一個線程正處於臨界區時,若是另外一個線程調用lock(),它會在while循環中自旋等待(spin-wait),直到第一個線程調用unlock()清空標誌。而後等待的線程會退出while循環,設置標誌,執行臨界區代碼。
1 typedef struct lock_t { int flag; } lock_t; 2 3 void init(lock_t *mutex) { 4 // 0 -> lock is available, 1 -> held 5 mutex->flag = 0; 6 } 7 8 void lock(lock_t *mutex) { 9 while (mutex->flag == 1) // TEST the flag 10 ; // spin-wait (do nothing) 11 mutex->flag = 1; // now SET it! 12 } 13 14 void unlock(lock_t *mutex) { 15 mutex->flag = 0; 16 }
遺憾的是,這段代碼並不能正確工做。假設代碼按照下表執行,開始時flag=0。
從這種交替執行能夠看出,經過適時的中斷,咱們很容易構造出兩個線程都將標誌設置爲1,都能進入臨界區的場景。
使用測試並設置指令改進後的代碼以下:
1 typedef struct lock_t { 2 int flag; 3 } lock_t; 4 5 void init(lock_t *lock) { 6 // 0 indicates that lock is available, 1 that it is held 7 lock->flag = 0; 8 } 9 10 void lock(lock_t *lock) { 11 while (TestAndSet(&lock->flag, 1) == 1) 12 ; // spin-wait (do nothing) 13 } 14 15 void unlock(lock_t *lock) { 16 lock->flag = 0; 17 }
咱們理解一下這個鎖的工做原理。首先假設一個線程在運行,調用lock(),沒有其餘線程持有鎖,因此flag是0。當調用TestAndSet(flag, 1)方法,返回0,線程會跳出while循環,獲取鎖。同時也會原子地設置flag爲1,標誌鎖已經被持有。當線程離開臨界區,調用unlock()將flag清理爲0。
當某一個線程已經持有鎖時。工做線程調用lock(),而後調用TestAndSet(flag, 1),這一次返回1。只要另外一個線程一直持有鎖,TestAndSet()會重複返回1,本線程會一直自旋。當flag終於被改成0,本線程會調用TestAndSet(),返回0而且原子地設置爲1,從而得到鎖,進入臨界區。
這種鎖被稱爲自旋鎖(spin lock)。這是最簡單的一種鎖,一直自旋,利用CPU週期,直到鎖可用。在單處理器上,須要搶佔式的調度器。不然,自旋鎖在單CPU上沒法使用,由於一個自旋的線程永遠不會放棄CPU。
咱們按照以前的標準來評價一下咱們實現的自旋鎖。首先是正確性:自旋鎖一次只容許一個線程進入臨界區,所以能夠正確運行。
下一個標準是公平性:答案是自旋鎖不提供任何公平性保證。實際上,自旋的線程在競爭條件下可能會永遠自旋。自旋鎖沒有公平性,可能會致使餓死。
最後一個標準是性能。對於自旋鎖,在單CPU的狀況下,性能開銷至關大。假設一個線程持有鎖進入臨界區時被搶佔。調度器可能會運行其餘每個線程(假設有N−1個這種線程)。而其餘線程都在競爭鎖,都會在放棄CPU以前,自旋一個時間片,浪費CPU週期。
可是,在多CPU上,自旋鎖性能不錯(若是線程數大體等於CPU數)。假設線程A在CPU 1,線程B在CPU 2競爭同一個鎖。線程A佔有鎖時,線程B競爭鎖就會自旋。然而,臨界區通常都很短,所以很快鎖就可用,而後線程B得到鎖。自旋等待其餘處理器上的鎖,並無浪費不少CPU週期,所以效果不錯。
某些系統提供了另外一個硬件原語,即比較並交換指令(compare-and-swap)。下圖是這條指令的C語言僞代碼。
1 int CompareAndSwap(int *ptr, int expected, int new) { 2 int actual = *ptr; 3 if (actual == expected) 4 *ptr = new; 5 return actual; 6 }
比較並交換的基本思路是檢測ptr指向的值是否和expected相等;若是是,更新ptr所指的值爲新值。不然,什麼也不作。不論哪一種狀況,都返回該內存地址的實際值,讓調用者可以知道執行是否成功。
有了比較並交換指令,就能夠實現一個鎖,相似於用測試並設置指令那樣。例如,咱們只要用下面的代碼替換上面例子中的lock()函數:
1 void lock(lock_t *lock) { 2 while (CompareAndSwap(&lock->flag, 0, 1) == 1) 3 ; // spin 4 }
它的行爲以及對其的評價等價於上面分析的自旋鎖。
最後一個硬件原語是獲取並增長(fetch-and-add)指令,它能原子地返回特定地址的舊值,而且讓該值自增一。獲取並增長的C語言僞代碼以下:
1 int FetchAndAdd(int *ptr) { 2 int old = *ptr; 3 *ptr = old + 1; 4 return old; 5 }
咱們可使用獲取並增長指令,實現一個更有趣的ticket鎖:
1 typedef struct lock_t { 2 int ticket; 3 int turn; 4 } lock_t; 5 6 void init(lock_t *lock) { 7 lock->ticket = 0; 8 lock->turn = 0; 9 } 10 11 void lock(lock_t *lock) { 12 int myturn = FetchAndAdd(&lock->ticket); 13 while (lock->turn != myturn) 14 ; // spin 15 } 16 17 void unlock(lock_t *lock) { 18 FetchAndAdd(&lock->turn); 19 }
這裏不是用一個值,而是使用了ticket和turn變量來構建鎖。基本操做也很簡單:若是線程但願獲取鎖,首先對一個ticket值執行一個原子的獲取並相加指令。這個值做爲該線程的「turn」(順位,即myturn)。根據全局共享的lock->turn變量,當某一個線程的(myturn == turn)時,則輪到這個線程進入臨界區。unlock則是增長turn,從而下一個等待線程能夠進入臨界區。
不一樣於以前的方法:本方法可以保證全部線程都能搶到鎖。只要一個線程得到了ticket值,它最終會被調度。好比基於測試並設置的方法,一個線程有可能一直自旋,即便其餘線程在獲取和釋放鎖。
基於硬件的鎖簡單並且有效,可是在某些場景下,這些解決方案會效率低下。以兩個線程運行在單處理器上爲例,當一個線程(線程1)持有鎖時,被中斷。第二個線程(線程2)去獲取鎖,發現鎖已經被持有。所以,它就一直自旋。最後,時鐘中斷產生,線程1從新運行,它釋放鎖。最後,線程2不須要繼續自旋了,它獲取了鎖。
相似的場景下,一個線程會一直自旋檢查一個不會改變的值,浪費掉整個時間片。若是有N個線程去競爭一個鎖,狀況會更糟糕。一樣的場景下,會浪費N−1個時間片,只是自旋並等待一個線程釋放該鎖。所以,咱們的下一個關鍵問題是:怎樣避免沒必要要的自旋,浪費CPU時間?
第一種簡單的方法就是,在要自旋的時候,放棄CPU。下圖展現了這種方法。
1 void init() { 2 flag = 0; 3 } 4 5 void lock() { 6 while (TestAndSet(&flag, 1) == 1) 7 yield(); // give up the CPU 8 } 9 10 void unlock() { 11 flag = 0; 12 }
在這種方法中,咱們假定操做系統提供原語yield(),線程能夠調用它主動放棄CPU,讓其餘線程運行。yield()系統調用可以讓線程由運行(running)態變爲就緒(ready)態,從而容許其餘線程運行。所以,讓出線程本質上取消調度(deschedules)了它本身。
考慮在單CPU上運行兩個線程,基於yield的方法十分有效。一個線程調用lock(),發現鎖被佔用時,讓出CPU,另一個線程運行,完成臨界區。在這個簡單的例子中,讓出方法工做得很是好。
如今來考慮許多線程(例如100個)反覆競爭一把鎖的狀況。在這種狀況下,一個線程持有鎖,在釋放鎖以前被搶佔。其餘99個線程分別調用lock(),發現鎖被搶佔,而後讓出CPU。假定採用某種輪轉調度程序,這99個線程會一直處於運行—讓出這種模式,直到持有鎖的線程再次運行。雖然比原來的浪費99個時間片的自旋方案要好,但這種方法仍然成本很高,上下文切換的成本是實實在在的,所以浪費很大。
更糟的是,咱們尚未考慮飢餓的問題。一個線程可能一直處於讓出的循環,而其餘線程反覆進出臨界區。很顯然,咱們須要一種方法來解決這個問題。
前面一些方法的真正問題是存在太多的偶然性:調度程序決定如何調度線程。若是調度不合理,線程或者一直自旋,或者馬上讓出CPU。不管哪一種方法,均可能形成浪費,也不能防止飢餓。
所以,咱們必須顯式地施加某種控制,決定鎖釋放時,誰能搶到鎖。爲了作到這一點,咱們須要操做系統的更多支持,並須要一個隊列來保存等待鎖的線程。
簡單起見,咱們利用Solaris提供的支持,它提供了兩個調用:park()可以讓調用線程休眠,unpark(threadID)則會喚醒threadID標識的線程。能夠用這兩個調用來實現鎖,讓調用者在獲取不到鎖時睡眠,在鎖可用時被喚醒。
1 typedef struct lock_t { 2 int flag; 3 int guard; 4 queue_t *q; 5 } lock_t; 6 7 void lock_init(lock_t *m) { 8 m->flag = 0; 9 m->guard = 0; 10 queue_init(m->q); 11 } 12 13 void lock(lock_t *m) { 14 while (TestAndSet(&m->guard, 1) == 1) 15 ; //acquire guard lock by spinning 16 if (m->flag == 0) { 17 m->flag = 1; // lock is acquired 18 m->guard = 0; 19 } else { 20 queue_add(m->q, gettid()); 21 m->guard = 0; 22 park(); 23 } 24 } 25 26 void unlock(lock_t *m) { 27 while (TestAndSet(&m->guard, 1) == 1) 28 ; //acquire guard lock by spinning 29 if (queue_empty(m->q)) 30 m->flag = 0; // let go of lock; no one wants it 31 else 32 unpark(queue_remove(m->q)); // hold lock (for next thread!) 33 m->guard = 0; 34 }
在這個例子中,咱們作了兩件事。首先,咱們將以前的測試並設置和等待隊列結合,實現了一個更高性能的鎖。其次,咱們經過隊列來控制誰會得到鎖,避免餓死。
你可能注意到,guard基本上起到了自旋鎖的做用,圍繞着flag和隊列操做。所以,這個方法並無徹底避免自旋等待。線程在獲取鎖或者釋放鎖時可能被中斷,從而致使其餘線程自旋等待。可是,這個自旋等待時間是頗有限的(不是用戶定義的臨界區,只是在lock和unlock代碼中的幾個指令)。
當要喚醒另外一個線程時,flag並無設置爲0。爲何呢?由於當線程被喚醒時,就像是從park()調用返回。此時它沒有持有guard,因此也不能將flag設置爲1。所以,咱們就直接把鎖從釋放的線程傳遞給下一個得到鎖的線程,期間flag沒必要設置爲0。
不過,代碼中仍是存在一點瑕疵。假設一個線程將要調用park休眠,可是不湊巧,系統切換到了正在持有鎖的線程。若是該線程隨後釋放了鎖,前面的線程調用park後可能會永遠休眠下去。爲了不這種狀況,咱們須要額外的工做。
Solaris經過增長了第三個系統調用setpark()來解決這一問題。經過setpark(),一個線程代表本身立刻要調用park。若是恰好另外一個線程被調度,而且調用了unpark,那麼後續的park調用就會直接返回,而不是一直睡眠。所以,示例代碼中獲得lock()調用能夠作一點小修改:
1 queue_add(m->q, gettid()); 2 setpark(); // new code 3 m->guard = 0;
另外一種方案就是將guard傳入內核。在這種狀況下,內核可以採起預防措施,保證原子地釋放鎖,把運行線程移出隊列。
兩階段鎖(two-phase lock)是一種古老的鎖方案,多年來不斷被採用。兩階段鎖意識到自旋可能頗有用,尤爲是在很快就要釋放鎖的場景。所以,兩階段鎖的第一階段會先自旋一段時間,但願它能夠獲取鎖。
可是,若是第一個自旋階段沒有得到鎖,第二階段調用者會睡眠,直到鎖可用。常見的方式是在循環中自旋固定的次數,而後睡眠。
咱們來討論一下如何在常見數據結構中使用鎖。咱們的挑戰是:對於特定數據結構,如何加鎖才能讓該結構功能正確?以及如何可以保證高性能?
實現一個簡單的併發計數器很簡單,代碼以下:
1 typedef struct counter_t { 2 int value; 3 pthread_mutex_t lock; 4 } counter_t; 5 6 void init(counter_t *c) { 7 c->value = 0; 8 Pthread_mutex_init(&c->lock, NULL); 9 } 10 11 void increment(counter_t *c) { 12 Pthread_mutex_lock(&c->lock); 13 c->value++; 14 Pthread_mutex_unlock(&c->lock); 15 } 16 17 void decrement(counter_t *c) { 18 Pthread_mutex_lock(&c->lock); 19 c->value--; 20 Pthread_mutex_unlock(&c->lock); 21 } 22 23 int get(counter_t *c) { 24 Pthread_mutex_lock(&c->lock); 25 int rc = c->value; 26 Pthread_mutex_unlock(&c->lock); 27 return rc; 28 }
這個併發計數器遵循了最簡單、最基本的併發數據結構中常見的數據模式:它只是加了一把鎖,在調用函數操做該數據結構時獲取鎖,從調用返回時釋放鎖。
如今讓咱們來考察一下它的性能。若是簡單的方案就能工做,同時運行速度沒有大幅降低。就不須要精巧的設計。
咱們運行了一個基準測試,每一個線程更新同一個共享計數器固定次數,而後咱們改變線程數。下圖給出了運行1個線程到4個線程的總耗時,其中每一個線程更新100萬次計數器。經過增長CPU,咱們但願單位時間可以完成更多的任務。從上方的曲線能夠看出,同步的計數器擴展性很差。單線程完成100萬次更新只須要很短的時間(大約0.03s),而兩個線程併發執行,性能降低不少(超過5s!)。線程更多時,性能更差。
咱們將介紹一種方法,稱爲懶惰計數器(sloppy counter)。
懶惰計數器經過多個局部計數器和一個全局計數器來實現一個邏輯計數器,其中每一個CPU核心有一個局部計數器。具體來講,在4個CPU的機器上,有4個局部計數器和1個全局計數器。除了這些計數器,還有鎖:每一個局部計數器有一個鎖,全局計數器有一個。
懶惰計數器的基本思想是這樣的:若是一個核心上的線程想增長計數器,那就增長它的局部計數器,訪問這個局部計數器是經過對應的局部鎖同步的。由於每一個CPU有本身的局部計數器,不一樣CPU上的線程不會競爭,因此計數器的更新操做可擴展性好。可是,爲了保持全局計數器更新,局部值會按期轉移給全局計數器,方法是獲取全局鎖,讓全局計數器加上局部計數器的值,而後將局部計數器置零。
局部轉全局的頻度,取決於一個閾值,這裏稱爲S(sloppiness)。S越小,懶惰計數器則越趨近於上面的同步計數器。S越大,擴展性越強,可是全局計數器與實際計數的誤差越大。
上面的基準測試效果圖中下方的線,是閾值S爲1024時懶惰計數器的性能,4個處理器更新400萬次的時間和一個處理器更新100萬次的幾乎同樣。下圖展現了隨着閾值S的變化,懶惰計數器的性能曲線。懶惰計數器就是在準確性和性能之間折中。
下面是懶惰計數器的基本實現:
1 typedef struct counter_t { 2 int global; // global count 3 pthread_mutex_t glock; // global lock 4 int local[NUMCPUS]; // local count (per cpu) 5 pthread_mutex_t llock[NUMCPUS]; // ... and locks 6 int threshold; // update frequency 7 } counter_t; 8 9 // init: record threshold, init locks, init values 10 // of all local counts and global count 11 void init(counter_t *c, int threshold) { 12 c->threshold = threshold; 13 14 c->global = 0; 15 pthread_mutex_init(&c->glock, NULL); 16 17 int i; 18 for (i = 0; i < NUMCPUS; i++) { 19 c->local[i] = 0; 20 pthread_mutex_init(&c->llock[i], NULL); 21 } 22 } 23 24 // update: usually, just grab local lock and update local amount 25 // once local count has risen by 'threshold', grab global 26 // lock and transfer local values to it 27 void update(counter_t *c, int threadID, int amt) { 28 pthread_mutex_lock(&c->llock[threadID]); 29 c->local[threadID] += amt; // assumes amt > 0 30 if (c->local[threadID] >= c->threshold) { // transfer to global 31 pthread_mutex_lock(&c->glock); 32 c->global += c->local[threadID]; 33 pthread_mutex_unlock(&c->glock); 34 c->local[threadID] = 0; 35 } 36 pthread_mutex_unlock(&c->llock[threadID]); 37 } 38 39 // get: just return global amount (which may not be perfect) 40 int get(counter_t *c) { 41 pthread_mutex_lock(&c->glock); 42 int val = c->global; 43 pthread_mutex_unlock(&c->glock); 44 return val; // only approximate! 45 }
接下來看一個更復雜的數據結構——鏈表,簡單起見,咱們只關注鏈表的插入操做。
下面展現了基本的實現代碼:
1 // basic node structure 2 typedef struct node_t { 3 int key; 4 struct node_t *next; 5 } node_t; 6 7 // basic list structure (one used per list) 8 typedef struct list_t { 9 node_t *head; 10 pthread_mutex_t lock; 11 } list_t; 12 13 void List_Init(list_t *L) { 14 L->head = NULL; 15 pthread_mutex_init(&L->lock, NULL); 16 } 17 18 int List_Insert(list_t *L, int key) { 19 pthread_mutex_lock(&L->lock); 20 node_t *new = malloc(sizeof(node_t)); 21 if (new == NULL) { 22 perror("malloc"); 23 pthread_mutex_unlock(&L->lock); 24 return -1; // fail 25 } 26 new->key = key; 27 new->next = L->head; 28 L->head = new; 29 pthread_mutex_unlock(&L->lock); 30 return 0; // success 31 } 32 33 int List_Lookup(list_t *L, int key) { 34 pthread_mutex_lock(&L->lock); 35 node_t *curr = L->head; 36 while (curr) { 37 if (curr->key == key) { 38 pthread_mutex_unlock(&L->lock); 39 return 0; // success 40 } 41 curr = curr->next; 42 } 43 pthread_mutex_unlock(&L->lock); 44 return -1; // failure 45 }
儘管咱們有了基本的併發鏈表,但又遇到了這個鏈表擴展性很差的問題。研究人員發現的增長鏈表併發度的技術中,有一種叫做過手鎖(hand-over-hand locking,也叫做鎖耦合,lock coupling)。
原理也很簡單:每一個節點都有一個鎖,替代以前整個鏈表一個鎖。遍歷鏈表的時候,首先搶佔下一個節點的鎖,而後釋放當前節點的鎖。
從概念上說,過手鎖鏈表有點道理,它增長了鏈表操做的併發程度。可是實際上,在遍歷的時候,每一個節點獲取鎖、釋放鎖的開銷巨大,很難比單鎖的方法快。即便有大量的線程和很大的鏈表,這種併發的方案也不必定會比單鎖的方案快。也許某種雜合的方案(必定數量的節點用一個鎖)值得去嘗試。
若是方案帶來了大量的開銷,那麼高併發就沒有什麼意義。若是簡單的方案不多用到高開銷的調用,一般會頗有效,增長更多的鎖和複雜性可能會拔苗助長。
對於上面的示例代碼,還有一個通用建議:注意控制流的變化或其餘錯誤狀況致使函數返回和中止執行。由於不少函數開始就會得到鎖,分配內存,或者進行其餘一些改變狀態的操做,若是錯誤發生,代碼須要在返回前恢復各類狀態,這容易出錯。所以,最好組織好代碼,減小這種模式。
下面是一個併發隊列的實現代碼:
1 typedef struct node_t { 2 int value; 3 struct node_t *next; 4 } node_t; 5 6 typedef struct queue_t { 7 node_t *head; 8 node_t *tail; 9 pthread_mutex_t headLock; 10 pthread_mutex_t tailLock; 11 } queue_t; 12 13 void Queue_Init(queue_t *q) { 14 node_t *tmp = malloc(sizeof(node_t)); 15 tmp->next = NULL; 16 q->head = q->tail = tmp; 17 pthread_mutex_init(&q->headLock, NULL); 18 pthread_mutex_init(&q->tailLock, NULL); 19 } 20 21 void Queue_Enqueue(queue_t *q, int value) { 22 node_t *tmp = malloc(sizeof(node_t)); 23 assert(tmp != NULL); 24 tmp->value = value; 25 tmp->next = NULL; 26 27 pthread_mutex_lock(&q->tailLock); 28 q->tail->next = tmp; 29 q->tail = tmp; 30 pthread_mutex_unlock(&q->tailLock); 31 } 32 33 int Queue_Dequeue(queue_t *q, int *value) { 34 pthread_mutex_lock(&q->headLock); 35 node_t *tmp = q->head; 36 node_t *newHead = tmp->next; 37 if (newHead == NULL) { 38 pthread_mutex_unlock(&q->headLock); 39 return -1; // queue was empty 40 } 41 *value = newHead->value; 42 q->head = newHead; 43 pthread_mutex_unlock(&q->headLock); 44 free(tmp); 45 return 0; 46 }
這段代碼中有兩個鎖,一個負責隊列頭,另外一個負責隊列尾。這兩個鎖使得入隊列操做和出隊列操做能夠併發執行,由於入隊列只訪問tail鎖,而出隊列只訪問head鎖。這裏還有一個技巧,即添加了一個假節點(在隊列初始化的代碼裏分配的),該假節點分開了頭和尾操做。
隊列在多線程程序裏普遍使用。然而,這裏的隊列一般不能徹底知足這種程序的需求。更完善的有界隊列,在隊列空或者滿時,能讓線程等待。
咱們的示例是不須要調整大小的簡單散列表。
1 #define BUCKETS (101) 2 3 typedef struct hash_t { 4 list_t lists[BUCKETS]; 5 } hash_t; 6 7 void Hash_Init(hash_t *H) { 8 int i; 9 for (i = 0; i < BUCKETS; i++) { 10 List_Init(&H->lists[i]); 11 } 12 } 13 14 int Hash_Insert(hash_t *H, int key) { 15 int bucket = key % BUCKETS; 16 return List_Insert(&H->lists[bucket], key); 17 } 18 19 int Hash_Lookup(hash_t *H, int key) { 20 int bucket = key % BUCKETS; 21 return List_Lookup(&H->lists[bucket], key); 22 }
本例的散列表使用咱們以前實現的簡單併發鏈表,每一個散列桶(每一個桶都是一個鏈表)都有一個鎖,而不是整個散列表只有一個鎖,從而支持許多併發操做。
在實現併發數據結構時,先從最簡單的方案開始,也就是加一把大鎖來同步。若是發現性能問題,那麼就改進方法,只要優化到知足須要便可。
許多操做系統,在最初過渡到多處理器時都是用一把大鎖,包括Sun和Linux。這個方案在不少年裏都頗有效,直到多CPU系統普及,內核只容許一個線程活動成爲性能瓶頸。Linux採用了簡單的方案,把一個鎖換成多個;Sun則實現了一個最開始就能併發的新系統Solaris。