因爲我對於Java併發庫JUC的深刻,一直以來有個想法,能不能把Java併發庫移植到純C語言環境下,而且在實現、使用方式上都與Java平臺保持至關程度的類似性呢?java
純C環境下內存模型與Java平臺不一致?加上內存屏障(fence)或者lock指令就行。 node
C環境下缺乏對象模型?無非是給每一個數據塊提供個*init方法(好比pthread_mutex_t的pthread_mutex_init,pthread_barrier_t的pthread_barrier_init),以後再將邏輯的部分直接寫到函數裏(phtread_mutex_lock、pthread_barrier_wait),無非是這些邏輯並非像Java平臺上同樣被綁定到某個"對象"。這些都不是本質上困難的地方。git
真正的困難在於內存回收。github
Java平臺,咱們可以對內存作的惟一事情就是申請、建立(new關鍵字),如此一來便獲得一個新的對象。以後,咱們沒法直接針對這塊內存釋放。咱們最多隻能把本身目所能及範圍內的關於這塊內存的"引用"置爲null,從而期待GC去回收(前提是這塊內存不存在其餘引用)。同時,咱們根本沒法知道究竟哪一個時刻這塊內存會被回收,咱們只能認爲,Runtime environment可以選擇最適當的時刻。web
同時,Java虛擬機給出了一個更強的保證:只要你的對象(引用)obj != null,那麼這個引用所指示的對象即是確定存在的,咱們能夠絕對安全地調用obj.method()而不用懼怕任何意外。
把這個場景放在C環境下比喻彷佛就在說:若是你能看到某個地址(數值),那麼就放心對他作任何合理的事吧!Runtime environment確保了你的安全!
總之,Java運行時給出了強大的保證:
1 看獲得的對象都存在,你能夠放心對它操做。
2 那些正在被回收的、你不能安全操做的對象(內存),你絕對沒法看到。算法
如此一來,不管是內存獨佔或共享、線程併發或非併發,咱們都無需擔憂內存自己的問題(Java GC回收全部內存,全平臺的垃圾回收器)。數組
那麼如今咱們回頭來看C環境。它的運行時環境根本不幫你作任何事,你甚至能夠虛構一個內存地址,而後將它強制轉化爲虛構的struct類型,接着對它操做。只不過這樣極可能破壞了內存,致使不可預期的結果,同時這個錯誤會一直潛伏,你根本不知道什麼時候出現。甚至於,都不須要定義結構體信息,假如你瞭解運行機器的具體信息,你均可以直接在一個地址上作地址偏移來操做內存......固然,前提是這塊內存是安全的。緩存
那麼問題就很清楚了,
Java環境下內存安全回收由虛擬機徹底負責。
C環境下,內存安全回收由邏輯單元(線程)自己來負責。安全
也就是說C環境下,咱們不只要處理算法自己的邏輯,同時也要額外去處理內存回收的問題。數據結構
那麼C環境下內存回收困難在哪呢?
首先,咱們不考慮不須要回收的內存。咱們知道,獨佔的內存容易回收,對於共享的內存,假若有個邏輯點,咱們確保全部線程當下以及未來都不會使用這塊內存,那麼也是能夠安全回收的。
因此真正難於回收的是知足如下3個條件的內存:
OK,咱們接着引出Maged M. Michael,在2004發表的《Hazard Pointers:Safe Memory Reclamation for Lock-Free Objects》。
Hazard Pointer能夠說是最知名的一種共享內存的回收方案。借用Erez Petrank的ppt,它能夠歸納以下:
以及後續變化的Lock-Free Data Structures with Hazard Pointers、Hazard Eras、Maged M. Michael等。
對於每一塊被共享的內存,這些技術都須要爲它配置另外N(MAX_THREAD)個內存來標記它是否被對應的線程訪問,同時爲了釋放一塊內存,都須要遍歷這N個內存位置從而斷定是否能夠安全回收(儘管能夠採用先排序,二分搜索等方式下降檢索的數據量,可是檢索過程的代價依然是隨着MAX_THREAD而增加的,就算你總操做數同樣。儘管它的算法是lock-free(甚至wait-free)。這裏的缺陷在於每一塊共享數據都與線程自己牢牢耦合,幾乎沒有擴展性。
而另外一種被普遍討論的回收方案是epoch-based ReclamationPractical lock-freedom以及類似的技術Performance of memory reclamation for lockless synchronization,它的性能很好,可是如內存管理規則所說,它並非無阻塞的算法,它在本質上就會阻塞,因此progress沒法獲得保證。
其中2005年,哥德堡大學的研究團隊發表的方案Practical and Efficient Lock-Free Garbage Collection Based on Reference Counting,儘管也採用了引用計數的方式(與我將要介紹的方式相似),可是卻並無解除線程自己與共享數據之間的依賴,而且scan時依然要遍歷整個線程組。
在不計較lock指令或者fence的狀況下,是否存在一種方法可以同時作到:
爲了達成以上幾個目標,我向你們介紹自創的SHP(Scalable Hazard Pointers)。
Scalable Hazard Pointers分爲以下三個部分來說述:
3RE&S指的是以下四個,針對被分配內存地址(pointer)的抽象操做:
同時,對於共享數據的讀、寫爲下面兩種方式:
這個協議總結了,包含了hazardpointer以及許多相似技術的處理方式,RECORD操做時將共享內存地址自己也做爲傳輸傳入。
幾乎任何一種算法,併發共享的數據均可經過以上方式回收。
這裏的關鍵點是:
這裏給出個針對Michael原始論文的對比例子:
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; psly_record(&Tail, t); if(Tail != t) { psly_remove(t); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(t); continue; } if(next != NULL){ psly_remove(t); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(t); break; } psly_remove(t); } __sync_bool_compare_and_swap(&Tail, t, node); }
int dequeue(){ int data; NodeType* h; for(;;){ h = Head; //myhprec->HP[0] = h; psly_record(&Tail, h); if(Head != h) { psly_remove(h); continue; } NodeType* t = Tail; NodeType* next = h->next; psly_record(&(h->next), next); //myhprec->HP[1] = next; if(Head != h) { psly_remove(next); psly_remove(h); continue; } if(next == NULL) { psly_remove(next); psly_remove(h); return -1000000; } if(h == t){ psly_remove(next); psly_remove(h); __sync_bool_compare_and_swap(&Tail, t, next); continue; } data = next->value; //myhprec->HP[1] = NULL; //myhprec->HP[0] = NULL; if(__sync_bool_compare_and_swap(&Head, h, next)) { psly_remove(next); psly_remove(h); retireNode(h); break; } psly_remove(next); psly_remove(h); } //myhprec->HP[0] = NULL; //myhprec->HP[1] = NULL; return data; }
首先,爲何要爲每塊共享內存維護N個變量,這樣作不只浪費內存並且增長搜索代價。理想狀況下咱們應該只須要一個內存數據來處理一個共享內存。那麼咱們怎麼來處理多個線程引用它的狀況呢?這裏的一個天然的想法是引入引用計數(reference count),(注意,這裏的引用計數是線程引用共享數據,與另外一個對象層次間的引用無關),咱們用refcount表明當下訪問它的線程數,refCount>0的內存絕對不會被回收,refCount == 0表明沒有線程引用它,處於能夠被回收的狀態。咱們將這樣的一個內存數據稱爲Record,
struct Record{ //數據字段 void* pointer; int refcount; }
第二,因爲Record內存自己是要被維護的,因此咱們的策略是隨需分配,已分配的不在回收,咱們將Record做爲一種可重用的資源,用於追蹤那些共享內存。那麼意味着Record可以被高效地從資源庫併發獲取和返回。
以上斜體字是我以前的觀點,我有了點突破,如今我有了技術方案,能以較小代價實現Record的回收了!:)
這裏的技巧是用一個固定的self字段,標記Record自己,(好比低10位做爲indexNum,接下來4位做爲arrayNum),用於惟一的標記Record自身。這麼作的目的是爲了支持後面的map。
想象一下,咱們已經有了不少共享的pointer(地址),每一個分配一個Record,接着咱們要如何組織它們呢?從而高效的支持add,remove,search操做。
這裏的方式是用一個大的map,它具備一個大的buckets數目好比1024。
typedef struct RecordList { Record* volatile head ; Record* volatile tail ; } RecordList ; typedef struct RecordMap { RecordList* lists[1024] ; } RecordMap ;
RecordList初始化以後久擁有了固定的head/tail。
很明顯,咱們這裏天然的策略是將具備相同後綴的地址base到同一個list上,同時因爲開闢的內存地址通常是單調遞增,而且保持16字節對齊,咱們能夠根據地址自己來避開了hits。
咱們能夠抽象出以下的接口以下:
struct Record{ //Record庫維護字段 int volatile next ; int self; //數據字段 void* pointer; int refcount; } int record(void* pointer) { long key = ((long) pointer) >> 4 ; RecordList* list = map.lists[key & (1024-1)]; return handle_records(list, pointer, RECORD); }
那麼接下來的問題是如何在一個list上組織那些帶有一樣後綴地址的Record?
這裏推薦著名的Harris' list,它作到用一個併發無鎖單鏈表來組織有序數據,支持增長、刪除、搜索。
咱們須要給Record再加一個retire'bit和一個long字段nextRecord,它裏面包含四個字段:
struct Record{ //Record庫維護字段 int volatile next ; int self; //數據字段 void* pointer; int refcount; //retire bool retireBit; long nextRecord; }
而後,對它進行了至關程度的改造,改造的地方以下:
注意,因爲以上的操做存在相互依賴,好比新增的Record不能連接到已經邏輯刪除的Record上。
因此,當一個Record被從資源庫get到以後,生命週期爲:
這裏最核心的技術就是基於兩點斷定節點沒有失效:
咱們須要一些Record去持有地址,咱們採用隨需批量malloc的方式。這種方式的特色是:
以上方式,能夠如圖所示:
下面給出一個大概例子,採用的經典的MSQueue,固然,存在其餘更高效的方式lcrq:
int PSLY_Record_IDXNUM = 16; int PSLY_Record_IDXBIT = ((1 << 16) - 1); int PSLY_Record_ARRAYNUM_MAX = (1 << 4); int PSLY_Record_ARRAYNUM = (1 << 2); int PSLY_Record_ARRAYBITS = ((1 << 4) -1); int PSLY_Record_ARRBIT = (((1 << 4) - 1) << 16); int PSLY_Record_ARRBITR = ((1 << 4) - 1); int PSLY_Record_ARRIDXBIT = ((((1 << 4) - 1) << 16) | ((1 << 16) - 1)); int PSLY_Record_NEXTIDXNUM = 16; int PSLY_Record_NEXTIDXBIT = ((1 << 16) - 1); int PSLY_Record_NEXTTAILNUM = 1; int PSLY_Record_NEXTTAILBIT = (((1 << 1) - 1) << 16); int PSLY_Record_NEXTVERSIONNUM = (32 - 1 - 16); int PSLY_Record_NEXTVERSIONBIT = ((~0)^((((1 << 1) - 1) << 16) | ((1 << 16) - 1))); int PSLY_Record_NEXTVERSIONONE = (1 + ((((1 << 1) - 1) << 16) | ((1 << 16) - 1))); int PSLY_Record_TAILIDXNUM = 16; int PSLY_Record_TAILIDXBIT = ((1 << 16) - 1); int PSLY_Record_TAILVERSIONNUM = (32 - 16); int PSLY_Record_TAILVERSIONBIT = ((~0) ^ ((1 << 16) - 1)); int PSLY_Record_TAILVERSIONONE = (1 + ((1 << 16) - 1)); int PSLY_Record_HEADIDXNUM = 16; int PSLY_Record_HEADIDXBIT = ((1 << 16) - 1); int PSLY_Record_HEADVERSIONNUM = (32 - 16); int PSLY_Record_HEADVERSIONBIT = ((~0) ^ ((1 << 16) - 1)); int PSLY_Record_HEADVERSIONONE = (1 + ((1 << 16) - 1)); typedef struct Record { int volatile next __attribute__((aligned(128))); int self ; long volatile nextRecord __attribute__((aligned(128))); void* volatile pointer ; } Record __attribute__((aligned(128))); typedef struct RecordQueue { int volatile head ; int volatile tail ; } RecordQueue ; static Record* volatile psly_Records[1 << 4]; static RecordQueue volatile psly_Record_queues[1 << 4]; static int volatile recordTake = 0; Record* idx_Record(int index) { return psly_Records[(index & PSLY_Record_ARRBIT) >> PSLY_Record_IDXNUM] + (index & PSLY_Record_IDXBIT); } Record* get_Record() { for(;;) { int localArrayNum = PSLY_Record_ARRAYNUM; //取最高隊列 int array = localArrayNum - 1; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } else { break; } } } } // 輪詢某些隊列 for(int i = 0; i < localArrayNum; ++i) { int array = __sync_fetch_and_add(&recordTake, 1) % localArrayNum; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } else { break; } } } } } // 遍歷全部隊列 for(int i = 0; i < localArrayNum; ++i) { int array = i; RecordQueue* queue = psly_Record_queues + array; Record* arr = psly_Records[array]; for(;;){ int headIndex = (queue->head); int indexHead = headIndex & PSLY_Record_HEADIDXBIT; Record* head = arr + indexHead; int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; int nextIndex = (head->next); if(headIndex == (queue->head)) { if(indexHead == indexTail){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) break; __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE ) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } else { if(__sync_bool_compare_and_swap(&queue->head, headIndex, (((headIndex & PSLY_Record_HEADVERSIONBIT) + PSLY_Record_HEADVERSIONONE) & PSLY_Record_HEADVERSIONBIT)|(nextIndex & PSLY_Record_HEADIDXBIT))) { return head; } } } } } //不夠增長 if(localArrayNum == PSLY_Record_ARRAYNUM_MAX) return NULL; if(localArrayNum == PSLY_Record_ARRAYNUM) { if(psly_Records[localArrayNum] == NULL) { int array_ = localArrayNum; Record* record; void * ptr; int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); record = ptr; memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ record->self = (array_ << PSLY_Record_IDXNUM) | j; record->next = j+1; record->pointer = NULL;\ record->nextRecord = 0; record += 1; } record->self = (array_ << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); record->next = PSLY_Record_NEXTTAILBIT; record->pointer = NULL; record->nextRecord = 0; //printf("I'm here %d %ld\n", localArrayNum, pthread_self()); if(!__sync_bool_compare_and_swap(&psly_Records[array_], NULL, ptr)) {free(ptr);} else /*printf("extend to %d\n", localArrayNum + 1)*/; } if(localArrayNum == PSLY_Record_ARRAYNUM) __sync_bool_compare_and_swap(&PSLY_Record_ARRAYNUM, localArrayNum, localArrayNum + 1); } } } void return_Record(Record* record) { long local = (record->next); local |= PSLY_Record_NEXTTAILBIT; record->next = local; int self = record->self; int array = (self >> PSLY_Record_IDXNUM) & PSLY_Record_ARRBITR; Record* arr = psly_Records[array]; RecordQueue* queue = psly_Record_queues + array; for(;;) { int tailIndex = (queue->tail); int indexTail = tailIndex & PSLY_Record_TAILIDXBIT; Record* tail = arr + indexTail; int nextIndex = (tail->next); if(tailIndex == (queue->tail)){ if((nextIndex & PSLY_Record_NEXTTAILBIT) == PSLY_Record_NEXTTAILBIT) { if(__sync_bool_compare_and_swap(&tail->next, nextIndex, (((nextIndex & PSLY_Record_NEXTVERSIONBIT) + PSLY_Record_NEXTVERSIONONE) & PSLY_Record_NEXTVERSIONBIT)|(self & PSLY_Record_NEXTIDXBIT))){ __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(self & PSLY_Record_TAILIDXBIT)); return; } } else { __sync_bool_compare_and_swap(&queue->tail, tailIndex, (((tailIndex & PSLY_Record_TAILVERSIONBIT) + PSLY_Record_TAILVERSIONONE) & PSLY_Record_TAILVERSIONBIT)|(nextIndex & PSLY_Record_TAILIDXBIT)); } } } } typedef struct RecordList { Record* volatile head ; Record* volatile tail ; } RecordList ; typedef struct RecordMap { volatile RecordList* lists[131070] ; } RecordMap ; static volatile RecordMap map; #define INIT_RESOURCE(listNum) \ for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \ Record* record; \ void * ptr;\ int ret = posix_memalign(&ptr, 4096, (1 << PSLY_Record_IDXNUM) * sizeof(Record));\ psly_Records[i] = record = ptr; \ memset(record, 0, (1 << PSLY_Record_IDXNUM) * sizeof(Record)); \ for(int j = 0; j < (1 << PSLY_Record_IDXNUM) - 1; ++j){ \ record->self = (i << PSLY_Record_IDXNUM) | j; \ record->next = j+1; \ record->pointer = NULL;\ record->nextRecord = 0;\ record += 1; \ } \ record->self = (i << PSLY_Record_IDXNUM) | ((1 << PSLY_Record_IDXNUM) - 1); \ record->next = PSLY_Record_NEXTTAILBIT; \ record->pointer = NULL;\ record->nextRecord = 0;\ }\ for(int i = 0; i < PSLY_Record_ARRAYNUM_MAX; ++i){\ psly_Record_queues[i].head = 0; \ psly_Record_queues[i].tail = (1 << PSLY_Record_IDXNUM) - 1; \ } \ for(int i = 0; i < listNum; ++i) { \ void* ptr;\ int ret = posix_memalign(&ptr, 4096, sizeof(RecordList));\ Record* head = get_Record();\ Record* tail = get_Record();\ head->nextRecord = newNext(head->nextRecord, tail); \ map.lists[i] = ptr;\ map.lists[i]->head = head; \ map.lists[i]->tail = tail; \ } #define UNINIT_RESOURCE(listNum) \ for(int i = 0; i < (PSLY_Record_ARRAYNUM); ++i){ \ free(psly_Records[i]); \ } \ for(int i = 0; i < listNum; ++i) {\ free(map.lists[i]);\ }
這裏的psly_Records[1 << 4],psly_Record_queues[1 << 4] 表明總共能夠提供16組,每組65536個數據(PSLY_Record_IDXNUM = 16),初始分配4組數據(PSLY_Record_ARRAYNUM = (1 << 2)),以後若是不夠就擴充一組。
局部變量:
對於一塊共享內存S,咱們爲它的Record保持一個局部變量reordS,只須要在RECORD時候返回,後續的REMOVE跟RETIRE就不須要去查詢了,相對於以前的enqueue給出的例子以下:
+++爲變更代碼
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; +++ Record* recordT = psly_record(&Tail, t); +++ if(recordT == NULL) +++ continue; if(Tail != t) { +++ psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { +++ psly_remove(recordT); continue; } if(next != NULL){ +++ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { +++ psly_remove(recordT); break; } +++ psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
這樣以來,每次操做,最須要在RECORD時候遍歷一次。
咱們還能夠作的更好
線程私有數據:
有些場景的共享內存,會在一段長期時間內不會改變,這種狀況的話每次都去查詢maplist顯得很浪費,咱們能夠作個緩存來節省查詢。
對於一塊肯定的共享內存S,咱們嘗試爲每線程配置一個私有變量(static __tread),用一段結構化的代碼跟蹤它的Record,這樣以來就不須要每次都查詢maplist了,
雖然不是很是合適,但咱們仍是拿前面Hazard pointer的例子作個示例,代碼以下:
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ t = Tail; +++ static __thread LocalRecord localRecordT; Record* recordT; +++ if(localRecordT.pointer == NULL) { +++ recordT = psly_record(&Tail, t, NULL, NULL); if(recordT == NULL) continue; +++ else { +++ localRecordT.pointer = t; +++ localRecordT.record = recordT; +++ localRecordT.nextRecord = recordT->nextRecord; +++ } +++ } else { +++ if(t != localRecordT.pointer || isChange(localRecordT.record, localRecordT.nextRecord) || t != localRecordT.record->pointer) { +++ localRecordT.pointer = NULL; +++ continue; +++ } +++ recordT = psly_record(&Tail, t, localRecordT.record, localRecordT.nextRecord); +++ if(recordT == NULL) { +++ localRecordT.pointer = NULL; +++ continue; +++ } +++ } if(Tail != t) { psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(recordT); continue; } if(next != NULL){ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(recordT); break; } psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
這種場景下,假如咱們系統共有N個線程,那麼對於一個內存,在它的整個生命週期裏,須要查詢maplist的次數上限爲N+1次!
這種方式極大地減小了查詢的次數,從而爲設計某些高效的共享數據結構提供了可能。
數據結構自己帶Record
void enqueue(int value){ NodeType* node; posix_memalign(&node, 64, sizeof(NodeType)); memset(node, 0, sizeof(NodeType)); node->value = value; node->next = NULL; NodeType* t; for(;;){ +++ Record* recordT = TailRecord; +++ int versionT= recordT->version; t = Tail; +++ if(recordT->pointer != t) { +++ if(recordT == TailRecord && recordT->version == versionT) { +++ Record* localRecordT = NULL; //udpate記錄Record並給引用計數設置1,Atomicity; +++ if(CAS(&TailRecord, recordT, localRecordT = get_Record(t, ONE))) { +++ return_Record(recordT); +++ } else { +++ return_Record(localRecordT); +++ continue; +++ } +++ } else { +++ continue; +++ } +++ } else if(recordT != TailRecord || recordT->version != versionT) { +++ continue; +++ } else { +++ psly_addOneRecord(recordT); // 增長引用計數 +++ } //now the t and RecordT is match!; if(Tail != t) { psly_remove(recordT); continue; } NodeType* next = t->next; if(Tail != t) { psly_remove(recordT); continue; } if(next != NULL){ psly_remove(recordT); __sync_bool_compare_and_swap(&Tail, t, next); continue; } if(__sync_bool_compare_and_swap(&t->next, NULL, node)) { psly_remove(recordT); break; } psly_remove(recordT); } __sync_bool_compare_and_swap(&Tail, t, node); }
這裏的TailRecord伴隨Tail更新。
鏈表遍歷保留前驅:
當咱們查詢maplist時候,有可能會由於前驅節點的失效,而要從新在該list的head開始遍歷,假如鏈表過長會代價較大,因此咱們在遍歷過程當中維護些前驅節點可能會好點。
示例代碼以下:
保留:
if(currKey != key) { int bucket; if((steps & STEPS_) == 0 && (bucket = (steps >> STEPBIT)) < MAXPREV) { Prevs* step = &prevs_[bucket]; step->r = prev; step->rNext = prevNext; } ++steps; prev = curr; prevNext = currNext; }
失效以後啓用保留的前驅:
--steps; for(;;) { int bucket = steps >> STEPBIT; bucket = bucket < MAXPREV ? bucket: (MAXPREV - 1); Prevs* prevs = &prevs_[bucket]; prev = prevs->r; prevNext = prev->nextRecord; long prevNextKeep = prevs->rNext; if((prevNextKeep & NODEBITS) != (prevNext & NODEBITS) || (prevNext & REFCBITS) == DELETED) { steps -= STEPS; } else { prevs->rNext = prevNext; curr = idx_Record(prevNext); break; } } steps = steps & (~STEPS_);
這裏的steps記錄咱們目前所在的位置,STEPS表達咱們隔幾個節點記錄一次(極端狀況下能夠拷貝全部遍歷過的節點)。
批量獲取Record方式指的是,因爲獲取Record的競爭過於激烈,咱們再也不每次獲取一個,而是每次獲取一批,剩餘的做爲線程私有以後使用,維護好數據的 未使用/使用 狀態,以及做爲總體返回給資源庫。從而極大地減小了線程間的競爭。
固然,最好的是咱們再也不讓線程競爭Record了,咱們從新組織Record,每一個隊列尺寸小一點。對於一個全新的線程,直接給它一個Record隊列,這個隊列如同以前同樣是全局做用域。不夠用繼續獲取整個隊列,維護好單個Record元素的使用情況。
這樣以來,對於每一個線程而言,它擁有的就是Record[0],Record[2]....等整個隊列了。若是一來,每一個資源隊列都是被某個線程私有,即單讀多寫的隊列。極大地減小了競爭。
對於某些場景,許多小塊的共享數據同時產生,又能夠同時回收。咱們再也不爲每一個內存地址分配一個Record,咱們嘗試將一塊大內存的分割爲許多小內存來使用,如此一來小內存統一映射到大內存首地址的Record,直接省去了插入鏈表的操做。我須要對Record進行改造,retireBit再也不做爲一個元素使用,這裏能夠換成short
struct Record{ //Record庫維護字段 int volatile next ; int self; //數據字段 void* pointer; int refcount; //retire +++ short retireNum; long nextRecord; }
同時,咱們的psy_record接口增長一個參數:
recordT = psly_record(void** ppointer, void* pointer, Record* record, long nextRecord, short retireNum);
最後講一下,惟一須要注意的是,若是內存已經處於可回收狀態:
那麼咱們即可以當即回收內部,由於對於企圖使用該內存的線程而言,要麼正在遞增引用計數,要麼已經完成訪問。完成訪問的不要緊,根據咱們的設計,遞增引用計數的線程稍後會回退減一,從而再也不訪問這塊內存。
假如競爭激烈,那麼咱們再也不將引用計數彙集到一個refcount字段上,咱們能夠採用多個refcount[M]來分開計數,從而改善競爭狀況,有效提升吞吐量。
具體作法能夠參考Striped64。
假如咱們在開發一個併發數據結構,它自己將會被共享/動態開闢/回收,數據自己帶有指針,指針指向的數據隨着程序的執行變得不知足需求,從而咱們要從新配置這一數據,而且回收原數據。
這種狀況下咱們能不能正確回收全部數據呢?
答案是能夠的。
對於全部內存,若是知足
共享的 / 須要回收的 / 沒有明確的能夠安全回收內存的邏輯點
咱們都採用3RE&S Protocol提供的語義來回收內存。
由於
最後,咱們必須本身提供freeMemory函數用於先回收內部內存,再回收外部對象的內存。