關於併發,百科中給出的精準的定義:「 在計算機科學中,並行性(英語:Concurrency)是指在一個系統中,擁有多個計算,這些計算有同時執行的特性,並且他們之間有着潛在的互動。所以系統可進行的執行路徑會有至關多個,並且結果可能具備不肯定性。併發計算可能會在具有多核心的同一個晶片中複合運行,以優先分時線程在同一個處理器中執行,或在不一樣的處理器執行」。這裏強調一下,「併發」不是「並行」,或者說「併發」僅僅是「並行」的一個子集,具備並行的「表面特性」,然而概念上是不一樣的,能夠用下圖簡單說明一下:html
併發的優勢就不贅述了,主要想介紹一下併發編程相關的技術點。併發常規作法主要有:多進程、單進程多線程、多路複用等,不管哪一種方案都存在資源競爭問題,以多線程爲例,介紹一下如何保證併發安全性問題,傳統的手段:鎖、原子操做等,固然傳統的手段很難保證開發效率,因而有涌現了不少更高級的併發方案,如Actor、CSP併發模型等,下面簡單介紹一下。node
前輩們爲了作高性能併發也是費盡苦心了,提出了各類鎖來保證併發的效率和安全性,常見的鎖:互斥鎖(Mutex)、信號量(Semaphore)、讀寫鎖(RWLock)、條件變量(Cond)、臨界區(Critical Section)shell
信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施,是能夠用來保證兩個或多個關鍵代碼段不被併發調用,核心的三個方法簽名:數據庫
#include <semaphore.h> //初始化一個信號量,Value指該資源可被同時使用的個數 int sem_init(sem_t *sem, int Value) //嘗試得到一個信號量,--Value int sem_wait(sem_t c) //釋放信號量,++Value int sem_post(sem_t *sem)
信號量其實就是操做系統中P、V原語的封裝,屬於內核對象,其工做原理大體以下:編程
信號量只能進行兩種操做等待和發送信號,即P(s)和V(s),他們的行爲是這樣的:緩存
P(s):若是s的值大於零,就給它減1;若是它的值爲零,就掛起該進程的執行安全
V(s):若是有其餘進程因等待s而被掛起,就讓它恢復運行,若是沒有進程因等待sv而掛起,就給它加1.多線程
和信號量之間有啥關係呢,Mutex能夠看作Semaphore的一種特殊狀況,也就是Value被初始化爲1的時候,這種叫作「二元信號量(binary semaphore),一般叫作 「互斥鎖」,大多數用法中都對它提供了封裝,如C++的std::mutex,C#的Mutex類等架構
public sealed class Mutex : WaitHandle{ //Blocks the current thread until the current //WaitHandle receives a signal.(Inherited from WaitHandle.) public virtual bool WaitOne() //Releases the Mutex once. public void ReleaseMutex() }
有了互斥鎖彷佛就能解決大部分「共享變量」問題了(不考慮效率問題),但也並非全部,考慮下面的方法調用需求:併發
public class MutexText { private Mutex mutex = new Mutex(); public void Method_A() { mutex.WaitOne(); Method_B(); //Do Somthing…… mutex.ReleaseMutex(); } public void Method_B() { mutex.WaitOne(); //Do Somthing…… mutex.ReleaseMutex(); } }
Method_A得到鎖後再調Method_B,然而Method_B中卻也在請求鎖,會發生」死鎖「嗎? 答案是:不會,若是是PV原語的二元信號量實現的」單純「互斥鎖的確會出現死鎖的問題,可是這裏有個」遞歸鎖「的概念,簡單說就是一個鎖維護了線程對鎖的請求次數,若是在線程A已經擁有所mlock的狀況下再次請求鎖,mlock會將該線程擁有次數加1,固然若是此時有另一個線程請求mlock,就會進入等待線程隊列。
相對互斥量只有加鎖和不加鎖兩種狀態,讀寫鎖有三種狀態:讀模式下的加鎖,寫模式下的加鎖,不加鎖。
讀寫鎖的使用規則:
1.只要沒有寫模式下的加鎖,任意線程均可以進行讀模式下的加鎖;
2.只有讀寫鎖處於不加鎖狀態時,才能進行寫模式下的加鎖;
#include <pthread.h> /* 初始化讀寫鎖屬性對象 */ int pthread_rwlockattr_init (pthread_rwlockattr_t *__attr); /* 申請讀鎖 */ int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); /* 申請寫鎖 */ int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); /* 釋放鎖 */ int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
爲何須要讀寫鎖呢?考慮一個簡單的應用場景:有個模塊維護了一個鏈表mList,大多數狀況下會有不少線程併發訪問,好比提供個readData方法給外部索引鏈表中的元素,然而有極少狀況須要增刪mList,這時若是使用互斥鎖,就會出現訪問時只能單線程調用,模塊的吞吐能力急劇降低,這時RWLock就派上用場了,示例代碼以下:
struct{ pthread_rwlock_t rwlock; int product; }sharedData = {PTHREAD_RWLOCK_INITIALIZER, 0}; List books = new List(); void * RemoveBook(int index) { pthread_rwlock_wrlock(&sharedData.rwlock); books.remove(index); pthread_rwlock_unlock(&sharedData.rwlock); } void Book* GetBook(int i) { Book book = null; pthread_rwlock_rdlock(&sharedData.rwlock); book = books.Get(i); pthread_rwlock_unlock(&sharedData.rwlock); return book; } void Book* PeekBook(int i) { Book book = null; pthread_rwlock_rdlock(&sharedData.rwlock); book = books.Get(i); pthread_rwlock_unlock(&sharedData.rwlock); return book; }
條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動做:一個線程等待"條件變量的條件成立"而掛起;另外一個線程使"條件成立"(給出條件成立信號)。爲了防止競爭,條件變量的使用老是和一個互斥鎖結合在一塊兒。
/* 常量初始化 */ pthread_cond_t cond = PTHREAD_COND_INITIALIZER /* 動態初始化 */ int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr) /* 等待被觸發 */ int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) /* 銷燬 */ int pthread_cond_destroy(pthread_cond_t *cond)
那麼什麼狀況下須要用條件變量去同步呢,舉個簡單的例子:有個處理文件的過濾器線程,但該文件建立的時機不肯定,此時能夠考慮讓過濾器線程等待文件建立條件,當文件建立完成時,發送通知,喚醒過濾器線程開始工做……貼個示例代碼:
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static void *Filter_File(File *f) { while (f == NULL) { pthread_cond_wait(&cond, &mtx); } //Filter something…… } void main(void) { //建立等待線程 pthread_create(&tid, NULL, Filter_File, NULL); //do something…… laoding file or …… sleep(1000); //發送信號,喚醒等待線程 pthread_cond_signal(&cond); }
臨界區的概念其實和互斥鎖有些重合,甚至不少文章中直接是:臨界區 == 互斥鎖,可是這裏提一個在實現原理上稍有不一樣的「臨界區」,在MFC編程中有個臨界區的API,簽名大體長這樣:
/* 初始化臨界區,指定自旋次數 */ BOOL WINAPI InitializeCriticalSectionAndSpinCount( _Out_ LPCRITICAL_SECTION lpCriticalSection, _In_ DWORD dwSpinCount ); /* 進入臨界區 */ void WINAPI EnterCriticalSection( _Inout_ LPCRITICAL_SECTION lpCriticalSection ); /* 離開臨界區 */ void WINAPI LeaveCriticalSection( _Inout_ LPCRITICAL_SECTION lpCriticalSection );
當一個線程進入關鍵代碼段,其它請求進入關鍵代碼段的線程就會進入等待狀態,這意味着該線程必須從用戶方式轉入內核方式(大約1000個C P U週期),這種轉換是要付出很大代價的。 而對於多CPU系統,有時候這是沒有必要的,實際上擁有資源的線程能夠在另外一個線程完成轉入內核方式以前釋放資源。
因此,爲了提升性能,Microsoft將自旋鎖引入臨界區。當EnterCriticalSection函數被調用時,若是臨界區已經被其它線程持有時,它就原地自旋,當自旋必定次數後還不能獲取關鍵代碼段,此時線程才轉入內核方式,進入等待狀態。找個簡單的示例代碼:
const int COUNT = 10; int g_nSum = 0; CRITICAL_SECTION g_cs; DWORD WINAPI FirstThread(PVOID pvParam) { EnterCriticalSection(&g_cs); g_nSum = 0; for (int n = 1; n <= COUNT; n++) { g_nSum += n; } LeaveCriticalSection(&g_cs); return(g_nSum); }
當線程A已經在執行FirstThread的狀況下,線程B請求進入臨界區代碼,此時線程B不會直接進入等待,而是不停的自旋,當自旋必定次數,發現仍是不能進入臨界區,此時再進入等待狀態。
水了那麼多關於鎖的內容,下面簡單寫一個多線程中「完備的」懶單例,示例代碼以下:
/* 所謂的雙檢鎖實現的單例 */ public sealed class Singleton { private Singleton() { } private static Object o_lock = new Object(); private static Singleton inst = null; public Singleton GetInst() { if (inst != null) return inst; Monitor.Enter(o_lock); if (inst == null) { Singleton temp = new Singleton(); Volatile.Write(ref inst, temp); } Monitor.Exit(o_lock); return inst; } }
用這種方案寫出的單例,若非真的有「懶初始化」需求,那麼多少存在一些「炫技」的嫌疑,哈哈……Monitor鎖的做用很簡單,就是爲了防止多線程併發訪問時建立多個Singleton,那麼Volatile.Write有什麼做用,爲何不寫成「inst = new Singleton()」呢?主要防止編譯器在處理new Singleton()時,先將inst先賦值了引用,在暗搓搓地去調構造方法,若是此時有其餘線程訪問Singleton,它會判斷inst!=null,而後開心地去用inst,因而dump啦……因此用了一個Volatile.Write,保證了new Singleton()初始化完成,而後再複製。
自旋鎖(Spinlock)是一種普遍運用的底層同步機制。自旋鎖是一個互斥設備,它只有兩個值:「鎖定」和「解鎖」。它一般實現爲某個整數值中的某個位。但願得到某個特定鎖得代碼測試相關的位。若是鎖可用,則「鎖定」被設置,而代碼繼續進入臨界區;相反,若是鎖被其餘人得到,則代碼進入忙循環(而不是休眠,這也是自旋鎖和通常鎖的區別)並重複檢查這個鎖,直到該鎖可用爲止,這就是自旋的過程。「測試並設置位」的操做必須是原子的,這樣,即便多個線程在給定時間自旋,也只有一個線程可得到該鎖。大多數語言都對自旋鎖提供了支持,這裏簡單實現一個暴力自旋鎖:
public class SimpleSpinLock { //0:未被佔用, 1:已經佔用 private int resRefCount = 0; //請求鎖,進入代碼段 public void Enter() { while(true){ /** * 原子操做,CAS * 只有resRefCount == 0時,resRefCount纔會被賦值1 * 返回值是resRefCount原始值 * 因此,若是該鎖已經被佔用,則該線程會一直自旋 */ if (Interlocked.CompareExchange(ref resRefCount, 1, 0) == 0) { return; } } } //釋放鎖,離開代碼段 public void Leave() { //重置resRefCount狀態 Interlocked.Exchange(ref resRefCount, 0); } }
自旋鎖還有些優化方案,好比自旋必定次數後仍未得到鎖就走內核調用,轉入等待隊列。如上述鎖中的「Cirtical Section」 和 「Monitor」鎖,在實現上都作了自旋機制。
原子操做(atomic operation)是不須要synchronized",這是Java多線程編程的老生常談了。所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch,
原子性不可能由軟件單獨保證--必須須要硬件的支持,所以是和架構相關的。在x86 平臺上,CPU提供了在指令執行期間對總線加鎖的手段。CPU芯片上有一條引線#HLOCK pin,若是彙編語言的程序中在一條指令前面加上前綴"LOCK",通過彙編之後的機器代碼就使CPU在執行這條指令的時候把#HLOCK pin的電位拉低,持續到這條指令結束時放開,從而把總線鎖住,這樣同一總線上別的CPU就暫時不能經過總線訪問內存了,保證了這條指令在多處理器環境中的原子性。
簡單說,原子操做保證一條語句不會被中斷,能夠一次性執行完成,而這種保證是硬件提供的,大多數語言都提供了CAS(compare and swap)的操做接口,如C#的Interlocked類,C++:
bool atomic_compare_exchange_weak( volatile std::atomic* obj, T* expected, T desired );
有篇不錯的文章推薦一下CAS 和 無鎖隊列,大體講清楚了CAS使用,和無鎖隊列的原理
//簡單版無鎖隊列 public class LockFreeQueue<T> { //隊列節點 private class Node<T> { public T value; public Node<T> next; } private Node<T> head; private Node<T> tail; private int count; public LockFreeQueue() { head = new Node<T>(); tail = head; } public int Count { get { return count; } } //進隊列 public void EnQueue(T item) { var node = new Node<T>(); node.value = item; node.next = null; Node<T> tmpTail = null; bool isReplace = false; do { tmpTail = tail; //強制取到隊列尾指針 while (tmpTail.next != null) { tmpTail = tmpTail.next; } //保證替換中tmpTail是尾指針 var result = Interlocked.CompareExchange<Node<T>>(ref tmpTail.next, node, null); //替換是否成功 isReplace = result != tmpTail.next; } while (!isReplace);//替換不成功就自旋 Interlocked.Exchange<Node<T>>(ref tail, node); Interlocked.Increment(ref count); } //出隊列 public T Dequeue() { bool isReplace = false; Node<T> tmpHead = null; Node<T> oldHeadNext = null; do { //緩存頭部相關信息 tmpHead = head; oldHeadNext = tmpHead.next; //空隊列 if (oldHeadNext == null) { return default(T); } else { //出隊列前頭部指針爲發生變化 var result = Interlocked.CompareExchange<Node<T>>(ref head, oldHeadNext, tmpHead); isReplace = result != oldHeadNext; } } while (!isReplace); Interlocked.Decrement(ref count); return oldHeadNext.value; } }
死鎖: 是指兩個或兩個以上的進程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。 因爲資源佔用是互斥的,當某個進程提出申請資源後,使得有關進程在無外力協助下,永遠分配不到必需的資源而沒法繼續運行,這就產生了一種特殊現象:死鎖
活鎖(英文 livelock),指線程1可使用資源,但它讓其餘線程先使用資源;線程2可使用資源,但它也讓其餘線程先使用資源,因而二者一直謙讓,都沒法使用資源。
所謂飢餓,是指若是事務T1封鎖了數據R,事務T2又請求封鎖R,因而T2等待。T3也請求封鎖R,當T1釋放了R上的封鎖後,系統首先批准了T3的請求,T2仍然等待。而後T4又請求封鎖R,當T3釋放了R上的封鎖以後,系統又批准了T4的請求......T2可能永遠等待,這就是飢餓。
活鎖有必定概率解開。而死鎖(deadlock)是沒法解開的。避免活鎖的簡單方法是採用先來先服務的策略。當多個事務請求封鎖同一數據對象時,封鎖子系統按請求封鎖的前後次序對事務排隊,數據對象上的鎖一旦釋放就批准申請隊列中第一個事務得到鎖。
悲觀鎖,正如其名,它指的是對數據被外界(包括本系統當前的其餘事務,以及來自外部系統的事務處理)修改持保守態度,所以,在整個數據處理過程當中,將數據處於鎖定狀態。悲觀鎖的實現,每每依靠數據庫提供的鎖機制(也只有數據庫層提供的鎖機制才能真正保證數據訪問的排他性,不然,即便在本系統中實現了加鎖機制,也沒法保證外部系 統不會修改數據)。
樂觀鎖( Optimistic Locking ) 相對悲觀鎖而言,樂觀鎖機制採起了更加寬鬆的加鎖機制。悲觀鎖大多數狀況下依靠數據庫的鎖機制實現,以保證操做最大程度的獨佔性。但隨之而來的就是數據庫 性能的大量開銷,特別是對長事務而言,這樣的開銷每每沒法承受。 而樂觀鎖機制在必定程度上解決了這個問題。樂觀鎖,大可能是基於數據版本( Version )記錄機制實現。何謂數據版本?即爲數據增長一個版本標識,在基於數據庫表的版本解決方案中,通常是經過爲數據庫表增長一個 「version」 字段來實現。讀取出數據時,將此版本號一同讀出,以後更新時,對此版本號加一。此時,將提交數據的版本數據與數據庫表對應記錄的當前版本信息進行比對,如 果提交的數據版本號大於數據庫表當前版本號,則予以更新,不然認爲是過時數據。