鎖html
鎖以及信號量對大部分人來講都是很是熟悉的,特別是經常使用的mutex。鎖有不少種,互斥鎖,自旋鎖,讀寫鎖,順序鎖,等等,這裏就只介紹常見到的,java
互斥鎖linux
這個是最經常使用的,win32:CreateMutex-WaitForSingleObject-ReleaseMutex,linux的pthread_mutex_lock-pthread_mutex_unlock,c#的lock和Monitor,java的lock,這些都是互斥鎖。互斥鎖的做用你們都知道,是讓一段代碼同時只能有一個線程運行,編程
自旋鎖c#
不經常使用,linux的pthread_spin系列函數就是自旋鎖,(網上不少用原子操做寫的自旋鎖),做用和互斥鎖大同小異。windows
信號量緩存
win下的CreateSemaphore、OpenSemaphore、ReleaseSemaphore、WaitForSingleObject,linux也有一樣的semaphore系列,還有c#的AutoResetEvent或者semaphore。這個用的也不少,信號兩個狀態,阻塞和經過,做用是保證多線程代碼的業務順序!多線程
先嘮一嘮這些鎖的原理,(爲何我把信號量也歸結於鎖?)
架構
首先互斥鎖,互斥鎖其實是由原子操做來實現的,函數
好比,當變量A爲0的時候爲非鎖,爲1的時候爲鎖,當第一個線程將變量A從0變爲1(原子操做)成功的時候,就至關於獲取鎖成功了,另外的線程再次獲取鎖的時候發現A爲1了,(或者說兩個線程同時獲取鎖->原子操做,某一個會失敗),表示獲取鎖失敗,當第一個線程用完了,就釋放鎖,將A=0(原子操做)。
互斥鎖的特色是,當鎖獲取失敗了,當前代碼上下文(線程)會休眠,而且把當前線程添加到這個內核維護的互斥鎖的鏈表裏,當後面的鎖再次獲取失敗,也是將當前線程和執行信息放到這個鏈表裏。當前佔用的互斥鎖的人用完了鎖,內核會抽取互斥鎖等待鏈表上的下一個線程開始喚醒繼續執行,當內核鏈表上爲空,就是沒人搶鎖了,就將鎖狀態設置爲非鎖,以次類推~
而後呢,咱們講自旋鎖,自旋鎖很簡單,他和互斥鎖大同小異,區別就是不休眠,當獲取鎖失敗了,就一直while(獲取),一直到成功,因此,自旋鎖在大部分場景都是不適用的,由於獲取鎖的時間裏,cpu一直是100%的!!
最後講信號量,上面問爲何我將信號量也歸結於鎖這一類?
由於信號量也是原子操做來實現的!道理和互斥鎖同樣的,信號量也有一個鏈表,當等待信號的時候,系統也是把當前線程休眠,把線程和代碼執行信息存儲到這個信號量的鏈表裏,當內核接受到信號的時候,就把這個信號量上的全部等待線程激活運行,這就是信號量!
原子操做
到底什麼是原子操做?
百度百科 所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。
因此,原子操做保證了多個線程對內存操做某個值得準確性!那麼原子操做具體如何實現的?
首先是inter cpu,熟悉彙編的人都知道,inter指令集有個lock,若是某個指令集前面加個lock,那麼在多核狀態下,某個核執行到這個前面加lock的指令的時候,inter會讓總線鎖住,當這個核把這個指令執行完了,再開啓總線!這是一種最最底層的鎖!!
好比 lock cmpxchg dword ptr [rcx],edx cmpxchg這個指令就被加鎖了!
inter指令參考可查閱http://www.intel.cn/content/www/cn/zh/processors/architectures-software-developer-manuals.html
來自IA-32券3:
HLT 指令(中止處理器)中止處理器直至接收到一個啓用中斷(好比 NMI 或 SMI,正 常狀況下這些都是開啓的)、調試異常、BINIT#信號、INIT#信號或 RESET#信號。處理 器產生一個特殊的總線週期以代表進入中止模式。 硬件對這個信號的響應有好幾個方面。前面板上的指示燈會打亮,產生一個記錄 診斷信息的 NMI 中斷,調用復位初始化過程(注意 BINIT#引腳是在 Pentium Pro 處理器 引入的)。若是停機過程當中有非喚醒事件(好比 A20M#中斷)未處理,它們將在喚醒停 機事件處理以後的進行處理。
在修改內存操做時,使用 LOCK 前綴去調用加鎖的讀-修改-寫操做(原子的)。這種 機制用於多處理器系統中處理器之間進行可靠的通信,具體描述以下: 在 Pentium 和早期的 IA-32 處理器中,LOCK 前綴會使處理器執行當前指令時產生 一個 LOCK#信號,這老是引發顯式總線鎖定出現。 在 Pentium 四、Intel Xeon 和 P6 系列處理器中,加鎖操做是由高速緩存鎖或總線 鎖來處理。若是內存訪問有高速緩存且隻影響一個單獨的高速緩存線,那麼操做中就 會調用高速緩存鎖,而系統總線和系統內存中的實際內存區域不會被鎖定。同時,這 條總線上的其它 Pentium 四、Intel Xeon 或者 P6 系列處理器就回寫全部的已修改數據 並使它們的高速緩存失效,以保證系統內存的一致性。若是內存訪問沒有高速緩存且/ 或它跨越了高速緩存線的邊界,那麼這個處理器就會產生 LOCK#信號,並在鎖定操做期 間不會響應總線控制請求。
IA-32 處理器提供有一個 LOCK#信號,會在某些關鍵內存操做期間被自動激活,去鎖定系統總線。當這個輸出信號發出的時候,來自其它處理器或總線代理的總線控制請求將被阻塞。軟件可以經過預先在指令前添加 LOCK 前綴來指定須要 LOCK 語義的其它場合。在 Intel38六、Intel48六、Pentium 處理器中,明確地對指令加鎖會致使 LOCK#信號的產生。由硬件設計人員來保證系統硬件中 LOCK#信號的可用性,以控制處理器間的內IA-32 架構軟件開發人員指南 卷 3:系統編程指南170存訪問。對於 Pentium 四、Intel Xeon 以及 P6 系列處理器,若是被訪問的內存區域是在處理器內部進行高速緩存的,那麼一般不發出 LOCK#信號;相反,加鎖只應用於處理器的高速緩存(參見 7.1.4.LOCK 操做對處理器內部高速緩存的影響) 。
可參考inter的 IA-32券3 第七章第一小節!
固然inter還有其餘方式保證原子操做!
而後是ARM cpu, arm主要是靠兩個指令來保證原子操做的,LDREX 和 STREX
LDREX
LDREX 可從內存加載數據。
若是物理地址有共享 TLB 屬性,則 LDREX 會將該物理地址標記爲由當前處理器獨佔訪問,而且會清除該處理器對其餘任何物理地址的任何獨佔訪問標記。
不然,會標記:執行處理器已經標記了一個物理地址,但訪問還沒有完畢。
STREX
STREX 可在必定條件下向內存存儲數據。 條件具體以下:
若是物理地址沒有共享 TLB 屬性,且執行處理器有一個已標記但還沒有訪問完畢的物理地址,那麼將會進行存儲,清除該標記,並在Rd 中返回值 0。
若是物理地址沒有共享 TLB 屬性,且執行處理器也沒有已標記但還沒有訪問完畢的物理地址,那麼將不會進行存儲,而會在Rd 中返回值 1。
若是物理地址有共享 TLB 屬性,且已被標記爲由執行處理器獨佔訪問,那麼將進行存儲,清除該標記,並在Rd 中返回值 0。
若是物理地址有共享 TLB 屬性,但沒有標記爲由執行處理器獨佔訪問,那麼不會進行存儲,且會在Rd 中返回值 1。
參考:http://blog.csdn.net/duanlove/article/details/8212123
原子CAS操做
原子操做指令裏,有原子加,原子減,cas究竟是什麼呢?
首先看一段代碼,
bool compare_and_swap(int *accum, int *dest, int newval) { if (*accum == *dest) { *dest = newval; return true; } else { *accum = *dest; return false; } }
cas便是Compare-and-swap,先比較再互換,即修改,意思就是,當reg等oldvalue的時候,將reg設置爲newval,這段代碼在非原子狀況下(多線程)是沒用的,可是若是這段代碼是原子操做,那麼他的威力就很是大, 互斥鎖就和這個cas有關,
上面咱們也看到inter這個指令了,lock cmpxchg,cmpxchg做用就是cas這個函數的做用,比較並交換操做數,這就是cas原子操做,神奇吧,上面一個函數的做用,被inter一個指令搞定了,再cmpxchg前面加一個lock,那麼這就是一個真正發揮威力的cas!
在win32內核中有個InterlockedCompareExchange函數,這個函數就是cas功能,在inter cpu上的實現就是這段指令=》lock cmpxchg!
linux下有__sync_bool_compare_and_swap 和 __sync_val_compare_and_swap 。
在dotnet下有 interlocked.compareexchange。java參考sun.misc.Unsafe類。
CAS操做,到底有什麼威力?
若是要修改一個變量,在多線程下,應該要加鎖,代碼是這樣的
int num = 0; void add() { lock(); num = num + 123; unlock(); }
可是若是不要鎖,cas來操做??
int num = 0; void add() { int temp; do { temp = num; } while (cas(num, temp, temp+123)==true) }
咱們看到用一個do while來無限判斷cas的修改結果,若是修改完成,那就成功+1,若是cas沒有修改爲功,繼續while,temp將獲取最新的num,再次cas操做!
當一個線程的時候,num一我的操做,不會出現差錯,當兩我的的時候,某我的先進行cas原子操做,num+1,第二個線程拿着舊值去加操做,返現返回的就是false,因而從新複製temp獲取最新的num,這就是cas的核心價值!無鎖!
cas其實這也算一種鎖,樂觀鎖!相同於自旋鎖也循環!
貼下cas互斥鎖的代碼(本身寫的),固然也能夠去用原子+-來判斷,反正都是原子操做~~
int i = 0;//0非鎖,1鎖住 //嘗試獲取鎖,當cas返回失敗,獲取鎖失敗,返回true,獲取鎖成功 獲取失敗就休眠,等待系統喚醒 bool lock() { return cas(i, 0, 1); } bool unlock() { return cas(i, 1, 0); }
CAS無鎖Queue
簡單發下我寫的cas環形隊列,很簡單的!
// .h #pragma once #ifndef _cas_queue #define _cas_queue #ifndef C_BOOL #define C_BOOL typedef int cbool; #define false 0 #define true 1 #endif // //typedef struct _cas_queue //{ // int size; //} cas_queue; #define QUEUE_SIZE 65536 #ifdef __cplusplus extern "C" { #endif /* compare and swap: CAS(*ptr,outvalue,newvalue); return bool */ cbool compare_and_swap(void ** ptr,long outvalue,long newvalue); void cas_queue_init(int queue_size); void cas_queue_free(); cbool cas_queue_try_enqueue(void * p); cbool cas_queue_try_dequeue(void ** p); #ifdef __cplusplus } #endif #endif //.c #include "cas_queue.h" #ifdef _MSC_VER #include <windows.h> #else #endif volatile unsigned long read_index = 0; volatile unsigned long write_index = 0; long* read_index_p = &read_index; long* write_index_p = &write_index; void** ring_queue_buffer_head; int ring_queue_size = QUEUE_SIZE; cbool is_load = 0; cbool compare_and_swap(void * ptr, long outvalue, long newvalue) { #ifdef _MSC_VER // vs long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue); return return_outvalue == outvalue; /*InterlockedCompareExchange64 No success!!*/ //#ifndef _WIN64 // //32 bit // long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue); // return return_outvalue == outvalue; //#else // //64 bit // long return_outvalue = InterlockedCompareExchange64(ptr, newvalue, outvalue); // return return_outvalue == outvalue; //#endif #else //linux #endif } void cas_queue_init(int queue_size) { if (queue_size > 0) ring_queue_size = queue_size; int size = sizeof(void**)*ring_queue_size; ring_queue_buffer_head = malloc(size); memset(ring_queue_buffer_head, 0, size); is_load = 1; read_index = 0; write_index = 0; } void cas_queue_free() { is_load = 0; free(ring_queue_buffer_head); } cbool cas_queue_try_enqueue(void * p) { if (!is_load) return false; unsigned long index; do { //queue full if (read_index != write_index && read_index%ring_queue_size == write_index%ring_queue_size) return false; index = write_index; } while (compare_and_swap(&write_index, index, index + 1) != true); ring_queue_buffer_head[index%ring_queue_size] = p; return true; } cbool cas_queue_try_dequeue(void ** p) { if (!is_load) return false; unsigned long index; do { //queue empty if (read_index == write_index) return false; index = read_index; } while (compare_and_swap(read_index_p, index, index + 1) != true); *p = ring_queue_buffer_head[index%ring_queue_size]; return true; }
具體我測試過,在4個線程狀況下,80萬個消息,同時入和出,出完只須要150毫秒左右!固然線程過多並且集火的話確定會慢的。
這個demo不是很實用,看下篇:CAS原子鎖 高效自旋無鎖的正確用法